buffer_store.rs

   1use crate::{
   2    lsp_store::OpenLspBufferHandle,
   3    search::SearchQuery,
   4    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   5    ProjectItem as _, ProjectPath,
   6};
   7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
   8use anyhow::{anyhow, bail, Context as _, Result};
   9use client::Client;
  10use collections::{hash_map, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{
  13    channel::oneshot,
  14    future::{OptionFuture, Shared},
  15    Future, FutureExt as _, StreamExt,
  16};
  17use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
  18use gpui::{
  19    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
  20};
  21use http_client::Url;
  22use language::{
  23    proto::{
  24        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  25        split_operations,
  26    },
  27    Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
  28};
  29use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  30use serde::Deserialize;
  31use smol::channel::Receiver;
  32use std::{
  33    io,
  34    ops::Range,
  35    path::{Path, PathBuf},
  36    pin::pin,
  37    str::FromStr as _,
  38    sync::Arc,
  39    time::Instant,
  40};
  41use text::{BufferId, Rope};
  42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
  44
  45#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
  46enum ChangeSetKind {
  47    Unstaged,
  48    Uncommitted,
  49}
  50
  51/// A set of open buffers.
  52pub struct BufferStore {
  53    state: BufferStoreState,
  54    #[allow(clippy::type_complexity)]
  55    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
  56    #[allow(clippy::type_complexity)]
  57    loading_change_sets: HashMap<
  58        (BufferId, ChangeSetKind),
  59        Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>,
  60    >,
  61    worktree_store: Entity<WorktreeStore>,
  62    opened_buffers: HashMap<BufferId, OpenBuffer>,
  63    downstream_client: Option<(AnyProtoClient, u64)>,
  64    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
  65}
  66
  67#[derive(Hash, Eq, PartialEq, Clone)]
  68struct SharedBuffer {
  69    buffer: Entity<Buffer>,
  70    change_set: Option<Entity<BufferChangeSet>>,
  71    lsp_handle: Option<OpenLspBufferHandle>,
  72}
  73
  74#[derive(Default)]
  75struct BufferChangeSetState {
  76    unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
  77    uncommitted_changes: Option<WeakEntity<BufferChangeSet>>,
  78    recalculate_diff_task: Option<Task<Result<()>>>,
  79    language: Option<Arc<Language>>,
  80    language_registry: Option<Arc<LanguageRegistry>>,
  81    diff_updated_futures: Vec<oneshot::Sender<()>>,
  82    buffer_subscription: Option<Subscription>,
  83
  84    head_text: Option<Arc<String>>,
  85    index_text: Option<Arc<String>>,
  86    head_changed: bool,
  87    index_changed: bool,
  88}
  89
  90#[derive(Clone, Debug)]
  91enum DiffBasesChange {
  92    SetIndex(Option<String>),
  93    SetHead(Option<String>),
  94    SetEach {
  95        index: Option<String>,
  96        head: Option<String>,
  97    },
  98    SetBoth(Option<String>),
  99}
 100
 101impl BufferChangeSetState {
 102    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
 103        self.language = buffer.read(cx).language().cloned();
 104        self.index_changed = self.index_text.is_some();
 105        self.head_changed = self.head_text.is_some();
 106        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
 107    }
 108
 109    fn unstaged_changes(&self) -> Option<Entity<BufferChangeSet>> {
 110        self.unstaged_changes.as_ref().and_then(|set| set.upgrade())
 111    }
 112
 113    fn uncommitted_changes(&self) -> Option<Entity<BufferChangeSet>> {
 114        self.uncommitted_changes
 115            .as_ref()
 116            .and_then(|set| set.upgrade())
 117    }
 118
 119    fn handle_base_texts_updated(
 120        &mut self,
 121        buffer: text::BufferSnapshot,
 122        message: proto::UpdateDiffBases,
 123        cx: &mut Context<Self>,
 124    ) {
 125        use proto::update_diff_bases::Mode;
 126
 127        let Some(mode) = Mode::from_i32(message.mode) else {
 128            return;
 129        };
 130
 131        let diff_bases_change = match mode {
 132            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
 133            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
 134            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.staged_text),
 135            Mode::IndexAndHead => DiffBasesChange::SetEach {
 136                index: message.staged_text,
 137                head: message.committed_text,
 138            },
 139        };
 140
 141        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
 142    }
 143
 144    fn diff_bases_changed(
 145        &mut self,
 146        buffer: text::BufferSnapshot,
 147        diff_bases_change: DiffBasesChange,
 148        cx: &mut Context<Self>,
 149    ) -> oneshot::Receiver<()> {
 150        match diff_bases_change {
 151            DiffBasesChange::SetIndex(index) => {
 152                self.index_text = index.map(|mut text| {
 153                    text::LineEnding::normalize(&mut text);
 154                    Arc::new(text)
 155                });
 156                self.index_changed = true;
 157            }
 158            DiffBasesChange::SetHead(head) => {
 159                self.head_text = head.map(|mut text| {
 160                    text::LineEnding::normalize(&mut text);
 161                    Arc::new(text)
 162                });
 163                self.head_changed = true;
 164            }
 165            DiffBasesChange::SetBoth(mut text) => {
 166                if let Some(text) = text.as_mut() {
 167                    text::LineEnding::normalize(text);
 168                }
 169                self.head_text = text.map(Arc::new);
 170                self.index_text = self.head_text.clone();
 171                self.head_changed = true;
 172                self.index_changed = true;
 173            }
 174            DiffBasesChange::SetEach { index, head } => {
 175                self.index_text = index.map(|mut text| {
 176                    text::LineEnding::normalize(&mut text);
 177                    Arc::new(text)
 178                });
 179                self.head_text = head.map(|mut text| {
 180                    text::LineEnding::normalize(&mut text);
 181                    Arc::new(text)
 182                });
 183                self.head_changed = true;
 184                self.index_changed = true;
 185            }
 186        }
 187
 188        self.recalculate_diffs(buffer, cx)
 189    }
 190
 191    fn recalculate_diffs(
 192        &mut self,
 193        buffer: text::BufferSnapshot,
 194        cx: &mut Context<Self>,
 195    ) -> oneshot::Receiver<()> {
 196        let (tx, rx) = oneshot::channel();
 197        self.diff_updated_futures.push(tx);
 198
 199        let language = self.language.clone();
 200        let language_registry = self.language_registry.clone();
 201        let unstaged_changes = self.unstaged_changes();
 202        let uncommitted_changes = self.uncommitted_changes();
 203        let head = self.head_text.clone();
 204        let index = self.index_text.clone();
 205        let index_changed = self.index_changed;
 206        let head_changed = self.head_changed;
 207        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
 208            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
 209            (None, None) => true,
 210            _ => false,
 211        };
 212        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
 213            let snapshot = if index_changed {
 214                let snapshot = cx.update(|cx| {
 215                    index.as_ref().map(|head| {
 216                        language::Buffer::build_snapshot(
 217                            Rope::from(head.as_str()),
 218                            language.clone(),
 219                            language_registry.clone(),
 220                            cx,
 221                        )
 222                    })
 223                })?;
 224                cx.background_executor()
 225                    .spawn(OptionFuture::from(snapshot))
 226                    .await
 227            } else if let Some(unstaged_changes) = &unstaged_changes {
 228                unstaged_changes.read_with(&cx, |change_set, _| change_set.base_text.clone())?
 229            } else if let Some(uncommitted_changes) = &uncommitted_changes {
 230                uncommitted_changes
 231                    .read_with(&cx, |change_set, _| change_set.staged_text.clone())?
 232            } else {
 233                return Ok(());
 234            };
 235
 236            if let Some(unstaged_changes) = &unstaged_changes {
 237                let diff = cx
 238                    .background_executor()
 239                    .spawn({
 240                        let buffer = buffer.clone();
 241                        async move {
 242                            BufferDiff::build(index.as_ref().map(|index| index.as_str()), &buffer)
 243                        }
 244                    })
 245                    .await;
 246
 247                unstaged_changes.update(&mut cx, |unstaged_changes, cx| {
 248                    unstaged_changes.set_state(snapshot.clone(), diff, &buffer, cx);
 249                })?;
 250
 251                if let Some(uncommitted_changes) = &uncommitted_changes {
 252                    uncommitted_changes.update(&mut cx, |uncommitted_changes, _| {
 253                        uncommitted_changes.staged_text = snapshot;
 254                    })?;
 255                }
 256            }
 257
 258            if let Some(uncommitted_changes) = &uncommitted_changes {
 259                let (snapshot, diff) = if let (Some(unstaged_changes), true) =
 260                    (&unstaged_changes, index_matches_head)
 261                {
 262                    unstaged_changes.read_with(&cx, |change_set, _| {
 263                        (
 264                            change_set.base_text.clone(),
 265                            change_set.diff_to_buffer.clone(),
 266                        )
 267                    })?
 268                } else {
 269                    let snapshot = cx.update(|cx| {
 270                        head.as_deref().map(|head| {
 271                            language::Buffer::build_snapshot(
 272                                Rope::from(head.as_str()),
 273                                language.clone(),
 274                                language_registry.clone(),
 275                                cx,
 276                            )
 277                        })
 278                    })?;
 279                    let snapshot = cx.background_executor().spawn(OptionFuture::from(snapshot));
 280                    let diff = cx.background_executor().spawn({
 281                        let buffer = buffer.clone();
 282                        let head = head.clone();
 283                        async move {
 284                            BufferDiff::build(head.as_ref().map(|head| head.as_str()), &buffer)
 285                        }
 286                    });
 287                    futures::join!(snapshot, diff)
 288                };
 289
 290                uncommitted_changes.update(&mut cx, |change_set, cx| {
 291                    change_set.set_state(snapshot, diff, &buffer, cx);
 292                })?;
 293
 294                if index_changed || head_changed {
 295                    let staged_text = uncommitted_changes
 296                        .read_with(&cx, |change_set, _| change_set.staged_text.clone())?;
 297
 298                    let diff = if index_matches_head {
 299                        staged_text.as_ref().map(|buffer| BufferDiff::new(buffer))
 300                    } else if let Some(staged_text) = staged_text {
 301                        Some(
 302                            cx.background_executor()
 303                                .spawn(async move {
 304                                    BufferDiff::build(
 305                                        head.as_ref().map(|head| head.as_str()),
 306                                        &staged_text,
 307                                    )
 308                                })
 309                                .await,
 310                        )
 311                    } else {
 312                        None
 313                    };
 314
 315                    uncommitted_changes.update(&mut cx, |change_set, _| {
 316                        change_set.staged_diff = diff;
 317                    })?;
 318                }
 319            }
 320
 321            if let Some(this) = this.upgrade() {
 322                this.update(&mut cx, |this, _| {
 323                    this.index_changed = false;
 324                    this.head_changed = false;
 325                    for tx in this.diff_updated_futures.drain(..) {
 326                        tx.send(()).ok();
 327                    }
 328                })?;
 329            }
 330
 331            Ok(())
 332        }));
 333
 334        rx
 335    }
 336}
 337
 338pub struct BufferChangeSet {
 339    pub buffer_id: BufferId,
 340    pub base_text: Option<language::BufferSnapshot>,
 341    pub diff_to_buffer: BufferDiff,
 342    pub staged_text: Option<language::BufferSnapshot>,
 343    // For an uncommitted changeset, this is the diff between HEAD and the index.
 344    pub staged_diff: Option<BufferDiff>,
 345}
 346
 347impl std::fmt::Debug for BufferChangeSet {
 348    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 349        f.debug_struct("BufferChangeSet")
 350            .field("buffer_id", &self.buffer_id)
 351            .field("base_text", &self.base_text.as_ref().map(|s| s.text()))
 352            .field("diff_to_buffer", &self.diff_to_buffer)
 353            .field("staged_text", &self.staged_text.as_ref().map(|s| s.text()))
 354            .field("staged_diff", &self.staged_diff)
 355            .finish()
 356    }
 357}
 358
 359pub enum BufferChangeSetEvent {
 360    DiffChanged { changed_range: Range<text::Anchor> },
 361}
 362
 363enum BufferStoreState {
 364    Local(LocalBufferStore),
 365    Remote(RemoteBufferStore),
 366}
 367
 368struct RemoteBufferStore {
 369    shared_with_me: HashSet<Entity<Buffer>>,
 370    upstream_client: AnyProtoClient,
 371    project_id: u64,
 372    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
 373    remote_buffer_listeners:
 374        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
 375    worktree_store: Entity<WorktreeStore>,
 376}
 377
 378struct LocalBufferStore {
 379    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
 380    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
 381    worktree_store: Entity<WorktreeStore>,
 382    _subscription: Subscription,
 383}
 384
 385enum OpenBuffer {
 386    Complete {
 387        buffer: WeakEntity<Buffer>,
 388        change_set_state: Entity<BufferChangeSetState>,
 389    },
 390    Operations(Vec<Operation>),
 391}
 392
 393pub enum BufferStoreEvent {
 394    BufferAdded(Entity<Buffer>),
 395    BufferDropped(BufferId),
 396    BufferChangedFilePath {
 397        buffer: Entity<Buffer>,
 398        old_file: Option<Arc<dyn language::File>>,
 399    },
 400}
 401
 402#[derive(Default, Debug)]
 403pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
 404
 405impl EventEmitter<BufferStoreEvent> for BufferStore {}
 406
 407impl RemoteBufferStore {
 408    fn open_unstaged_changes(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
 409        let project_id = self.project_id;
 410        let client = self.upstream_client.clone();
 411        cx.background_executor().spawn(async move {
 412            let response = client
 413                .request(proto::OpenUnstagedChanges {
 414                    project_id,
 415                    buffer_id: buffer_id.to_proto(),
 416                })
 417                .await?;
 418            Ok(response.staged_text)
 419        })
 420    }
 421
 422    fn open_uncommitted_changes(
 423        &self,
 424        buffer_id: BufferId,
 425        cx: &App,
 426    ) -> Task<Result<DiffBasesChange>> {
 427        use proto::open_uncommitted_changes_response::Mode;
 428
 429        let project_id = self.project_id;
 430        let client = self.upstream_client.clone();
 431        cx.background_executor().spawn(async move {
 432            let response = client
 433                .request(proto::OpenUncommittedChanges {
 434                    project_id,
 435                    buffer_id: buffer_id.to_proto(),
 436                })
 437                .await?;
 438            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
 439            let bases = match mode {
 440                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.staged_text),
 441                Mode::IndexAndHead => DiffBasesChange::SetEach {
 442                    head: response.committed_text,
 443                    index: response.staged_text,
 444                },
 445            };
 446            Ok(bases)
 447        })
 448    }
 449
 450    pub fn wait_for_remote_buffer(
 451        &mut self,
 452        id: BufferId,
 453        cx: &mut Context<BufferStore>,
 454    ) -> Task<Result<Entity<Buffer>>> {
 455        let (tx, rx) = oneshot::channel();
 456        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 457
 458        cx.spawn(|this, cx| async move {
 459            if let Some(buffer) = this
 460                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
 461                .ok()
 462                .flatten()
 463            {
 464                return Ok(buffer);
 465            }
 466
 467            cx.background_executor()
 468                .spawn(async move { rx.await? })
 469                .await
 470        })
 471    }
 472
 473    fn save_remote_buffer(
 474        &self,
 475        buffer_handle: Entity<Buffer>,
 476        new_path: Option<proto::ProjectPath>,
 477        cx: &Context<BufferStore>,
 478    ) -> Task<Result<()>> {
 479        let buffer = buffer_handle.read(cx);
 480        let buffer_id = buffer.remote_id().into();
 481        let version = buffer.version();
 482        let rpc = self.upstream_client.clone();
 483        let project_id = self.project_id;
 484        cx.spawn(move |_, mut cx| async move {
 485            let response = rpc
 486                .request(proto::SaveBuffer {
 487                    project_id,
 488                    buffer_id,
 489                    new_path,
 490                    version: serialize_version(&version),
 491                })
 492                .await?;
 493            let version = deserialize_version(&response.version);
 494            let mtime = response.mtime.map(|mtime| mtime.into());
 495
 496            buffer_handle.update(&mut cx, |buffer, cx| {
 497                buffer.did_save(version.clone(), mtime, cx);
 498            })?;
 499
 500            Ok(())
 501        })
 502    }
 503
 504    pub fn handle_create_buffer_for_peer(
 505        &mut self,
 506        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 507        replica_id: u16,
 508        capability: Capability,
 509        cx: &mut Context<BufferStore>,
 510    ) -> Result<Option<Entity<Buffer>>> {
 511        match envelope
 512            .payload
 513            .variant
 514            .ok_or_else(|| anyhow!("missing variant"))?
 515        {
 516            proto::create_buffer_for_peer::Variant::State(mut state) => {
 517                let buffer_id = BufferId::new(state.id)?;
 518
 519                let buffer_result = maybe!({
 520                    let mut buffer_file = None;
 521                    if let Some(file) = state.file.take() {
 522                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 523                        let worktree = self
 524                            .worktree_store
 525                            .read(cx)
 526                            .worktree_for_id(worktree_id, cx)
 527                            .ok_or_else(|| {
 528                                anyhow!("no worktree found for id {}", file.worktree_id)
 529                            })?;
 530                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 531                            as Arc<dyn language::File>);
 532                    }
 533                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 534                });
 535
 536                match buffer_result {
 537                    Ok(buffer) => {
 538                        let buffer = cx.new(|_| buffer);
 539                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 540                    }
 541                    Err(error) => {
 542                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 543                            for listener in listeners {
 544                                listener.send(Err(anyhow!(error.cloned()))).ok();
 545                            }
 546                        }
 547                    }
 548                }
 549            }
 550            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 551                let buffer_id = BufferId::new(chunk.buffer_id)?;
 552                let buffer = self
 553                    .loading_remote_buffers_by_id
 554                    .get(&buffer_id)
 555                    .cloned()
 556                    .ok_or_else(|| {
 557                        anyhow!(
 558                            "received chunk for buffer {} without initial state",
 559                            chunk.buffer_id
 560                        )
 561                    })?;
 562
 563                let result = maybe!({
 564                    let operations = chunk
 565                        .operations
 566                        .into_iter()
 567                        .map(language::proto::deserialize_operation)
 568                        .collect::<Result<Vec<_>>>()?;
 569                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 570                    anyhow::Ok(())
 571                });
 572
 573                if let Err(error) = result {
 574                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 575                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 576                        for listener in listeners {
 577                            listener.send(Err(error.cloned())).ok();
 578                        }
 579                    }
 580                } else if chunk.is_last {
 581                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 582                    if self.upstream_client.is_via_collab() {
 583                        // retain buffers sent by peers to avoid races.
 584                        self.shared_with_me.insert(buffer.clone());
 585                    }
 586
 587                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 588                        for sender in senders {
 589                            sender.send(Ok(buffer.clone())).ok();
 590                        }
 591                    }
 592                    return Ok(Some(buffer));
 593                }
 594            }
 595        }
 596        return Ok(None);
 597    }
 598
 599    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 600        self.loading_remote_buffers_by_id
 601            .keys()
 602            .copied()
 603            .collect::<Vec<_>>()
 604    }
 605
 606    pub fn deserialize_project_transaction(
 607        &self,
 608        message: proto::ProjectTransaction,
 609        push_to_history: bool,
 610        cx: &mut Context<BufferStore>,
 611    ) -> Task<Result<ProjectTransaction>> {
 612        cx.spawn(|this, mut cx| async move {
 613            let mut project_transaction = ProjectTransaction::default();
 614            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 615            {
 616                let buffer_id = BufferId::new(buffer_id)?;
 617                let buffer = this
 618                    .update(&mut cx, |this, cx| {
 619                        this.wait_for_remote_buffer(buffer_id, cx)
 620                    })?
 621                    .await?;
 622                let transaction = language::proto::deserialize_transaction(transaction)?;
 623                project_transaction.0.insert(buffer, transaction);
 624            }
 625
 626            for (buffer, transaction) in &project_transaction.0 {
 627                buffer
 628                    .update(&mut cx, |buffer, _| {
 629                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 630                    })?
 631                    .await?;
 632
 633                if push_to_history {
 634                    buffer.update(&mut cx, |buffer, _| {
 635                        buffer.push_transaction(transaction.clone(), Instant::now());
 636                    })?;
 637                }
 638            }
 639
 640            Ok(project_transaction)
 641        })
 642    }
 643
 644    fn open_buffer(
 645        &self,
 646        path: Arc<Path>,
 647        worktree: Entity<Worktree>,
 648        cx: &mut Context<BufferStore>,
 649    ) -> Task<Result<Entity<Buffer>>> {
 650        let worktree_id = worktree.read(cx).id().to_proto();
 651        let project_id = self.project_id;
 652        let client = self.upstream_client.clone();
 653        let path_string = path.clone().to_string_lossy().to_string();
 654        cx.spawn(move |this, mut cx| async move {
 655            let response = client
 656                .request(proto::OpenBufferByPath {
 657                    project_id,
 658                    worktree_id,
 659                    path: path_string,
 660                })
 661                .await?;
 662            let buffer_id = BufferId::new(response.buffer_id)?;
 663
 664            let buffer = this
 665                .update(&mut cx, {
 666                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 667                })?
 668                .await?;
 669
 670            Ok(buffer)
 671        })
 672    }
 673
 674    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
 675        let create = self.upstream_client.request(proto::OpenNewBuffer {
 676            project_id: self.project_id,
 677        });
 678        cx.spawn(|this, mut cx| async move {
 679            let response = create.await?;
 680            let buffer_id = BufferId::new(response.buffer_id)?;
 681
 682            this.update(&mut cx, |this, cx| {
 683                this.wait_for_remote_buffer(buffer_id, cx)
 684            })?
 685            .await
 686        })
 687    }
 688
 689    fn reload_buffers(
 690        &self,
 691        buffers: HashSet<Entity<Buffer>>,
 692        push_to_history: bool,
 693        cx: &mut Context<BufferStore>,
 694    ) -> Task<Result<ProjectTransaction>> {
 695        let request = self.upstream_client.request(proto::ReloadBuffers {
 696            project_id: self.project_id,
 697            buffer_ids: buffers
 698                .iter()
 699                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 700                .collect(),
 701        });
 702
 703        cx.spawn(|this, mut cx| async move {
 704            let response = request
 705                .await?
 706                .transaction
 707                .ok_or_else(|| anyhow!("missing transaction"))?;
 708            this.update(&mut cx, |this, cx| {
 709                this.deserialize_project_transaction(response, push_to_history, cx)
 710            })?
 711            .await
 712        })
 713    }
 714}
 715
 716impl LocalBufferStore {
 717    fn worktree_for_buffer(
 718        &self,
 719        buffer: &Entity<Buffer>,
 720        cx: &App,
 721    ) -> Option<(Entity<Worktree>, Arc<Path>)> {
 722        let file = buffer.read(cx).file()?;
 723        let worktree_id = file.worktree_id(cx);
 724        let path = file.path().clone();
 725        let worktree = self
 726            .worktree_store
 727            .read(cx)
 728            .worktree_for_id(worktree_id, cx)?;
 729        Some((worktree, path))
 730    }
 731
 732    fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
 733        if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
 734            worktree.read(cx).load_staged_file(path.as_ref(), cx)
 735        } else {
 736            return Task::ready(Err(anyhow!("no such worktree")));
 737        }
 738    }
 739
 740    fn load_committed_text(
 741        &self,
 742        buffer: &Entity<Buffer>,
 743        cx: &App,
 744    ) -> Task<Result<Option<String>>> {
 745        if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
 746            worktree.read(cx).load_committed_file(path.as_ref(), cx)
 747        } else {
 748            Task::ready(Err(anyhow!("no such worktree")))
 749        }
 750    }
 751
 752    fn save_local_buffer(
 753        &self,
 754        buffer_handle: Entity<Buffer>,
 755        worktree: Entity<Worktree>,
 756        path: Arc<Path>,
 757        mut has_changed_file: bool,
 758        cx: &mut Context<BufferStore>,
 759    ) -> Task<Result<()>> {
 760        let buffer = buffer_handle.read(cx);
 761
 762        let text = buffer.as_rope().clone();
 763        let line_ending = buffer.line_ending();
 764        let version = buffer.version();
 765        let buffer_id = buffer.remote_id();
 766        if buffer
 767            .file()
 768            .is_some_and(|file| file.disk_state() == DiskState::New)
 769        {
 770            has_changed_file = true;
 771        }
 772
 773        let save = worktree.update(cx, |worktree, cx| {
 774            worktree.write_file(path.as_ref(), text, line_ending, cx)
 775        });
 776
 777        cx.spawn(move |this, mut cx| async move {
 778            let new_file = save.await?;
 779            let mtime = new_file.disk_state().mtime();
 780            this.update(&mut cx, |this, cx| {
 781                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 782                    if has_changed_file {
 783                        downstream_client
 784                            .send(proto::UpdateBufferFile {
 785                                project_id,
 786                                buffer_id: buffer_id.to_proto(),
 787                                file: Some(language::File::to_proto(&*new_file, cx)),
 788                            })
 789                            .log_err();
 790                    }
 791                    downstream_client
 792                        .send(proto::BufferSaved {
 793                            project_id,
 794                            buffer_id: buffer_id.to_proto(),
 795                            version: serialize_version(&version),
 796                            mtime: mtime.map(|time| time.into()),
 797                        })
 798                        .log_err();
 799                }
 800            })?;
 801            buffer_handle.update(&mut cx, |buffer, cx| {
 802                if has_changed_file {
 803                    buffer.file_updated(new_file, cx);
 804                }
 805                buffer.did_save(version.clone(), mtime, cx);
 806            })
 807        })
 808    }
 809
 810    fn subscribe_to_worktree(
 811        &mut self,
 812        worktree: &Entity<Worktree>,
 813        cx: &mut Context<BufferStore>,
 814    ) {
 815        cx.subscribe(worktree, |this, worktree, event, cx| {
 816            if worktree.read(cx).is_local() {
 817                match event {
 818                    worktree::Event::UpdatedEntries(changes) => {
 819                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 820                    }
 821                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 822                        Self::local_worktree_git_repos_changed(
 823                            this,
 824                            worktree.clone(),
 825                            updated_repos,
 826                            cx,
 827                        )
 828                    }
 829                    _ => {}
 830                }
 831            }
 832        })
 833        .detach();
 834    }
 835
 836    fn local_worktree_entries_changed(
 837        this: &mut BufferStore,
 838        worktree_handle: &Entity<Worktree>,
 839        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 840        cx: &mut Context<BufferStore>,
 841    ) {
 842        let snapshot = worktree_handle.read(cx).snapshot();
 843        for (path, entry_id, _) in changes {
 844            Self::local_worktree_entry_changed(
 845                this,
 846                *entry_id,
 847                path,
 848                worktree_handle,
 849                &snapshot,
 850                cx,
 851            );
 852        }
 853    }
 854
 855    fn local_worktree_git_repos_changed(
 856        this: &mut BufferStore,
 857        worktree_handle: Entity<Worktree>,
 858        changed_repos: &UpdatedGitRepositoriesSet,
 859        cx: &mut Context<BufferStore>,
 860    ) {
 861        debug_assert!(worktree_handle.read(cx).is_local());
 862
 863        let mut change_set_state_updates = Vec::new();
 864        for buffer in this.opened_buffers.values() {
 865            let OpenBuffer::Complete {
 866                buffer,
 867                change_set_state,
 868            } = buffer
 869            else {
 870                continue;
 871            };
 872            let Some(buffer) = buffer.upgrade() else {
 873                continue;
 874            };
 875            let buffer = buffer.read(cx);
 876            let Some(file) = File::from_dyn(buffer.file()) else {
 877                continue;
 878            };
 879            if file.worktree != worktree_handle {
 880                continue;
 881            }
 882            let change_set_state = change_set_state.read(cx);
 883            if changed_repos
 884                .iter()
 885                .any(|(work_dir, _)| file.path.starts_with(work_dir))
 886            {
 887                let snapshot = buffer.text_snapshot();
 888                change_set_state_updates.push((
 889                    snapshot.clone(),
 890                    file.path.clone(),
 891                    change_set_state
 892                        .unstaged_changes
 893                        .as_ref()
 894                        .and_then(|set| set.upgrade())
 895                        .is_some(),
 896                    change_set_state
 897                        .uncommitted_changes
 898                        .as_ref()
 899                        .and_then(|set| set.upgrade())
 900                        .is_some(),
 901                ))
 902            }
 903        }
 904
 905        if change_set_state_updates.is_empty() {
 906            return;
 907        }
 908
 909        cx.spawn(move |this, mut cx| async move {
 910            let snapshot =
 911                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 912            let diff_bases_changes_by_buffer = cx
 913                .background_executor()
 914                .spawn(async move {
 915                    change_set_state_updates
 916                        .into_iter()
 917                        .filter_map(
 918                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
 919                                let local_repo = snapshot.local_repo_for_path(&path)?;
 920                                let relative_path = local_repo.relativize(&path).ok()?;
 921                                let staged_text = if needs_staged_text {
 922                                    local_repo.repo().load_index_text(&relative_path)
 923                                } else {
 924                                    None
 925                                };
 926                                let committed_text = if needs_committed_text {
 927                                    local_repo.repo().load_committed_text(&relative_path)
 928                                } else {
 929                                    None
 930                                };
 931                                let diff_bases_change =
 932                                    match (needs_staged_text, needs_committed_text) {
 933                                        (true, true) => Some(if staged_text == committed_text {
 934                                            DiffBasesChange::SetBoth(staged_text)
 935                                        } else {
 936                                            DiffBasesChange::SetEach {
 937                                                index: staged_text,
 938                                                head: committed_text,
 939                                            }
 940                                        }),
 941                                        (true, false) => {
 942                                            Some(DiffBasesChange::SetIndex(staged_text))
 943                                        }
 944                                        (false, true) => {
 945                                            Some(DiffBasesChange::SetHead(committed_text))
 946                                        }
 947                                        (false, false) => None,
 948                                    };
 949                                Some((buffer_snapshot, diff_bases_change))
 950                            },
 951                        )
 952                        .collect::<Vec<_>>()
 953                })
 954                .await;
 955
 956            this.update(&mut cx, |this, cx| {
 957                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
 958                    let Some(OpenBuffer::Complete {
 959                        change_set_state, ..
 960                    }) = this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
 961                    else {
 962                        continue;
 963                    };
 964                    let Some(diff_bases_change) = diff_bases_change else {
 965                        continue;
 966                    };
 967
 968                    change_set_state.update(cx, |change_set_state, cx| {
 969                        use proto::update_diff_bases::Mode;
 970
 971                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
 972                            let buffer_id = buffer_snapshot.remote_id().to_proto();
 973                            let (staged_text, committed_text, mode) = match diff_bases_change
 974                                .clone()
 975                            {
 976                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
 977                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
 978                                DiffBasesChange::SetEach { index, head } => {
 979                                    (index, head, Mode::IndexAndHead)
 980                                }
 981                                DiffBasesChange::SetBoth(text) => {
 982                                    (text, None, Mode::IndexMatchesHead)
 983                                }
 984                            };
 985                            let message = proto::UpdateDiffBases {
 986                                project_id: *project_id,
 987                                buffer_id,
 988                                staged_text,
 989                                committed_text,
 990                                mode: mode as i32,
 991                            };
 992
 993                            client.send(message).log_err();
 994                        }
 995
 996                        let _ = change_set_state.diff_bases_changed(
 997                            buffer_snapshot,
 998                            diff_bases_change,
 999                            cx,
1000                        );
1001                    });
1002                }
1003            })
1004        })
1005        .detach_and_log_err(cx);
1006    }
1007
1008    fn local_worktree_entry_changed(
1009        this: &mut BufferStore,
1010        entry_id: ProjectEntryId,
1011        path: &Arc<Path>,
1012        worktree: &Entity<worktree::Worktree>,
1013        snapshot: &worktree::Snapshot,
1014        cx: &mut Context<BufferStore>,
1015    ) -> Option<()> {
1016        let project_path = ProjectPath {
1017            worktree_id: snapshot.id(),
1018            path: path.clone(),
1019        };
1020
1021        let buffer_id = {
1022            let local = this.as_local_mut()?;
1023            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
1024                Some(&buffer_id) => buffer_id,
1025                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
1026            }
1027        };
1028
1029        let buffer = if let Some(buffer) = this.get(buffer_id) {
1030            Some(buffer)
1031        } else {
1032            this.opened_buffers.remove(&buffer_id);
1033            None
1034        };
1035
1036        let buffer = if let Some(buffer) = buffer {
1037            buffer
1038        } else {
1039            let this = this.as_local_mut()?;
1040            this.local_buffer_ids_by_path.remove(&project_path);
1041            this.local_buffer_ids_by_entry_id.remove(&entry_id);
1042            return None;
1043        };
1044
1045        let events = buffer.update(cx, |buffer, cx| {
1046            let local = this.as_local_mut()?;
1047            let file = buffer.file()?;
1048            let old_file = File::from_dyn(Some(file))?;
1049            if old_file.worktree != *worktree {
1050                return None;
1051            }
1052
1053            let snapshot_entry = old_file
1054                .entry_id
1055                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
1056                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
1057
1058            let new_file = if let Some(entry) = snapshot_entry {
1059                File {
1060                    disk_state: match entry.mtime {
1061                        Some(mtime) => DiskState::Present { mtime },
1062                        None => old_file.disk_state,
1063                    },
1064                    is_local: true,
1065                    entry_id: Some(entry.id),
1066                    path: entry.path.clone(),
1067                    worktree: worktree.clone(),
1068                    is_private: entry.is_private,
1069                }
1070            } else {
1071                File {
1072                    disk_state: DiskState::Deleted,
1073                    is_local: true,
1074                    entry_id: old_file.entry_id,
1075                    path: old_file.path.clone(),
1076                    worktree: worktree.clone(),
1077                    is_private: old_file.is_private,
1078                }
1079            };
1080
1081            if new_file == *old_file {
1082                return None;
1083            }
1084
1085            let mut events = Vec::new();
1086            if new_file.path != old_file.path {
1087                local.local_buffer_ids_by_path.remove(&ProjectPath {
1088                    path: old_file.path.clone(),
1089                    worktree_id: old_file.worktree_id(cx),
1090                });
1091                local.local_buffer_ids_by_path.insert(
1092                    ProjectPath {
1093                        worktree_id: new_file.worktree_id(cx),
1094                        path: new_file.path.clone(),
1095                    },
1096                    buffer_id,
1097                );
1098                events.push(BufferStoreEvent::BufferChangedFilePath {
1099                    buffer: cx.entity(),
1100                    old_file: buffer.file().cloned(),
1101                });
1102            }
1103
1104            if new_file.entry_id != old_file.entry_id {
1105                if let Some(entry_id) = old_file.entry_id {
1106                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
1107                }
1108                if let Some(entry_id) = new_file.entry_id {
1109                    local
1110                        .local_buffer_ids_by_entry_id
1111                        .insert(entry_id, buffer_id);
1112                }
1113            }
1114
1115            if let Some((client, project_id)) = &this.downstream_client {
1116                client
1117                    .send(proto::UpdateBufferFile {
1118                        project_id: *project_id,
1119                        buffer_id: buffer_id.to_proto(),
1120                        file: Some(new_file.to_proto(cx)),
1121                    })
1122                    .ok();
1123            }
1124
1125            buffer.file_updated(Arc::new(new_file), cx);
1126            Some(events)
1127        })?;
1128
1129        for event in events {
1130            cx.emit(event);
1131        }
1132
1133        None
1134    }
1135
1136    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1137        let file = File::from_dyn(buffer.read(cx).file())?;
1138
1139        let remote_id = buffer.read(cx).remote_id();
1140        if let Some(entry_id) = file.entry_id {
1141            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1142                Some(_) => {
1143                    return None;
1144                }
1145                None => {
1146                    self.local_buffer_ids_by_entry_id
1147                        .insert(entry_id, remote_id);
1148                }
1149            }
1150        };
1151        self.local_buffer_ids_by_path.insert(
1152            ProjectPath {
1153                worktree_id: file.worktree_id(cx),
1154                path: file.path.clone(),
1155            },
1156            remote_id,
1157        );
1158
1159        Some(())
1160    }
1161
1162    fn save_buffer(
1163        &self,
1164        buffer: Entity<Buffer>,
1165        cx: &mut Context<BufferStore>,
1166    ) -> Task<Result<()>> {
1167        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1168            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1169        };
1170        let worktree = file.worktree.clone();
1171        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1172    }
1173
1174    fn save_buffer_as(
1175        &self,
1176        buffer: Entity<Buffer>,
1177        path: ProjectPath,
1178        cx: &mut Context<BufferStore>,
1179    ) -> Task<Result<()>> {
1180        let Some(worktree) = self
1181            .worktree_store
1182            .read(cx)
1183            .worktree_for_id(path.worktree_id, cx)
1184        else {
1185            return Task::ready(Err(anyhow!("no such worktree")));
1186        };
1187        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1188    }
1189
1190    fn open_buffer(
1191        &self,
1192        path: Arc<Path>,
1193        worktree: Entity<Worktree>,
1194        cx: &mut Context<BufferStore>,
1195    ) -> Task<Result<Entity<Buffer>>> {
1196        let load_buffer = worktree.update(cx, |worktree, cx| {
1197            let load_file = worktree.load_file(path.as_ref(), cx);
1198            let reservation = cx.reserve_entity();
1199            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
1200            cx.spawn(move |_, mut cx| async move {
1201                let loaded = load_file.await?;
1202                let text_buffer = cx
1203                    .background_executor()
1204                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
1205                    .await;
1206                cx.insert_entity(reservation, |_| {
1207                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
1208                })
1209            })
1210        });
1211
1212        cx.spawn(move |this, mut cx| async move {
1213            let buffer = match load_buffer.await {
1214                Ok(buffer) => Ok(buffer),
1215                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
1216                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
1217                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
1218                    Buffer::build(
1219                        text_buffer,
1220                        Some(Arc::new(File {
1221                            worktree,
1222                            path,
1223                            disk_state: DiskState::New,
1224                            entry_id: None,
1225                            is_local: true,
1226                            is_private: false,
1227                        })),
1228                        Capability::ReadWrite,
1229                    )
1230                }),
1231                Err(e) => Err(e),
1232            }?;
1233            this.update(&mut cx, |this, cx| {
1234                this.add_buffer(buffer.clone(), cx)?;
1235                let buffer_id = buffer.read(cx).remote_id();
1236                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1237                    let this = this.as_local_mut().unwrap();
1238                    this.local_buffer_ids_by_path.insert(
1239                        ProjectPath {
1240                            worktree_id: file.worktree_id(cx),
1241                            path: file.path.clone(),
1242                        },
1243                        buffer_id,
1244                    );
1245
1246                    if let Some(entry_id) = file.entry_id {
1247                        this.local_buffer_ids_by_entry_id
1248                            .insert(entry_id, buffer_id);
1249                    }
1250                }
1251
1252                anyhow::Ok(())
1253            })??;
1254
1255            Ok(buffer)
1256        })
1257    }
1258
1259    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1260        cx.spawn(|buffer_store, mut cx| async move {
1261            let buffer =
1262                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1263            buffer_store.update(&mut cx, |buffer_store, cx| {
1264                buffer_store.add_buffer(buffer.clone(), cx).log_err();
1265            })?;
1266            Ok(buffer)
1267        })
1268    }
1269
1270    fn reload_buffers(
1271        &self,
1272        buffers: HashSet<Entity<Buffer>>,
1273        push_to_history: bool,
1274        cx: &mut Context<BufferStore>,
1275    ) -> Task<Result<ProjectTransaction>> {
1276        cx.spawn(move |_, mut cx| async move {
1277            let mut project_transaction = ProjectTransaction::default();
1278            for buffer in buffers {
1279                let transaction = buffer
1280                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1281                    .await?;
1282                buffer.update(&mut cx, |buffer, cx| {
1283                    if let Some(transaction) = transaction {
1284                        if !push_to_history {
1285                            buffer.forget_transaction(transaction.id);
1286                        }
1287                        project_transaction.0.insert(cx.entity(), transaction);
1288                    }
1289                })?;
1290            }
1291
1292            Ok(project_transaction)
1293        })
1294    }
1295}
1296
1297impl BufferStore {
1298    pub fn init(client: &AnyProtoClient) {
1299        client.add_entity_message_handler(Self::handle_buffer_reloaded);
1300        client.add_entity_message_handler(Self::handle_buffer_saved);
1301        client.add_entity_message_handler(Self::handle_update_buffer_file);
1302        client.add_entity_request_handler(Self::handle_save_buffer);
1303        client.add_entity_request_handler(Self::handle_blame_buffer);
1304        client.add_entity_request_handler(Self::handle_reload_buffers);
1305        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
1306        client.add_entity_request_handler(Self::handle_open_unstaged_changes);
1307        client.add_entity_request_handler(Self::handle_open_uncommitted_changes);
1308        client.add_entity_message_handler(Self::handle_update_diff_bases);
1309    }
1310
1311    /// Creates a buffer store, optionally retaining its buffers.
1312    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
1313        Self {
1314            state: BufferStoreState::Local(LocalBufferStore {
1315                local_buffer_ids_by_path: Default::default(),
1316                local_buffer_ids_by_entry_id: Default::default(),
1317                worktree_store: worktree_store.clone(),
1318                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
1319                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
1320                        let this = this.as_local_mut().unwrap();
1321                        this.subscribe_to_worktree(worktree, cx);
1322                    }
1323                }),
1324            }),
1325            downstream_client: None,
1326            opened_buffers: Default::default(),
1327            shared_buffers: Default::default(),
1328            loading_buffers: Default::default(),
1329            loading_change_sets: Default::default(),
1330            worktree_store,
1331        }
1332    }
1333
1334    pub fn remote(
1335        worktree_store: Entity<WorktreeStore>,
1336        upstream_client: AnyProtoClient,
1337        remote_id: u64,
1338        _cx: &mut Context<Self>,
1339    ) -> Self {
1340        Self {
1341            state: BufferStoreState::Remote(RemoteBufferStore {
1342                shared_with_me: Default::default(),
1343                loading_remote_buffers_by_id: Default::default(),
1344                remote_buffer_listeners: Default::default(),
1345                project_id: remote_id,
1346                upstream_client,
1347                worktree_store: worktree_store.clone(),
1348            }),
1349            downstream_client: None,
1350            opened_buffers: Default::default(),
1351            loading_buffers: Default::default(),
1352            loading_change_sets: Default::default(),
1353            shared_buffers: Default::default(),
1354            worktree_store,
1355        }
1356    }
1357
1358    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1359        match &mut self.state {
1360            BufferStoreState::Local(state) => Some(state),
1361            _ => None,
1362        }
1363    }
1364
1365    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1366        match &mut self.state {
1367            BufferStoreState::Remote(state) => Some(state),
1368            _ => None,
1369        }
1370    }
1371
1372    fn as_remote(&self) -> Option<&RemoteBufferStore> {
1373        match &self.state {
1374            BufferStoreState::Remote(state) => Some(state),
1375            _ => None,
1376        }
1377    }
1378
1379    pub fn open_buffer(
1380        &mut self,
1381        project_path: ProjectPath,
1382        cx: &mut Context<Self>,
1383    ) -> Task<Result<Entity<Buffer>>> {
1384        if let Some(buffer) = self.get_by_path(&project_path, cx) {
1385            return Task::ready(Ok(buffer));
1386        }
1387
1388        let task = match self.loading_buffers.entry(project_path.clone()) {
1389            hash_map::Entry::Occupied(e) => e.get().clone(),
1390            hash_map::Entry::Vacant(entry) => {
1391                let path = project_path.path.clone();
1392                let Some(worktree) = self
1393                    .worktree_store
1394                    .read(cx)
1395                    .worktree_for_id(project_path.worktree_id, cx)
1396                else {
1397                    return Task::ready(Err(anyhow!("no such worktree")));
1398                };
1399                let load_buffer = match &self.state {
1400                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
1401                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
1402                };
1403
1404                entry
1405                    .insert(
1406                        cx.spawn(move |this, mut cx| async move {
1407                            let load_result = load_buffer.await;
1408                            this.update(&mut cx, |this, _cx| {
1409                                // Record the fact that the buffer is no longer loading.
1410                                this.loading_buffers.remove(&project_path);
1411                            })
1412                            .ok();
1413                            load_result.map_err(Arc::new)
1414                        })
1415                        .shared(),
1416                    )
1417                    .clone()
1418            }
1419        };
1420
1421        cx.background_executor()
1422            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1423    }
1424
1425    pub fn open_unstaged_changes(
1426        &mut self,
1427        buffer: Entity<Buffer>,
1428        cx: &mut Context<Self>,
1429    ) -> Task<Result<Entity<BufferChangeSet>>> {
1430        let buffer_id = buffer.read(cx).remote_id();
1431        if let Some(change_set) = self.get_unstaged_changes(buffer_id, cx) {
1432            return Task::ready(Ok(change_set));
1433        }
1434
1435        let task = match self
1436            .loading_change_sets
1437            .entry((buffer_id, ChangeSetKind::Unstaged))
1438        {
1439            hash_map::Entry::Occupied(e) => e.get().clone(),
1440            hash_map::Entry::Vacant(entry) => {
1441                let staged_text = match &self.state {
1442                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
1443                    BufferStoreState::Remote(this) => this.open_unstaged_changes(buffer_id, cx),
1444                };
1445
1446                entry
1447                    .insert(
1448                        cx.spawn(move |this, cx| async move {
1449                            Self::open_change_set_internal(
1450                                this,
1451                                ChangeSetKind::Unstaged,
1452                                staged_text.await.map(DiffBasesChange::SetIndex),
1453                                buffer,
1454                                cx,
1455                            )
1456                            .await
1457                            .map_err(Arc::new)
1458                        })
1459                        .shared(),
1460                    )
1461                    .clone()
1462            }
1463        };
1464
1465        cx.background_executor()
1466            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1467    }
1468
1469    pub fn open_uncommitted_changes(
1470        &mut self,
1471        buffer: Entity<Buffer>,
1472        cx: &mut Context<Self>,
1473    ) -> Task<Result<Entity<BufferChangeSet>>> {
1474        let buffer_id = buffer.read(cx).remote_id();
1475        if let Some(change_set) = self.get_uncommitted_changes(buffer_id, cx) {
1476            return Task::ready(Ok(change_set));
1477        }
1478
1479        let task = match self
1480            .loading_change_sets
1481            .entry((buffer_id, ChangeSetKind::Uncommitted))
1482        {
1483            hash_map::Entry::Occupied(e) => e.get().clone(),
1484            hash_map::Entry::Vacant(entry) => {
1485                let changes = match &self.state {
1486                    BufferStoreState::Local(this) => {
1487                        let committed_text = this.load_committed_text(&buffer, cx);
1488                        let staged_text = this.load_staged_text(&buffer, cx);
1489                        cx.background_executor().spawn(async move {
1490                            let committed_text = committed_text.await?;
1491                            let staged_text = staged_text.await?;
1492                            let diff_bases_change = if committed_text == staged_text {
1493                                DiffBasesChange::SetBoth(committed_text)
1494                            } else {
1495                                DiffBasesChange::SetEach {
1496                                    index: staged_text,
1497                                    head: committed_text,
1498                                }
1499                            };
1500                            Ok(diff_bases_change)
1501                        })
1502                    }
1503                    BufferStoreState::Remote(this) => this.open_uncommitted_changes(buffer_id, cx),
1504                };
1505
1506                entry
1507                    .insert(
1508                        cx.spawn(move |this, cx| async move {
1509                            Self::open_change_set_internal(
1510                                this,
1511                                ChangeSetKind::Uncommitted,
1512                                changes.await,
1513                                buffer,
1514                                cx,
1515                            )
1516                            .await
1517                            .map_err(Arc::new)
1518                        })
1519                        .shared(),
1520                    )
1521                    .clone()
1522            }
1523        };
1524
1525        cx.background_executor()
1526            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1527    }
1528
1529    #[cfg(any(test, feature = "test-support"))]
1530    pub fn set_unstaged_change_set(
1531        &mut self,
1532        buffer_id: BufferId,
1533        change_set: Entity<BufferChangeSet>,
1534    ) {
1535        self.loading_change_sets.insert(
1536            (buffer_id, ChangeSetKind::Unstaged),
1537            Task::ready(Ok(change_set)).shared(),
1538        );
1539    }
1540
1541    async fn open_change_set_internal(
1542        this: WeakEntity<Self>,
1543        kind: ChangeSetKind,
1544        texts: Result<DiffBasesChange>,
1545        buffer: Entity<Buffer>,
1546        mut cx: AsyncApp,
1547    ) -> Result<Entity<BufferChangeSet>> {
1548        let diff_bases_change = match texts {
1549            Err(e) => {
1550                this.update(&mut cx, |this, cx| {
1551                    let buffer_id = buffer.read(cx).remote_id();
1552                    this.loading_change_sets.remove(&(buffer_id, kind));
1553                })?;
1554                return Err(e);
1555            }
1556            Ok(change) => change,
1557        };
1558
1559        this.update(&mut cx, |this, cx| {
1560            let buffer_id = buffer.read(cx).remote_id();
1561            this.loading_change_sets.remove(&(buffer_id, kind));
1562
1563            if let Some(OpenBuffer::Complete {
1564                change_set_state, ..
1565            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1566            {
1567                change_set_state.update(cx, |change_set_state, cx| {
1568                    let buffer_id = buffer.read(cx).remote_id();
1569                    change_set_state.buffer_subscription.get_or_insert_with(|| {
1570                        cx.subscribe(&buffer, |this, buffer, event, cx| match event {
1571                            BufferEvent::LanguageChanged => {
1572                                this.buffer_language_changed(buffer, cx)
1573                            }
1574                            _ => {}
1575                        })
1576                    });
1577
1578                    let change_set = cx.new(|cx| BufferChangeSet {
1579                        buffer_id,
1580                        base_text: None,
1581                        diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
1582                        staged_text: None,
1583                        staged_diff: None,
1584                    });
1585                    match kind {
1586                        ChangeSetKind::Unstaged => {
1587                            change_set_state.unstaged_changes = Some(change_set.downgrade())
1588                        }
1589                        ChangeSetKind::Uncommitted => {
1590                            change_set_state.uncommitted_changes = Some(change_set.downgrade())
1591                        }
1592                    };
1593
1594                    let buffer = buffer.read(cx).text_snapshot();
1595                    let rx = change_set_state.diff_bases_changed(buffer, diff_bases_change, cx);
1596
1597                    Ok(async move {
1598                        rx.await.ok();
1599                        Ok(change_set)
1600                    })
1601                })
1602            } else {
1603                Err(anyhow!("buffer was closed"))
1604            }
1605        })??
1606        .await
1607    }
1608
1609    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1610        match &self.state {
1611            BufferStoreState::Local(this) => this.create_buffer(cx),
1612            BufferStoreState::Remote(this) => this.create_buffer(cx),
1613        }
1614    }
1615
1616    pub fn save_buffer(
1617        &mut self,
1618        buffer: Entity<Buffer>,
1619        cx: &mut Context<Self>,
1620    ) -> Task<Result<()>> {
1621        match &mut self.state {
1622            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1623            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1624        }
1625    }
1626
1627    pub fn save_buffer_as(
1628        &mut self,
1629        buffer: Entity<Buffer>,
1630        path: ProjectPath,
1631        cx: &mut Context<Self>,
1632    ) -> Task<Result<()>> {
1633        let old_file = buffer.read(cx).file().cloned();
1634        let task = match &self.state {
1635            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1636            BufferStoreState::Remote(this) => {
1637                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1638            }
1639        };
1640        cx.spawn(|this, mut cx| async move {
1641            task.await?;
1642            this.update(&mut cx, |_, cx| {
1643                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1644            })
1645        })
1646    }
1647
1648    pub fn blame_buffer(
1649        &self,
1650        buffer: &Entity<Buffer>,
1651        version: Option<clock::Global>,
1652        cx: &App,
1653    ) -> Task<Result<Option<Blame>>> {
1654        let buffer = buffer.read(cx);
1655        let Some(file) = File::from_dyn(buffer.file()) else {
1656            return Task::ready(Err(anyhow!("buffer has no file")));
1657        };
1658
1659        match file.worktree.clone().read(cx) {
1660            Worktree::Local(worktree) => {
1661                let worktree = worktree.snapshot();
1662                let blame_params = maybe!({
1663                    let local_repo = match worktree.local_repo_for_path(&file.path) {
1664                        Some(repo_for_path) => repo_for_path,
1665                        None => return Ok(None),
1666                    };
1667
1668                    let relative_path = local_repo
1669                        .relativize(&file.path)
1670                        .context("failed to relativize buffer path")?;
1671
1672                    let repo = local_repo.repo().clone();
1673
1674                    let content = match version {
1675                        Some(version) => buffer.rope_for_version(&version).clone(),
1676                        None => buffer.as_rope().clone(),
1677                    };
1678
1679                    anyhow::Ok(Some((repo, relative_path, content)))
1680                });
1681
1682                cx.background_executor().spawn(async move {
1683                    let Some((repo, relative_path, content)) = blame_params? else {
1684                        return Ok(None);
1685                    };
1686                    repo.blame(&relative_path, content)
1687                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1688                        .map(Some)
1689                })
1690            }
1691            Worktree::Remote(worktree) => {
1692                let buffer_id = buffer.remote_id();
1693                let version = buffer.version();
1694                let project_id = worktree.project_id();
1695                let client = worktree.client();
1696                cx.spawn(|_| async move {
1697                    let response = client
1698                        .request(proto::BlameBuffer {
1699                            project_id,
1700                            buffer_id: buffer_id.into(),
1701                            version: serialize_version(&version),
1702                        })
1703                        .await?;
1704                    Ok(deserialize_blame_buffer_response(response))
1705                })
1706            }
1707        }
1708    }
1709
1710    pub fn get_permalink_to_line(
1711        &self,
1712        buffer: &Entity<Buffer>,
1713        selection: Range<u32>,
1714        cx: &App,
1715    ) -> Task<Result<url::Url>> {
1716        let buffer = buffer.read(cx);
1717        let Some(file) = File::from_dyn(buffer.file()) else {
1718            return Task::ready(Err(anyhow!("buffer has no file")));
1719        };
1720
1721        match file.worktree.read(cx) {
1722            Worktree::Local(worktree) => {
1723                let worktree_path = worktree.abs_path().clone();
1724                let Some((repo_entry, repo)) =
1725                    worktree.repository_for_path(file.path()).and_then(|entry| {
1726                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
1727                        Some((entry, repo))
1728                    })
1729                else {
1730                    // If we're not in a Git repo, check whether this is a Rust source
1731                    // file in the Cargo registry (presumably opened with go-to-definition
1732                    // from a normal Rust file). If so, we can put together a permalink
1733                    // using crate metadata.
1734                    if !buffer
1735                        .language()
1736                        .is_some_and(|lang| lang.name() == "Rust".into())
1737                    {
1738                        return Task::ready(Err(anyhow!("no permalink available")));
1739                    }
1740                    let file_path = worktree_path.join(file.path());
1741                    return cx.spawn(|cx| async move {
1742                        let provider_registry =
1743                            cx.update(GitHostingProviderRegistry::default_global)?;
1744                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
1745                            .map_err(|_| anyhow!("no permalink available"))
1746                    });
1747                };
1748
1749                let path = match repo_entry.relativize(file.path()) {
1750                    Ok(RepoPath(path)) => path,
1751                    Err(e) => return Task::ready(Err(e)),
1752                };
1753
1754                cx.spawn(|cx| async move {
1755                    const REMOTE_NAME: &str = "origin";
1756                    let origin_url = repo
1757                        .remote_url(REMOTE_NAME)
1758                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;
1759
1760                    let sha = repo
1761                        .head_sha()
1762                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
1763
1764                    let provider_registry =
1765                        cx.update(GitHostingProviderRegistry::default_global)?;
1766
1767                    let (provider, remote) =
1768                        parse_git_remote_url(provider_registry, &origin_url)
1769                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
1770
1771                    let path = path
1772                        .to_str()
1773                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;
1774
1775                    Ok(provider.build_permalink(
1776                        remote,
1777                        BuildPermalinkParams {
1778                            sha: &sha,
1779                            path,
1780                            selection: Some(selection),
1781                        },
1782                    ))
1783                })
1784            }
1785            Worktree::Remote(worktree) => {
1786                let buffer_id = buffer.remote_id();
1787                let project_id = worktree.project_id();
1788                let client = worktree.client();
1789                cx.spawn(|_| async move {
1790                    let response = client
1791                        .request(proto::GetPermalinkToLine {
1792                            project_id,
1793                            buffer_id: buffer_id.into(),
1794                            selection: Some(proto::Range {
1795                                start: selection.start as u64,
1796                                end: selection.end as u64,
1797                            }),
1798                        })
1799                        .await?;
1800
1801                    url::Url::parse(&response.permalink).context("failed to parse permalink")
1802                })
1803            }
1804        }
1805    }
1806
1807    fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1808        let remote_id = buffer.read(cx).remote_id();
1809        let is_remote = buffer.read(cx).replica_id() != 0;
1810        let open_buffer = OpenBuffer::Complete {
1811            buffer: buffer.downgrade(),
1812            change_set_state: cx.new(|_| BufferChangeSetState::default()),
1813        };
1814
1815        let handle = cx.entity().downgrade();
1816        buffer.update(cx, move |_, cx| {
1817            cx.on_release(move |buffer, cx| {
1818                handle
1819                    .update(cx, |_, cx| {
1820                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1821                    })
1822                    .ok();
1823            })
1824            .detach()
1825        });
1826
1827        match self.opened_buffers.entry(remote_id) {
1828            hash_map::Entry::Vacant(entry) => {
1829                entry.insert(open_buffer);
1830            }
1831            hash_map::Entry::Occupied(mut entry) => {
1832                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1833                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1834                } else if entry.get().upgrade().is_some() {
1835                    if is_remote {
1836                        return Ok(());
1837                    } else {
1838                        debug_panic!("buffer {} was already registered", remote_id);
1839                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1840                    }
1841                }
1842                entry.insert(open_buffer);
1843            }
1844        }
1845
1846        cx.subscribe(&buffer, Self::on_buffer_event).detach();
1847        cx.emit(BufferStoreEvent::BufferAdded(buffer));
1848        Ok(())
1849    }
1850
1851    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1852        self.opened_buffers
1853            .values()
1854            .filter_map(|buffer| buffer.upgrade())
1855    }
1856
1857    pub fn loading_buffers(
1858        &self,
1859    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1860        self.loading_buffers.iter().map(|(path, task)| {
1861            let task = task.clone();
1862            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1863        })
1864    }
1865
1866    pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1867        self.buffers().find_map(|buffer| {
1868            let file = File::from_dyn(buffer.read(cx).file())?;
1869            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1870                Some(buffer)
1871            } else {
1872                None
1873            }
1874        })
1875    }
1876
1877    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1878        self.opened_buffers.get(&buffer_id)?.upgrade()
1879    }
1880
1881    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1882        self.get(buffer_id)
1883            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1884    }
1885
1886    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1887        self.get(buffer_id).or_else(|| {
1888            self.as_remote()
1889                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1890        })
1891    }
1892
1893    pub fn get_unstaged_changes(
1894        &self,
1895        buffer_id: BufferId,
1896        cx: &App,
1897    ) -> Option<Entity<BufferChangeSet>> {
1898        if let OpenBuffer::Complete {
1899            change_set_state, ..
1900        } = self.opened_buffers.get(&buffer_id)?
1901        {
1902            change_set_state
1903                .read(cx)
1904                .unstaged_changes
1905                .as_ref()?
1906                .upgrade()
1907        } else {
1908            None
1909        }
1910    }
1911
1912    pub fn get_uncommitted_changes(
1913        &self,
1914        buffer_id: BufferId,
1915        cx: &App,
1916    ) -> Option<Entity<BufferChangeSet>> {
1917        if let OpenBuffer::Complete {
1918            change_set_state, ..
1919        } = self.opened_buffers.get(&buffer_id)?
1920        {
1921            change_set_state
1922                .read(cx)
1923                .uncommitted_changes
1924                .as_ref()?
1925                .upgrade()
1926        } else {
1927            None
1928        }
1929    }
1930
1931    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1932        let buffers = self
1933            .buffers()
1934            .map(|buffer| {
1935                let buffer = buffer.read(cx);
1936                proto::BufferVersion {
1937                    id: buffer.remote_id().into(),
1938                    version: language::proto::serialize_version(&buffer.version),
1939                }
1940            })
1941            .collect();
1942        let incomplete_buffer_ids = self
1943            .as_remote()
1944            .map(|remote| remote.incomplete_buffer_ids())
1945            .unwrap_or_default();
1946        (buffers, incomplete_buffer_ids)
1947    }
1948
1949    pub fn disconnected_from_host(&mut self, cx: &mut App) {
1950        for open_buffer in self.opened_buffers.values_mut() {
1951            if let Some(buffer) = open_buffer.upgrade() {
1952                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1953            }
1954        }
1955
1956        for buffer in self.buffers() {
1957            buffer.update(cx, |buffer, cx| {
1958                buffer.set_capability(Capability::ReadOnly, cx)
1959            });
1960        }
1961
1962        if let Some(remote) = self.as_remote_mut() {
1963            // Wake up all futures currently waiting on a buffer to get opened,
1964            // to give them a chance to fail now that we've disconnected.
1965            remote.remote_buffer_listeners.clear()
1966        }
1967    }
1968
1969    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1970        self.downstream_client = Some((downstream_client, remote_id));
1971    }
1972
1973    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1974        self.downstream_client.take();
1975        self.forget_shared_buffers();
1976    }
1977
1978    pub fn discard_incomplete(&mut self) {
1979        self.opened_buffers
1980            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1981    }
1982
1983    pub fn find_search_candidates(
1984        &mut self,
1985        query: &SearchQuery,
1986        mut limit: usize,
1987        fs: Arc<dyn Fs>,
1988        cx: &mut Context<Self>,
1989    ) -> Receiver<Entity<Buffer>> {
1990        let (tx, rx) = smol::channel::unbounded();
1991        let mut open_buffers = HashSet::default();
1992        let mut unnamed_buffers = Vec::new();
1993        for handle in self.buffers() {
1994            let buffer = handle.read(cx);
1995            if let Some(entry_id) = buffer.entry_id(cx) {
1996                open_buffers.insert(entry_id);
1997            } else {
1998                limit = limit.saturating_sub(1);
1999                unnamed_buffers.push(handle)
2000            };
2001        }
2002
2003        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
2004        let project_paths_rx = self
2005            .worktree_store
2006            .update(cx, |worktree_store, cx| {
2007                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
2008            })
2009            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
2010
2011        cx.spawn(|this, mut cx| async move {
2012            for buffer in unnamed_buffers {
2013                tx.send(buffer).await.ok();
2014            }
2015
2016            let mut project_paths_rx = pin!(project_paths_rx);
2017            while let Some(project_paths) = project_paths_rx.next().await {
2018                let buffers = this.update(&mut cx, |this, cx| {
2019                    project_paths
2020                        .into_iter()
2021                        .map(|project_path| this.open_buffer(project_path, cx))
2022                        .collect::<Vec<_>>()
2023                })?;
2024                for buffer_task in buffers {
2025                    if let Some(buffer) = buffer_task.await.log_err() {
2026                        if tx.send(buffer).await.is_err() {
2027                            return anyhow::Ok(());
2028                        }
2029                    }
2030                }
2031            }
2032            anyhow::Ok(())
2033        })
2034        .detach();
2035        rx
2036    }
2037
2038    pub fn recalculate_buffer_diffs(
2039        &mut self,
2040        buffers: Vec<Entity<Buffer>>,
2041        cx: &mut Context<Self>,
2042    ) -> impl Future<Output = ()> {
2043        let mut futures = Vec::new();
2044        for buffer in buffers {
2045            if let Some(OpenBuffer::Complete {
2046                change_set_state, ..
2047            }) = self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
2048            {
2049                let buffer = buffer.read(cx).text_snapshot();
2050                futures.push(change_set_state.update(cx, |change_set_state, cx| {
2051                    change_set_state.recalculate_diffs(buffer, cx)
2052                }));
2053            }
2054        }
2055        async move {
2056            futures::future::join_all(futures).await;
2057        }
2058    }
2059
2060    fn on_buffer_event(
2061        &mut self,
2062        buffer: Entity<Buffer>,
2063        event: &BufferEvent,
2064        cx: &mut Context<Self>,
2065    ) {
2066        match event {
2067            BufferEvent::FileHandleChanged => {
2068                if let Some(local) = self.as_local_mut() {
2069                    local.buffer_changed_file(buffer, cx);
2070                }
2071            }
2072            BufferEvent::Reloaded => {
2073                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
2074                    return;
2075                };
2076                let buffer = buffer.read(cx);
2077                downstream_client
2078                    .send(proto::BufferReloaded {
2079                        project_id: *project_id,
2080                        buffer_id: buffer.remote_id().to_proto(),
2081                        version: serialize_version(&buffer.version()),
2082                        mtime: buffer.saved_mtime().map(|t| t.into()),
2083                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
2084                    })
2085                    .log_err();
2086            }
2087            _ => {}
2088        }
2089    }
2090
2091    pub async fn handle_update_buffer(
2092        this: Entity<Self>,
2093        envelope: TypedEnvelope<proto::UpdateBuffer>,
2094        mut cx: AsyncApp,
2095    ) -> Result<proto::Ack> {
2096        let payload = envelope.payload.clone();
2097        let buffer_id = BufferId::new(payload.buffer_id)?;
2098        let ops = payload
2099            .operations
2100            .into_iter()
2101            .map(language::proto::deserialize_operation)
2102            .collect::<Result<Vec<_>, _>>()?;
2103        this.update(&mut cx, |this, cx| {
2104            match this.opened_buffers.entry(buffer_id) {
2105                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2106                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2107                    OpenBuffer::Complete { buffer, .. } => {
2108                        if let Some(buffer) = buffer.upgrade() {
2109                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2110                        }
2111                    }
2112                },
2113                hash_map::Entry::Vacant(e) => {
2114                    e.insert(OpenBuffer::Operations(ops));
2115                }
2116            }
2117            Ok(proto::Ack {})
2118        })?
2119    }
2120
2121    pub fn register_shared_lsp_handle(
2122        &mut self,
2123        peer_id: proto::PeerId,
2124        buffer_id: BufferId,
2125        handle: OpenLspBufferHandle,
2126    ) {
2127        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2128            if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2129                buffer.lsp_handle = Some(handle);
2130                return;
2131            }
2132        }
2133        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2134    }
2135
2136    pub fn handle_synchronize_buffers(
2137        &mut self,
2138        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
2139        cx: &mut Context<Self>,
2140        client: Arc<Client>,
2141    ) -> Result<proto::SynchronizeBuffersResponse> {
2142        let project_id = envelope.payload.project_id;
2143        let mut response = proto::SynchronizeBuffersResponse {
2144            buffers: Default::default(),
2145        };
2146        let Some(guest_id) = envelope.original_sender_id else {
2147            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
2148        };
2149
2150        self.shared_buffers.entry(guest_id).or_default().clear();
2151        for buffer in envelope.payload.buffers {
2152            let buffer_id = BufferId::new(buffer.id)?;
2153            let remote_version = language::proto::deserialize_version(&buffer.version);
2154            if let Some(buffer) = self.get(buffer_id) {
2155                self.shared_buffers
2156                    .entry(guest_id)
2157                    .or_default()
2158                    .entry(buffer_id)
2159                    .or_insert_with(|| SharedBuffer {
2160                        buffer: buffer.clone(),
2161                        change_set: None,
2162                        lsp_handle: None,
2163                    });
2164
2165                let buffer = buffer.read(cx);
2166                response.buffers.push(proto::BufferVersion {
2167                    id: buffer_id.into(),
2168                    version: language::proto::serialize_version(&buffer.version),
2169                });
2170
2171                let operations = buffer.serialize_ops(Some(remote_version), cx);
2172                let client = client.clone();
2173                if let Some(file) = buffer.file() {
2174                    client
2175                        .send(proto::UpdateBufferFile {
2176                            project_id,
2177                            buffer_id: buffer_id.into(),
2178                            file: Some(file.to_proto(cx)),
2179                        })
2180                        .log_err();
2181                }
2182
2183                // TODO(max): do something
2184                // client
2185                //     .send(proto::UpdateStagedText {
2186                //         project_id,
2187                //         buffer_id: buffer_id.into(),
2188                //         diff_base: buffer.diff_base().map(ToString::to_string),
2189                //     })
2190                //     .log_err();
2191
2192                client
2193                    .send(proto::BufferReloaded {
2194                        project_id,
2195                        buffer_id: buffer_id.into(),
2196                        version: language::proto::serialize_version(buffer.saved_version()),
2197                        mtime: buffer.saved_mtime().map(|time| time.into()),
2198                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
2199                            as i32,
2200                    })
2201                    .log_err();
2202
2203                cx.background_executor()
2204                    .spawn(
2205                        async move {
2206                            let operations = operations.await;
2207                            for chunk in split_operations(operations) {
2208                                client
2209                                    .request(proto::UpdateBuffer {
2210                                        project_id,
2211                                        buffer_id: buffer_id.into(),
2212                                        operations: chunk,
2213                                    })
2214                                    .await?;
2215                            }
2216                            anyhow::Ok(())
2217                        }
2218                        .log_err(),
2219                    )
2220                    .detach();
2221            }
2222        }
2223        Ok(response)
2224    }
2225
2226    pub fn handle_create_buffer_for_peer(
2227        &mut self,
2228        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2229        replica_id: u16,
2230        capability: Capability,
2231        cx: &mut Context<Self>,
2232    ) -> Result<()> {
2233        let Some(remote) = self.as_remote_mut() else {
2234            return Err(anyhow!("buffer store is not a remote"));
2235        };
2236
2237        if let Some(buffer) =
2238            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2239        {
2240            self.add_buffer(buffer, cx)?;
2241        }
2242
2243        Ok(())
2244    }
2245
2246    pub async fn handle_update_buffer_file(
2247        this: Entity<Self>,
2248        envelope: TypedEnvelope<proto::UpdateBufferFile>,
2249        mut cx: AsyncApp,
2250    ) -> Result<()> {
2251        let buffer_id = envelope.payload.buffer_id;
2252        let buffer_id = BufferId::new(buffer_id)?;
2253
2254        this.update(&mut cx, |this, cx| {
2255            let payload = envelope.payload.clone();
2256            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2257                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
2258                let worktree = this
2259                    .worktree_store
2260                    .read(cx)
2261                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
2262                    .ok_or_else(|| anyhow!("no such worktree"))?;
2263                let file = File::from_proto(file, worktree, cx)?;
2264                let old_file = buffer.update(cx, |buffer, cx| {
2265                    let old_file = buffer.file().cloned();
2266                    let new_path = file.path.clone();
2267                    buffer.file_updated(Arc::new(file), cx);
2268                    if old_file
2269                        .as_ref()
2270                        .map_or(true, |old| *old.path() != new_path)
2271                    {
2272                        Some(old_file)
2273                    } else {
2274                        None
2275                    }
2276                });
2277                if let Some(old_file) = old_file {
2278                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
2279                }
2280            }
2281            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2282                downstream_client
2283                    .send(proto::UpdateBufferFile {
2284                        project_id: *project_id,
2285                        buffer_id: buffer_id.into(),
2286                        file: envelope.payload.file,
2287                    })
2288                    .log_err();
2289            }
2290            Ok(())
2291        })?
2292    }
2293
2294    pub async fn handle_save_buffer(
2295        this: Entity<Self>,
2296        envelope: TypedEnvelope<proto::SaveBuffer>,
2297        mut cx: AsyncApp,
2298    ) -> Result<proto::BufferSaved> {
2299        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2300        let (buffer, project_id) = this.update(&mut cx, |this, _| {
2301            anyhow::Ok((
2302                this.get_existing(buffer_id)?,
2303                this.downstream_client
2304                    .as_ref()
2305                    .map(|(_, project_id)| *project_id)
2306                    .context("project is not shared")?,
2307            ))
2308        })??;
2309        buffer
2310            .update(&mut cx, |buffer, _| {
2311                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
2312            })?
2313            .await?;
2314        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
2315
2316        if let Some(new_path) = envelope.payload.new_path {
2317            let new_path = ProjectPath::from_proto(new_path);
2318            this.update(&mut cx, |this, cx| {
2319                this.save_buffer_as(buffer.clone(), new_path, cx)
2320            })?
2321            .await?;
2322        } else {
2323            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
2324                .await?;
2325        }
2326
2327        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
2328            project_id,
2329            buffer_id: buffer_id.into(),
2330            version: serialize_version(buffer.saved_version()),
2331            mtime: buffer.saved_mtime().map(|time| time.into()),
2332        })
2333    }
2334
2335    pub async fn handle_close_buffer(
2336        this: Entity<Self>,
2337        envelope: TypedEnvelope<proto::CloseBuffer>,
2338        mut cx: AsyncApp,
2339    ) -> Result<()> {
2340        let peer_id = envelope.sender_id;
2341        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2342        this.update(&mut cx, |this, _| {
2343            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2344                if shared.remove(&buffer_id).is_some() {
2345                    if shared.is_empty() {
2346                        this.shared_buffers.remove(&peer_id);
2347                    }
2348                    return;
2349                }
2350            }
2351            debug_panic!(
2352                "peer_id {} closed buffer_id {} which was either not open or already closed",
2353                peer_id,
2354                buffer_id
2355            )
2356        })
2357    }
2358
2359    pub async fn handle_buffer_saved(
2360        this: Entity<Self>,
2361        envelope: TypedEnvelope<proto::BufferSaved>,
2362        mut cx: AsyncApp,
2363    ) -> Result<()> {
2364        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2365        let version = deserialize_version(&envelope.payload.version);
2366        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2367        this.update(&mut cx, move |this, cx| {
2368            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2369                buffer.update(cx, |buffer, cx| {
2370                    buffer.did_save(version, mtime, cx);
2371                });
2372            }
2373
2374            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2375                downstream_client
2376                    .send(proto::BufferSaved {
2377                        project_id: *project_id,
2378                        buffer_id: buffer_id.into(),
2379                        mtime: envelope.payload.mtime,
2380                        version: envelope.payload.version,
2381                    })
2382                    .log_err();
2383            }
2384        })
2385    }
2386
2387    pub async fn handle_buffer_reloaded(
2388        this: Entity<Self>,
2389        envelope: TypedEnvelope<proto::BufferReloaded>,
2390        mut cx: AsyncApp,
2391    ) -> Result<()> {
2392        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2393        let version = deserialize_version(&envelope.payload.version);
2394        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2395        let line_ending = deserialize_line_ending(
2396            proto::LineEnding::from_i32(envelope.payload.line_ending)
2397                .ok_or_else(|| anyhow!("missing line ending"))?,
2398        );
2399        this.update(&mut cx, |this, cx| {
2400            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2401                buffer.update(cx, |buffer, cx| {
2402                    buffer.did_reload(version, line_ending, mtime, cx);
2403                });
2404            }
2405
2406            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2407                downstream_client
2408                    .send(proto::BufferReloaded {
2409                        project_id: *project_id,
2410                        buffer_id: buffer_id.into(),
2411                        mtime: envelope.payload.mtime,
2412                        version: envelope.payload.version,
2413                        line_ending: envelope.payload.line_ending,
2414                    })
2415                    .log_err();
2416            }
2417        })
2418    }
2419
2420    pub async fn handle_blame_buffer(
2421        this: Entity<Self>,
2422        envelope: TypedEnvelope<proto::BlameBuffer>,
2423        mut cx: AsyncApp,
2424    ) -> Result<proto::BlameBufferResponse> {
2425        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2426        let version = deserialize_version(&envelope.payload.version);
2427        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2428        buffer
2429            .update(&mut cx, |buffer, _| {
2430                buffer.wait_for_version(version.clone())
2431            })?
2432            .await?;
2433        let blame = this
2434            .update(&mut cx, |this, cx| {
2435                this.blame_buffer(&buffer, Some(version), cx)
2436            })?
2437            .await?;
2438        Ok(serialize_blame_buffer_response(blame))
2439    }
2440
2441    pub async fn handle_get_permalink_to_line(
2442        this: Entity<Self>,
2443        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2444        mut cx: AsyncApp,
2445    ) -> Result<proto::GetPermalinkToLineResponse> {
2446        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2447        // let version = deserialize_version(&envelope.payload.version);
2448        let selection = {
2449            let proto_selection = envelope
2450                .payload
2451                .selection
2452                .context("no selection to get permalink for defined")?;
2453            proto_selection.start as u32..proto_selection.end as u32
2454        };
2455        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2456        let permalink = this
2457            .update(&mut cx, |this, cx| {
2458                this.get_permalink_to_line(&buffer, selection, cx)
2459            })?
2460            .await?;
2461        Ok(proto::GetPermalinkToLineResponse {
2462            permalink: permalink.to_string(),
2463        })
2464    }
2465
2466    pub async fn handle_open_unstaged_changes(
2467        this: Entity<Self>,
2468        request: TypedEnvelope<proto::OpenUnstagedChanges>,
2469        mut cx: AsyncApp,
2470    ) -> Result<proto::OpenUnstagedChangesResponse> {
2471        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2472        let change_set = this
2473            .update(&mut cx, |this, cx| {
2474                let buffer = this.get(buffer_id)?;
2475                Some(this.open_unstaged_changes(buffer, cx))
2476            })?
2477            .ok_or_else(|| anyhow!("no such buffer"))?
2478            .await?;
2479        this.update(&mut cx, |this, _| {
2480            let shared_buffers = this
2481                .shared_buffers
2482                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2483                .or_default();
2484            debug_assert!(shared_buffers.contains_key(&buffer_id));
2485            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2486                shared.change_set = Some(change_set.clone());
2487            }
2488        })?;
2489        let staged_text = change_set.read_with(&cx, |change_set, _| {
2490            change_set.base_text.as_ref().map(|buffer| buffer.text())
2491        })?;
2492        Ok(proto::OpenUnstagedChangesResponse { staged_text })
2493    }
2494
2495    pub async fn handle_open_uncommitted_changes(
2496        this: Entity<Self>,
2497        request: TypedEnvelope<proto::OpenUncommittedChanges>,
2498        mut cx: AsyncApp,
2499    ) -> Result<proto::OpenUncommittedChangesResponse> {
2500        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2501        let change_set = this
2502            .update(&mut cx, |this, cx| {
2503                let buffer = this.get(buffer_id)?;
2504                Some(this.open_uncommitted_changes(buffer, cx))
2505            })?
2506            .ok_or_else(|| anyhow!("no such buffer"))?
2507            .await?;
2508        this.update(&mut cx, |this, _| {
2509            let shared_buffers = this
2510                .shared_buffers
2511                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2512                .or_default();
2513            debug_assert!(shared_buffers.contains_key(&buffer_id));
2514            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2515                shared.change_set = Some(change_set.clone());
2516            }
2517        })?;
2518        change_set.read_with(&cx, |change_set, _| {
2519            use proto::open_uncommitted_changes_response::Mode;
2520
2521            let mode;
2522            let staged_text;
2523            let committed_text;
2524            if let Some(committed_buffer) = &change_set.base_text {
2525                committed_text = Some(committed_buffer.text());
2526                if let Some(staged_buffer) = &change_set.staged_text {
2527                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
2528                        mode = Mode::IndexMatchesHead;
2529                        staged_text = None;
2530                    } else {
2531                        mode = Mode::IndexAndHead;
2532                        staged_text = Some(staged_buffer.text());
2533                    }
2534                } else {
2535                    mode = Mode::IndexAndHead;
2536                    staged_text = None;
2537                }
2538            } else {
2539                mode = Mode::IndexAndHead;
2540                committed_text = None;
2541                staged_text = change_set.staged_text.as_ref().map(|buffer| buffer.text());
2542            }
2543
2544            proto::OpenUncommittedChangesResponse {
2545                committed_text,
2546                staged_text,
2547                mode: mode.into(),
2548            }
2549        })
2550    }
2551
2552    pub async fn handle_update_diff_bases(
2553        this: Entity<Self>,
2554        request: TypedEnvelope<proto::UpdateDiffBases>,
2555        mut cx: AsyncApp,
2556    ) -> Result<()> {
2557        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2558        this.update(&mut cx, |this, cx| {
2559            if let Some(OpenBuffer::Complete {
2560                change_set_state,
2561                buffer,
2562            }) = this.opened_buffers.get_mut(&buffer_id)
2563            {
2564                if let Some(buffer) = buffer.upgrade() {
2565                    let buffer = buffer.read(cx).text_snapshot();
2566                    change_set_state.update(cx, |change_set_state, cx| {
2567                        change_set_state.handle_base_texts_updated(buffer, request.payload, cx);
2568                    })
2569                }
2570            }
2571        })
2572    }
2573
2574    pub fn reload_buffers(
2575        &self,
2576        buffers: HashSet<Entity<Buffer>>,
2577        push_to_history: bool,
2578        cx: &mut Context<Self>,
2579    ) -> Task<Result<ProjectTransaction>> {
2580        if buffers.is_empty() {
2581            return Task::ready(Ok(ProjectTransaction::default()));
2582        }
2583        match &self.state {
2584            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2585            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2586        }
2587    }
2588
2589    async fn handle_reload_buffers(
2590        this: Entity<Self>,
2591        envelope: TypedEnvelope<proto::ReloadBuffers>,
2592        mut cx: AsyncApp,
2593    ) -> Result<proto::ReloadBuffersResponse> {
2594        let sender_id = envelope.original_sender_id().unwrap_or_default();
2595        let reload = this.update(&mut cx, |this, cx| {
2596            let mut buffers = HashSet::default();
2597            for buffer_id in &envelope.payload.buffer_ids {
2598                let buffer_id = BufferId::new(*buffer_id)?;
2599                buffers.insert(this.get_existing(buffer_id)?);
2600            }
2601            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2602        })??;
2603
2604        let project_transaction = reload.await?;
2605        let project_transaction = this.update(&mut cx, |this, cx| {
2606            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2607        })?;
2608        Ok(proto::ReloadBuffersResponse {
2609            transaction: Some(project_transaction),
2610        })
2611    }
2612
2613    pub fn create_buffer_for_peer(
2614        &mut self,
2615        buffer: &Entity<Buffer>,
2616        peer_id: proto::PeerId,
2617        cx: &mut Context<Self>,
2618    ) -> Task<Result<()>> {
2619        let buffer_id = buffer.read(cx).remote_id();
2620        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
2621        if shared_buffers.contains_key(&buffer_id) {
2622            return Task::ready(Ok(()));
2623        }
2624        shared_buffers.insert(
2625            buffer_id,
2626            SharedBuffer {
2627                buffer: buffer.clone(),
2628                change_set: None,
2629                lsp_handle: None,
2630            },
2631        );
2632
2633        let Some((client, project_id)) = self.downstream_client.clone() else {
2634            return Task::ready(Ok(()));
2635        };
2636
2637        cx.spawn(|this, mut cx| async move {
2638            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
2639                return anyhow::Ok(());
2640            };
2641
2642            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
2643            let operations = operations.await;
2644            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
2645
2646            let initial_state = proto::CreateBufferForPeer {
2647                project_id,
2648                peer_id: Some(peer_id),
2649                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
2650            };
2651
2652            if client.send(initial_state).log_err().is_some() {
2653                let client = client.clone();
2654                cx.background_executor()
2655                    .spawn(async move {
2656                        let mut chunks = split_operations(operations).peekable();
2657                        while let Some(chunk) = chunks.next() {
2658                            let is_last = chunks.peek().is_none();
2659                            client.send(proto::CreateBufferForPeer {
2660                                project_id,
2661                                peer_id: Some(peer_id),
2662                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
2663                                    proto::BufferChunk {
2664                                        buffer_id: buffer_id.into(),
2665                                        operations: chunk,
2666                                        is_last,
2667                                    },
2668                                )),
2669                            })?;
2670                        }
2671                        anyhow::Ok(())
2672                    })
2673                    .await
2674                    .log_err();
2675            }
2676            Ok(())
2677        })
2678    }
2679
    /// Drops the record of every buffer shared with every peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2683
    /// Drops the record of all buffers shared with `peer_id`, if any.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2687
2688    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2689        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2690            self.shared_buffers.insert(new_peer_id, buffers);
2691        }
2692    }
2693
    /// Returns `true` when `shared_buffers` holds an entry for at least one peer.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2697
2698    pub fn create_local_buffer(
2699        &mut self,
2700        text: &str,
2701        language: Option<Arc<Language>>,
2702        cx: &mut Context<Self>,
2703    ) -> Entity<Buffer> {
2704        let buffer = cx.new(|cx| {
2705            Buffer::local(text, cx)
2706                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2707        });
2708
2709        self.add_buffer(buffer.clone(), cx).log_err();
2710        let buffer_id = buffer.read(cx).remote_id();
2711
2712        let this = self
2713            .as_local_mut()
2714            .expect("local-only method called in a non-local context");
2715        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2716            this.local_buffer_ids_by_path.insert(
2717                ProjectPath {
2718                    worktree_id: file.worktree_id(cx),
2719                    path: file.path.clone(),
2720                },
2721                buffer_id,
2722            );
2723
2724            if let Some(entry_id) = file.entry_id {
2725                this.local_buffer_ids_by_entry_id
2726                    .insert(entry_id, buffer_id);
2727            }
2728        }
2729        buffer
2730    }
2731
2732    pub fn deserialize_project_transaction(
2733        &mut self,
2734        message: proto::ProjectTransaction,
2735        push_to_history: bool,
2736        cx: &mut Context<Self>,
2737    ) -> Task<Result<ProjectTransaction>> {
2738        if let Some(this) = self.as_remote_mut() {
2739            this.deserialize_project_transaction(message, push_to_history, cx)
2740        } else {
2741            debug_panic!("not a remote buffer store");
2742            Task::ready(Err(anyhow!("not a remote buffer store")))
2743        }
2744    }
2745
2746    pub fn wait_for_remote_buffer(
2747        &mut self,
2748        id: BufferId,
2749        cx: &mut Context<BufferStore>,
2750    ) -> Task<Result<Entity<Buffer>>> {
2751        if let Some(this) = self.as_remote_mut() {
2752            this.wait_for_remote_buffer(id, cx)
2753        } else {
2754            debug_panic!("not a remote buffer store");
2755            Task::ready(Err(anyhow!("not a remote buffer store")))
2756        }
2757    }
2758
2759    pub fn serialize_project_transaction_for_peer(
2760        &mut self,
2761        project_transaction: ProjectTransaction,
2762        peer_id: proto::PeerId,
2763        cx: &mut Context<Self>,
2764    ) -> proto::ProjectTransaction {
2765        let mut serialized_transaction = proto::ProjectTransaction {
2766            buffer_ids: Default::default(),
2767            transactions: Default::default(),
2768        };
2769        for (buffer, transaction) in project_transaction.0 {
2770            self.create_buffer_for_peer(&buffer, peer_id, cx)
2771                .detach_and_log_err(cx);
2772            serialized_transaction
2773                .buffer_ids
2774                .push(buffer.read(cx).remote_id().into());
2775            serialized_transaction
2776                .transactions
2777                .push(language::proto::serialize_transaction(&transaction));
2778        }
2779        serialized_transaction
2780    }
2781}
2782
// Allows `BufferChangeSet` entities to emit `BufferChangeSetEvent`s.
impl EventEmitter<BufferChangeSetEvent> for BufferChangeSet {}
2784
2785impl BufferChangeSet {
2786    fn set_state(
2787        &mut self,
2788        base_text: Option<language::BufferSnapshot>,
2789        diff: BufferDiff,
2790        buffer: &text::BufferSnapshot,
2791        cx: &mut Context<Self>,
2792    ) {
2793        if let Some(base_text) = base_text.as_ref() {
2794            let changed_range = if Some(base_text.remote_id())
2795                != self.base_text.as_ref().map(|buffer| buffer.remote_id())
2796            {
2797                Some(text::Anchor::MIN..text::Anchor::MAX)
2798            } else {
2799                diff.compare(&self.diff_to_buffer, buffer)
2800            };
2801            if let Some(changed_range) = changed_range {
2802                cx.emit(BufferChangeSetEvent::DiffChanged { changed_range });
2803            }
2804        }
2805        self.base_text = base_text;
2806        self.diff_to_buffer = diff;
2807    }
2808
2809    pub fn diff_hunks_intersecting_range<'a>(
2810        &'a self,
2811        range: Range<text::Anchor>,
2812        buffer_snapshot: &'a text::BufferSnapshot,
2813    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
2814        self.diff_to_buffer
2815            .hunks_intersecting_range(range, buffer_snapshot)
2816    }
2817
2818    pub fn diff_hunks_intersecting_range_rev<'a>(
2819        &'a self,
2820        range: Range<text::Anchor>,
2821        buffer_snapshot: &'a text::BufferSnapshot,
2822    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
2823        self.diff_to_buffer
2824            .hunks_intersecting_range_rev(range, buffer_snapshot)
2825    }
2826
2827    /// Used in cases where the change set isn't derived from git.
2828    pub fn set_base_text(
2829        &mut self,
2830        base_buffer: Entity<language::Buffer>,
2831        buffer: text::BufferSnapshot,
2832        cx: &mut Context<Self>,
2833    ) -> oneshot::Receiver<()> {
2834        let (tx, rx) = oneshot::channel();
2835        let this = cx.weak_entity();
2836        let base_buffer = base_buffer.read(cx).snapshot();
2837        cx.spawn(|_, mut cx| async move {
2838            let diff = cx
2839                .background_executor()
2840                .spawn({
2841                    let base_buffer = base_buffer.clone();
2842                    let buffer = buffer.clone();
2843                    async move { BufferDiff::build(Some(&base_buffer.text()), &buffer) }
2844                })
2845                .await;
2846            let Some(this) = this.upgrade() else {
2847                tx.send(()).ok();
2848                return;
2849            };
2850            this.update(&mut cx, |this, cx| {
2851                this.set_state(Some(base_buffer), diff, &buffer, cx);
2852            })
2853            .log_err();
2854            tx.send(()).ok();
2855        })
2856        .detach();
2857        rx
2858    }
2859
2860    #[cfg(any(test, feature = "test-support"))]
2861    pub fn base_text_string(&self) -> Option<String> {
2862        self.base_text.as_ref().map(|buffer| buffer.text())
2863    }
2864
2865    pub fn new(buffer: &Entity<Buffer>, cx: &mut App) -> Self {
2866        BufferChangeSet {
2867            buffer_id: buffer.read(cx).remote_id(),
2868            base_text: None,
2869            diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
2870            staged_text: None,
2871            staged_diff: None,
2872        }
2873    }
2874
2875    #[cfg(any(test, feature = "test-support"))]
2876    pub fn new_with_base_text(base_text: &str, buffer: &Entity<Buffer>, cx: &mut App) -> Self {
2877        let mut base_text = base_text.to_owned();
2878        text::LineEnding::normalize(&mut base_text);
2879        let diff_to_buffer = BufferDiff::build(Some(&base_text), &buffer.read(cx).text_snapshot());
2880        let base_text = language::Buffer::build_snapshot_sync(base_text.into(), None, None, cx);
2881        BufferChangeSet {
2882            buffer_id: buffer.read(cx).remote_id(),
2883            base_text: Some(base_text),
2884            diff_to_buffer,
2885            staged_text: None,
2886            staged_diff: None,
2887        }
2888    }
2889
2890    #[cfg(any(test, feature = "test-support"))]
2891    pub fn recalculate_diff_sync(
2892        &mut self,
2893        snapshot: text::BufferSnapshot,
2894        cx: &mut Context<Self>,
2895    ) {
2896        let mut base_text = self.base_text.as_ref().map(|buffer| buffer.text());
2897        if let Some(base_text) = base_text.as_mut() {
2898            text::LineEnding::normalize(base_text);
2899        }
2900        let diff_to_buffer = BufferDiff::build(base_text.as_deref(), &snapshot);
2901        self.set_state(self.base_text.clone(), diff_to_buffer, &snapshot, cx);
2902    }
2903}
2904
2905impl OpenBuffer {
2906    fn upgrade(&self) -> Option<Entity<Buffer>> {
2907        match self {
2908            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2909            OpenBuffer::Operations(_) => None,
2910        }
2911    }
2912}
2913
2914fn is_not_found_error(error: &anyhow::Error) -> bool {
2915    error
2916        .root_cause()
2917        .downcast_ref::<io::Error>()
2918        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2919}
2920
2921fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2922    let Some(blame) = blame else {
2923        return proto::BlameBufferResponse {
2924            blame_response: None,
2925        };
2926    };
2927
2928    let entries = blame
2929        .entries
2930        .into_iter()
2931        .map(|entry| proto::BlameEntry {
2932            sha: entry.sha.as_bytes().into(),
2933            start_line: entry.range.start,
2934            end_line: entry.range.end,
2935            original_line_number: entry.original_line_number,
2936            author: entry.author.clone(),
2937            author_mail: entry.author_mail.clone(),
2938            author_time: entry.author_time,
2939            author_tz: entry.author_tz.clone(),
2940            committer: entry.committer.clone(),
2941            committer_mail: entry.committer_mail.clone(),
2942            committer_time: entry.committer_time,
2943            committer_tz: entry.committer_tz.clone(),
2944            summary: entry.summary.clone(),
2945            previous: entry.previous.clone(),
2946            filename: entry.filename.clone(),
2947        })
2948        .collect::<Vec<_>>();
2949
2950    let messages = blame
2951        .messages
2952        .into_iter()
2953        .map(|(oid, message)| proto::CommitMessage {
2954            oid: oid.as_bytes().into(),
2955            message,
2956        })
2957        .collect::<Vec<_>>();
2958
2959    let permalinks = blame
2960        .permalinks
2961        .into_iter()
2962        .map(|(oid, url)| proto::CommitPermalink {
2963            oid: oid.as_bytes().into(),
2964            permalink: url.to_string(),
2965        })
2966        .collect::<Vec<_>>();
2967
2968    proto::BlameBufferResponse {
2969        blame_response: Some(proto::blame_buffer_response::BlameResponse {
2970            entries,
2971            messages,
2972            permalinks,
2973            remote_url: blame.remote_url,
2974        }),
2975    }
2976}
2977
2978fn deserialize_blame_buffer_response(
2979    response: proto::BlameBufferResponse,
2980) -> Option<git::blame::Blame> {
2981    let response = response.blame_response?;
2982    let entries = response
2983        .entries
2984        .into_iter()
2985        .filter_map(|entry| {
2986            Some(git::blame::BlameEntry {
2987                sha: git::Oid::from_bytes(&entry.sha).ok()?,
2988                range: entry.start_line..entry.end_line,
2989                original_line_number: entry.original_line_number,
2990                committer: entry.committer,
2991                committer_time: entry.committer_time,
2992                committer_tz: entry.committer_tz,
2993                committer_mail: entry.committer_mail,
2994                author: entry.author,
2995                author_mail: entry.author_mail,
2996                author_time: entry.author_time,
2997                author_tz: entry.author_tz,
2998                summary: entry.summary,
2999                previous: entry.previous,
3000                filename: entry.filename,
3001            })
3002        })
3003        .collect::<Vec<_>>();
3004
3005    let messages = response
3006        .messages
3007        .into_iter()
3008        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
3009        .collect::<HashMap<_, _>>();
3010
3011    let permalinks = response
3012        .permalinks
3013        .into_iter()
3014        .filter_map(|permalink| {
3015            Some((
3016                git::Oid::from_bytes(&permalink.oid).ok()?,
3017                Url::from_str(&permalink.permalink).ok()?,
3018            ))
3019        })
3020        .collect::<HashMap<_, _>>();
3021
3022    Some(Blame {
3023        entries,
3024        permalinks,
3025        messages,
3026        remote_url: response.remote_url,
3027    })
3028}
3029
3030fn get_permalink_in_rust_registry_src(
3031    provider_registry: Arc<GitHostingProviderRegistry>,
3032    path: PathBuf,
3033    selection: Range<u32>,
3034) -> Result<url::Url> {
3035    #[derive(Deserialize)]
3036    struct CargoVcsGit {
3037        sha1: String,
3038    }
3039
3040    #[derive(Deserialize)]
3041    struct CargoVcsInfo {
3042        git: CargoVcsGit,
3043        path_in_vcs: String,
3044    }
3045
3046    #[derive(Deserialize)]
3047    struct CargoPackage {
3048        repository: String,
3049    }
3050
3051    #[derive(Deserialize)]
3052    struct CargoToml {
3053        package: CargoPackage,
3054    }
3055
3056    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
3057        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
3058        Some((dir, json))
3059    }) else {
3060        bail!("No .cargo_vcs_info.json found in parent directories")
3061    };
3062    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
3063    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
3064    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
3065    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
3066        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
3067    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
3068    let permalink = provider.build_permalink(
3069        remote,
3070        BuildPermalinkParams {
3071            sha: &cargo_vcs_info.git.sha1,
3072            path: &path.to_string_lossy(),
3073            selection: Some(selection),
3074        },
3075    );
3076    Ok(permalink)
3077}