buffer_store.rs

   1use crate::{
   2    lsp_store::OpenLspBufferHandle,
   3    search::SearchQuery,
   4    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   5    ProjectItem as _, ProjectPath,
   6};
   7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
   8use anyhow::{anyhow, bail, Context as _, Result};
   9use client::Client;
  10use collections::{hash_map, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{
  13    channel::oneshot,
  14    future::{OptionFuture, Shared},
  15    Future, FutureExt as _, StreamExt,
  16};
  17use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
  18use gpui::{
  19    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
  20};
  21use http_client::Url;
  22use language::{
  23    proto::{
  24        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  25        split_operations,
  26    },
  27    Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
  28};
  29use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  30use serde::Deserialize;
  31use smol::channel::Receiver;
  32use std::{
  33    io,
  34    ops::Range,
  35    path::{Path, PathBuf},
  36    pin::pin,
  37    str::FromStr as _,
  38    sync::Arc,
  39    time::Instant,
  40};
  41use text::{BufferId, Rope};
  42use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  43use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
  44
  45#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
  46enum ChangeSetKind {
  47    Unstaged,
  48    Uncommitted,
  49}
  50
  51/// A set of open buffers.
  52pub struct BufferStore {
  53    state: BufferStoreState,
  54    #[allow(clippy::type_complexity)]
  55    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
  56    #[allow(clippy::type_complexity)]
  57    loading_change_sets: HashMap<
  58        (BufferId, ChangeSetKind),
  59        Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>,
  60    >,
  61    worktree_store: Entity<WorktreeStore>,
  62    opened_buffers: HashMap<BufferId, OpenBuffer>,
  63    downstream_client: Option<(AnyProtoClient, u64)>,
  64    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
  65}
  66
  67#[derive(Hash, Eq, PartialEq, Clone)]
  68struct SharedBuffer {
  69    buffer: Entity<Buffer>,
  70    change_set: Option<Entity<BufferChangeSet>>,
  71    lsp_handle: Option<OpenLspBufferHandle>,
  72}
  73
  74#[derive(Default)]
  75struct BufferChangeSetState {
  76    unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
  77    uncommitted_changes: Option<WeakEntity<BufferChangeSet>>,
  78    recalculate_diff_task: Option<Task<Result<()>>>,
  79    language: Option<Arc<Language>>,
  80    language_registry: Option<Arc<LanguageRegistry>>,
  81    diff_updated_futures: Vec<oneshot::Sender<()>>,
  82    buffer_subscription: Option<Subscription>,
  83
  84    head_text: Option<Arc<String>>,
  85    index_text: Option<Arc<String>>,
  86    head_changed: bool,
  87    index_changed: bool,
  88}
  89
  90#[derive(Clone, Debug)]
  91enum DiffBasesChange {
  92    SetIndex(Option<String>),
  93    SetHead(Option<String>),
  94    SetEach {
  95        index: Option<String>,
  96        head: Option<String>,
  97    },
  98    SetBoth(Option<String>),
  99}
 100
 101impl BufferChangeSetState {
 102    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
 103        self.language = buffer.read(cx).language().cloned();
 104        self.index_changed = self.index_text.is_some();
 105        self.head_changed = self.head_text.is_some();
 106        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
 107    }
 108
 109    fn unstaged_changes(&self) -> Option<Entity<BufferChangeSet>> {
 110        self.unstaged_changes.as_ref().and_then(|set| set.upgrade())
 111    }
 112
 113    fn uncommitted_changes(&self) -> Option<Entity<BufferChangeSet>> {
 114        self.uncommitted_changes
 115            .as_ref()
 116            .and_then(|set| set.upgrade())
 117    }
 118
 119    fn handle_base_texts_updated(
 120        &mut self,
 121        buffer: text::BufferSnapshot,
 122        message: proto::UpdateDiffBases,
 123        cx: &mut Context<Self>,
 124    ) {
 125        use proto::update_diff_bases::Mode;
 126
 127        let Some(mode) = Mode::from_i32(message.mode) else {
 128            return;
 129        };
 130
 131        let diff_bases_change = match mode {
 132            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
 133            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
 134            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
 135            Mode::IndexAndHead => DiffBasesChange::SetEach {
 136                index: message.staged_text,
 137                head: message.committed_text,
 138            },
 139        };
 140
 141        let _ = self.diff_bases_changed(buffer, diff_bases_change, cx);
 142    }
 143
 144    fn diff_bases_changed(
 145        &mut self,
 146        buffer: text::BufferSnapshot,
 147        diff_bases_change: DiffBasesChange,
 148        cx: &mut Context<Self>,
 149    ) -> oneshot::Receiver<()> {
 150        match diff_bases_change {
 151            DiffBasesChange::SetIndex(index) => {
 152                let mut index = index.unwrap_or_default();
 153                text::LineEnding::normalize(&mut index);
 154                self.index_text = Some(Arc::new(index));
 155                self.index_changed = true;
 156            }
 157            DiffBasesChange::SetHead(head) => {
 158                let mut head = head.unwrap_or_default();
 159                text::LineEnding::normalize(&mut head);
 160                self.head_text = Some(Arc::new(head));
 161                self.head_changed = true;
 162            }
 163            DiffBasesChange::SetBoth(text) => {
 164                let mut text = text.unwrap_or_default();
 165                text::LineEnding::normalize(&mut text);
 166                self.head_text = Some(Arc::new(text));
 167                self.index_text = self.head_text.clone();
 168                self.head_changed = true;
 169                self.index_changed = true;
 170            }
 171            DiffBasesChange::SetEach { index, head } => {
 172                let mut index = index.unwrap_or_default();
 173                text::LineEnding::normalize(&mut index);
 174                let mut head = head.unwrap_or_default();
 175                text::LineEnding::normalize(&mut head);
 176                self.index_text = Some(Arc::new(index));
 177                self.head_text = Some(Arc::new(head));
 178                self.head_changed = true;
 179                self.index_changed = true;
 180            }
 181        }
 182
 183        self.recalculate_diffs(buffer, cx)
 184    }
 185
 186    fn recalculate_diffs(
 187        &mut self,
 188        buffer: text::BufferSnapshot,
 189        cx: &mut Context<Self>,
 190    ) -> oneshot::Receiver<()> {
 191        let (tx, rx) = oneshot::channel();
 192        self.diff_updated_futures.push(tx);
 193
 194        let language = self.language.clone();
 195        let language_registry = self.language_registry.clone();
 196        let unstaged_changes = self.unstaged_changes();
 197        let uncommitted_changes = self.uncommitted_changes();
 198        let head = self.head_text.clone();
 199        let index = self.index_text.clone();
 200        let index_changed = self.index_changed;
 201        let head_changed = self.head_changed;
 202        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
 203            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
 204            (None, None) => true,
 205            _ => false,
 206        };
 207        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
 208            if let Some(unstaged_changes) = &unstaged_changes {
 209                let staged_snapshot = if index_changed {
 210                    let staged_snapshot = cx.update(|cx| {
 211                        index.as_ref().map(|head| {
 212                            language::Buffer::build_snapshot(
 213                                Rope::from(head.as_str()),
 214                                language.clone(),
 215                                language_registry.clone(),
 216                                cx,
 217                            )
 218                        })
 219                    })?;
 220                    cx.background_executor()
 221                        .spawn(OptionFuture::from(staged_snapshot))
 222                } else {
 223                    Task::ready(
 224                        unstaged_changes
 225                            .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
 226                    )
 227                };
 228
 229                let diff =
 230                    cx.background_executor().spawn({
 231                        let buffer = buffer.clone();
 232                        async move {
 233                            BufferDiff::build(index.as_ref().map(|index| index.as_str()), &buffer)
 234                        }
 235                    });
 236
 237                let (staged_snapshot, diff) = futures::join!(staged_snapshot, diff);
 238
 239                unstaged_changes.update(&mut cx, |unstaged_changes, cx| {
 240                    unstaged_changes.set_state(staged_snapshot.clone(), diff, &buffer, cx);
 241                })?;
 242            }
 243
 244            if let Some(uncommitted_changes) = &uncommitted_changes {
 245                let (snapshot, diff) = if let (Some(unstaged_changes), true) =
 246                    (&unstaged_changes, index_matches_head)
 247                {
 248                    unstaged_changes.read_with(&cx, |change_set, _| {
 249                        (
 250                            change_set.base_text.clone(),
 251                            change_set.diff_to_buffer.clone(),
 252                        )
 253                    })?
 254                } else {
 255                    let committed_snapshot = if head_changed {
 256                        let committed_snapshot = cx.update(|cx| {
 257                            head.as_ref().map(|head| {
 258                                language::Buffer::build_snapshot(
 259                                    Rope::from(head.as_str()),
 260                                    language.clone(),
 261                                    language_registry.clone(),
 262                                    cx,
 263                                )
 264                            })
 265                        })?;
 266                        cx.background_executor()
 267                            .spawn(OptionFuture::from(committed_snapshot))
 268                    } else {
 269                        Task::ready(
 270                            uncommitted_changes
 271                                .read_with(&cx, |change_set, _| change_set.base_text.clone())?,
 272                        )
 273                    };
 274
 275                    let diff = cx.background_executor().spawn({
 276                        let buffer = buffer.clone();
 277                        let head = head.clone();
 278                        async move {
 279                            BufferDiff::build(head.as_ref().map(|head| head.as_str()), &buffer)
 280                        }
 281                    });
 282                    futures::join!(committed_snapshot, diff)
 283                };
 284
 285                uncommitted_changes.update(&mut cx, |change_set, cx| {
 286                    change_set.set_state(snapshot, diff, &buffer, cx);
 287                })?;
 288            }
 289
 290            if let Some(this) = this.upgrade() {
 291                this.update(&mut cx, |this, _| {
 292                    this.index_changed = false;
 293                    this.head_changed = false;
 294                    for tx in this.diff_updated_futures.drain(..) {
 295                        tx.send(()).ok();
 296                    }
 297                })?;
 298            }
 299
 300            Ok(())
 301        }));
 302
 303        rx
 304    }
 305}
 306
 307pub struct BufferChangeSet {
 308    pub buffer_id: BufferId,
 309    pub base_text: Option<language::BufferSnapshot>,
 310    pub diff_to_buffer: BufferDiff,
 311    pub unstaged_change_set: Option<Entity<BufferChangeSet>>,
 312}
 313
 314impl std::fmt::Debug for BufferChangeSet {
 315    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 316        f.debug_struct("BufferChangeSet")
 317            .field("buffer_id", &self.buffer_id)
 318            .field("base_text", &self.base_text.as_ref().map(|s| s.text()))
 319            .field("diff_to_buffer", &self.diff_to_buffer)
 320            .finish()
 321    }
 322}
 323
 324pub enum BufferChangeSetEvent {
 325    DiffChanged { changed_range: Range<text::Anchor> },
 326}
 327
 328enum BufferStoreState {
 329    Local(LocalBufferStore),
 330    Remote(RemoteBufferStore),
 331}
 332
 333struct RemoteBufferStore {
 334    shared_with_me: HashSet<Entity<Buffer>>,
 335    upstream_client: AnyProtoClient,
 336    project_id: u64,
 337    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
 338    remote_buffer_listeners:
 339        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
 340    worktree_store: Entity<WorktreeStore>,
 341}
 342
 343struct LocalBufferStore {
 344    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
 345    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
 346    worktree_store: Entity<WorktreeStore>,
 347    _subscription: Subscription,
 348}
 349
 350enum OpenBuffer {
 351    Complete {
 352        buffer: WeakEntity<Buffer>,
 353        change_set_state: Entity<BufferChangeSetState>,
 354    },
 355    Operations(Vec<Operation>),
 356}
 357
 358pub enum BufferStoreEvent {
 359    BufferAdded(Entity<Buffer>),
 360    BufferDropped(BufferId),
 361    BufferChangedFilePath {
 362        buffer: Entity<Buffer>,
 363        old_file: Option<Arc<dyn language::File>>,
 364    },
 365}
 366
 367#[derive(Default, Debug)]
 368pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
 369
 370impl EventEmitter<BufferStoreEvent> for BufferStore {}
 371
 372impl RemoteBufferStore {
 373    fn open_unstaged_changes(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
 374        let project_id = self.project_id;
 375        let client = self.upstream_client.clone();
 376        cx.background_executor().spawn(async move {
 377            let response = client
 378                .request(proto::OpenUnstagedChanges {
 379                    project_id,
 380                    buffer_id: buffer_id.to_proto(),
 381                })
 382                .await?;
 383            Ok(response.staged_text)
 384        })
 385    }
 386
 387    fn open_uncommitted_changes(
 388        &self,
 389        buffer_id: BufferId,
 390        cx: &App,
 391    ) -> Task<Result<DiffBasesChange>> {
 392        use proto::open_uncommitted_changes_response::Mode;
 393
 394        let project_id = self.project_id;
 395        let client = self.upstream_client.clone();
 396        cx.background_executor().spawn(async move {
 397            let response = client
 398                .request(proto::OpenUncommittedChanges {
 399                    project_id,
 400                    buffer_id: buffer_id.to_proto(),
 401                })
 402                .await?;
 403            let mode = Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
 404            let bases = match mode {
 405                Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
 406                Mode::IndexAndHead => DiffBasesChange::SetEach {
 407                    head: response.committed_text,
 408                    index: response.staged_text,
 409                },
 410            };
 411            Ok(bases)
 412        })
 413    }
 414
 415    pub fn wait_for_remote_buffer(
 416        &mut self,
 417        id: BufferId,
 418        cx: &mut Context<BufferStore>,
 419    ) -> Task<Result<Entity<Buffer>>> {
 420        let (tx, rx) = oneshot::channel();
 421        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 422
 423        cx.spawn(|this, cx| async move {
 424            if let Some(buffer) = this
 425                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
 426                .ok()
 427                .flatten()
 428            {
 429                return Ok(buffer);
 430            }
 431
 432            cx.background_executor()
 433                .spawn(async move { rx.await? })
 434                .await
 435        })
 436    }
 437
 438    fn save_remote_buffer(
 439        &self,
 440        buffer_handle: Entity<Buffer>,
 441        new_path: Option<proto::ProjectPath>,
 442        cx: &Context<BufferStore>,
 443    ) -> Task<Result<()>> {
 444        let buffer = buffer_handle.read(cx);
 445        let buffer_id = buffer.remote_id().into();
 446        let version = buffer.version();
 447        let rpc = self.upstream_client.clone();
 448        let project_id = self.project_id;
 449        cx.spawn(move |_, mut cx| async move {
 450            let response = rpc
 451                .request(proto::SaveBuffer {
 452                    project_id,
 453                    buffer_id,
 454                    new_path,
 455                    version: serialize_version(&version),
 456                })
 457                .await?;
 458            let version = deserialize_version(&response.version);
 459            let mtime = response.mtime.map(|mtime| mtime.into());
 460
 461            buffer_handle.update(&mut cx, |buffer, cx| {
 462                buffer.did_save(version.clone(), mtime, cx);
 463            })?;
 464
 465            Ok(())
 466        })
 467    }
 468
 469    pub fn handle_create_buffer_for_peer(
 470        &mut self,
 471        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 472        replica_id: u16,
 473        capability: Capability,
 474        cx: &mut Context<BufferStore>,
 475    ) -> Result<Option<Entity<Buffer>>> {
 476        match envelope
 477            .payload
 478            .variant
 479            .ok_or_else(|| anyhow!("missing variant"))?
 480        {
 481            proto::create_buffer_for_peer::Variant::State(mut state) => {
 482                let buffer_id = BufferId::new(state.id)?;
 483
 484                let buffer_result = maybe!({
 485                    let mut buffer_file = None;
 486                    if let Some(file) = state.file.take() {
 487                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 488                        let worktree = self
 489                            .worktree_store
 490                            .read(cx)
 491                            .worktree_for_id(worktree_id, cx)
 492                            .ok_or_else(|| {
 493                                anyhow!("no worktree found for id {}", file.worktree_id)
 494                            })?;
 495                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 496                            as Arc<dyn language::File>);
 497                    }
 498                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 499                });
 500
 501                match buffer_result {
 502                    Ok(buffer) => {
 503                        let buffer = cx.new(|_| buffer);
 504                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 505                    }
 506                    Err(error) => {
 507                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 508                            for listener in listeners {
 509                                listener.send(Err(anyhow!(error.cloned()))).ok();
 510                            }
 511                        }
 512                    }
 513                }
 514            }
 515            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 516                let buffer_id = BufferId::new(chunk.buffer_id)?;
 517                let buffer = self
 518                    .loading_remote_buffers_by_id
 519                    .get(&buffer_id)
 520                    .cloned()
 521                    .ok_or_else(|| {
 522                        anyhow!(
 523                            "received chunk for buffer {} without initial state",
 524                            chunk.buffer_id
 525                        )
 526                    })?;
 527
 528                let result = maybe!({
 529                    let operations = chunk
 530                        .operations
 531                        .into_iter()
 532                        .map(language::proto::deserialize_operation)
 533                        .collect::<Result<Vec<_>>>()?;
 534                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 535                    anyhow::Ok(())
 536                });
 537
 538                if let Err(error) = result {
 539                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 540                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 541                        for listener in listeners {
 542                            listener.send(Err(error.cloned())).ok();
 543                        }
 544                    }
 545                } else if chunk.is_last {
 546                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 547                    if self.upstream_client.is_via_collab() {
 548                        // retain buffers sent by peers to avoid races.
 549                        self.shared_with_me.insert(buffer.clone());
 550                    }
 551
 552                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 553                        for sender in senders {
 554                            sender.send(Ok(buffer.clone())).ok();
 555                        }
 556                    }
 557                    return Ok(Some(buffer));
 558                }
 559            }
 560        }
 561        return Ok(None);
 562    }
 563
 564    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 565        self.loading_remote_buffers_by_id
 566            .keys()
 567            .copied()
 568            .collect::<Vec<_>>()
 569    }
 570
 571    pub fn deserialize_project_transaction(
 572        &self,
 573        message: proto::ProjectTransaction,
 574        push_to_history: bool,
 575        cx: &mut Context<BufferStore>,
 576    ) -> Task<Result<ProjectTransaction>> {
 577        cx.spawn(|this, mut cx| async move {
 578            let mut project_transaction = ProjectTransaction::default();
 579            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 580            {
 581                let buffer_id = BufferId::new(buffer_id)?;
 582                let buffer = this
 583                    .update(&mut cx, |this, cx| {
 584                        this.wait_for_remote_buffer(buffer_id, cx)
 585                    })?
 586                    .await?;
 587                let transaction = language::proto::deserialize_transaction(transaction)?;
 588                project_transaction.0.insert(buffer, transaction);
 589            }
 590
 591            for (buffer, transaction) in &project_transaction.0 {
 592                buffer
 593                    .update(&mut cx, |buffer, _| {
 594                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 595                    })?
 596                    .await?;
 597
 598                if push_to_history {
 599                    buffer.update(&mut cx, |buffer, _| {
 600                        buffer.push_transaction(transaction.clone(), Instant::now());
 601                    })?;
 602                }
 603            }
 604
 605            Ok(project_transaction)
 606        })
 607    }
 608
 609    fn open_buffer(
 610        &self,
 611        path: Arc<Path>,
 612        worktree: Entity<Worktree>,
 613        cx: &mut Context<BufferStore>,
 614    ) -> Task<Result<Entity<Buffer>>> {
 615        let worktree_id = worktree.read(cx).id().to_proto();
 616        let project_id = self.project_id;
 617        let client = self.upstream_client.clone();
 618        let path_string = path.clone().to_string_lossy().to_string();
 619        cx.spawn(move |this, mut cx| async move {
 620            let response = client
 621                .request(proto::OpenBufferByPath {
 622                    project_id,
 623                    worktree_id,
 624                    path: path_string,
 625                })
 626                .await?;
 627            let buffer_id = BufferId::new(response.buffer_id)?;
 628
 629            let buffer = this
 630                .update(&mut cx, {
 631                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 632                })?
 633                .await?;
 634
 635            Ok(buffer)
 636        })
 637    }
 638
 639    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
 640        let create = self.upstream_client.request(proto::OpenNewBuffer {
 641            project_id: self.project_id,
 642        });
 643        cx.spawn(|this, mut cx| async move {
 644            let response = create.await?;
 645            let buffer_id = BufferId::new(response.buffer_id)?;
 646
 647            this.update(&mut cx, |this, cx| {
 648                this.wait_for_remote_buffer(buffer_id, cx)
 649            })?
 650            .await
 651        })
 652    }
 653
 654    fn reload_buffers(
 655        &self,
 656        buffers: HashSet<Entity<Buffer>>,
 657        push_to_history: bool,
 658        cx: &mut Context<BufferStore>,
 659    ) -> Task<Result<ProjectTransaction>> {
 660        let request = self.upstream_client.request(proto::ReloadBuffers {
 661            project_id: self.project_id,
 662            buffer_ids: buffers
 663                .iter()
 664                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 665                .collect(),
 666        });
 667
 668        cx.spawn(|this, mut cx| async move {
 669            let response = request
 670                .await?
 671                .transaction
 672                .ok_or_else(|| anyhow!("missing transaction"))?;
 673            this.update(&mut cx, |this, cx| {
 674                this.deserialize_project_transaction(response, push_to_history, cx)
 675            })?
 676            .await
 677        })
 678    }
 679}
 680
 681impl LocalBufferStore {
 682    fn worktree_for_buffer(
 683        &self,
 684        buffer: &Entity<Buffer>,
 685        cx: &App,
 686    ) -> Option<(Entity<Worktree>, Arc<Path>)> {
 687        let file = buffer.read(cx).file()?;
 688        let worktree_id = file.worktree_id(cx);
 689        let path = file.path().clone();
 690        let worktree = self
 691            .worktree_store
 692            .read(cx)
 693            .worktree_for_id(worktree_id, cx)?;
 694        Some((worktree, path))
 695    }
 696
 697    fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
 698        if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
 699            worktree.read(cx).load_staged_file(path.as_ref(), cx)
 700        } else {
 701            return Task::ready(Err(anyhow!("no such worktree")));
 702        }
 703    }
 704
 705    fn load_committed_text(
 706        &self,
 707        buffer: &Entity<Buffer>,
 708        cx: &App,
 709    ) -> Task<Result<Option<String>>> {
 710        if let Some((worktree, path)) = self.worktree_for_buffer(buffer, cx) {
 711            worktree.read(cx).load_committed_file(path.as_ref(), cx)
 712        } else {
 713            Task::ready(Err(anyhow!("no such worktree")))
 714        }
 715    }
 716
 717    fn save_local_buffer(
 718        &self,
 719        buffer_handle: Entity<Buffer>,
 720        worktree: Entity<Worktree>,
 721        path: Arc<Path>,
 722        mut has_changed_file: bool,
 723        cx: &mut Context<BufferStore>,
 724    ) -> Task<Result<()>> {
 725        let buffer = buffer_handle.read(cx);
 726
 727        let text = buffer.as_rope().clone();
 728        let line_ending = buffer.line_ending();
 729        let version = buffer.version();
 730        let buffer_id = buffer.remote_id();
 731        if buffer
 732            .file()
 733            .is_some_and(|file| file.disk_state() == DiskState::New)
 734        {
 735            has_changed_file = true;
 736        }
 737
 738        let save = worktree.update(cx, |worktree, cx| {
 739            worktree.write_file(path.as_ref(), text, line_ending, cx)
 740        });
 741
 742        cx.spawn(move |this, mut cx| async move {
 743            let new_file = save.await?;
 744            let mtime = new_file.disk_state().mtime();
 745            this.update(&mut cx, |this, cx| {
 746                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 747                    if has_changed_file {
 748                        downstream_client
 749                            .send(proto::UpdateBufferFile {
 750                                project_id,
 751                                buffer_id: buffer_id.to_proto(),
 752                                file: Some(language::File::to_proto(&*new_file, cx)),
 753                            })
 754                            .log_err();
 755                    }
 756                    downstream_client
 757                        .send(proto::BufferSaved {
 758                            project_id,
 759                            buffer_id: buffer_id.to_proto(),
 760                            version: serialize_version(&version),
 761                            mtime: mtime.map(|time| time.into()),
 762                        })
 763                        .log_err();
 764                }
 765            })?;
 766            buffer_handle.update(&mut cx, |buffer, cx| {
 767                if has_changed_file {
 768                    buffer.file_updated(new_file, cx);
 769                }
 770                buffer.did_save(version.clone(), mtime, cx);
 771            })
 772        })
 773    }
 774
 775    fn subscribe_to_worktree(
 776        &mut self,
 777        worktree: &Entity<Worktree>,
 778        cx: &mut Context<BufferStore>,
 779    ) {
 780        cx.subscribe(worktree, |this, worktree, event, cx| {
 781            if worktree.read(cx).is_local() {
 782                match event {
 783                    worktree::Event::UpdatedEntries(changes) => {
 784                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 785                    }
 786                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 787                        Self::local_worktree_git_repos_changed(
 788                            this,
 789                            worktree.clone(),
 790                            updated_repos,
 791                            cx,
 792                        )
 793                    }
 794                    _ => {}
 795                }
 796            }
 797        })
 798        .detach();
 799    }
 800
 801    fn local_worktree_entries_changed(
 802        this: &mut BufferStore,
 803        worktree_handle: &Entity<Worktree>,
 804        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 805        cx: &mut Context<BufferStore>,
 806    ) {
 807        let snapshot = worktree_handle.read(cx).snapshot();
 808        for (path, entry_id, _) in changes {
 809            Self::local_worktree_entry_changed(
 810                this,
 811                *entry_id,
 812                path,
 813                worktree_handle,
 814                &snapshot,
 815                cx,
 816            );
 817        }
 818    }
 819
 820    fn local_worktree_git_repos_changed(
 821        this: &mut BufferStore,
 822        worktree_handle: Entity<Worktree>,
 823        changed_repos: &UpdatedGitRepositoriesSet,
 824        cx: &mut Context<BufferStore>,
 825    ) {
 826        debug_assert!(worktree_handle.read(cx).is_local());
 827
 828        let mut change_set_state_updates = Vec::new();
 829        for buffer in this.opened_buffers.values() {
 830            let OpenBuffer::Complete {
 831                buffer,
 832                change_set_state,
 833            } = buffer
 834            else {
 835                continue;
 836            };
 837            let Some(buffer) = buffer.upgrade() else {
 838                continue;
 839            };
 840            let buffer = buffer.read(cx);
 841            let Some(file) = File::from_dyn(buffer.file()) else {
 842                continue;
 843            };
 844            if file.worktree != worktree_handle {
 845                continue;
 846            }
 847            let change_set_state = change_set_state.read(cx);
 848            if changed_repos
 849                .iter()
 850                .any(|(work_dir, _)| file.path.starts_with(work_dir))
 851            {
 852                let snapshot = buffer.text_snapshot();
 853                change_set_state_updates.push((
 854                    snapshot.clone(),
 855                    file.path.clone(),
 856                    change_set_state
 857                        .unstaged_changes
 858                        .as_ref()
 859                        .and_then(|set| set.upgrade())
 860                        .is_some(),
 861                    change_set_state
 862                        .uncommitted_changes
 863                        .as_ref()
 864                        .and_then(|set| set.upgrade())
 865                        .is_some(),
 866                ))
 867            }
 868        }
 869
 870        if change_set_state_updates.is_empty() {
 871            return;
 872        }
 873
 874        cx.spawn(move |this, mut cx| async move {
 875            let snapshot =
 876                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 877            let diff_bases_changes_by_buffer = cx
 878                .background_executor()
 879                .spawn(async move {
 880                    change_set_state_updates
 881                        .into_iter()
 882                        .filter_map(
 883                            |(buffer_snapshot, path, needs_staged_text, needs_committed_text)| {
 884                                let local_repo = snapshot.local_repo_for_path(&path)?;
 885                                let relative_path = local_repo.relativize(&path).ok()?;
 886                                let staged_text = if needs_staged_text {
 887                                    local_repo.repo().load_index_text(&relative_path)
 888                                } else {
 889                                    None
 890                                };
 891                                let committed_text = if needs_committed_text {
 892                                    local_repo.repo().load_committed_text(&relative_path)
 893                                } else {
 894                                    None
 895                                };
 896                                let diff_bases_change =
 897                                    match (needs_staged_text, needs_committed_text) {
 898                                        (true, true) => Some(if staged_text == committed_text {
 899                                            DiffBasesChange::SetBoth(committed_text)
 900                                        } else {
 901                                            DiffBasesChange::SetEach {
 902                                                index: staged_text,
 903                                                head: committed_text,
 904                                            }
 905                                        }),
 906                                        (true, false) => {
 907                                            Some(DiffBasesChange::SetIndex(staged_text))
 908                                        }
 909                                        (false, true) => {
 910                                            Some(DiffBasesChange::SetHead(committed_text))
 911                                        }
 912                                        (false, false) => None,
 913                                    };
 914                                Some((buffer_snapshot, diff_bases_change))
 915                            },
 916                        )
 917                        .collect::<Vec<_>>()
 918                })
 919                .await;
 920
 921            this.update(&mut cx, |this, cx| {
 922                for (buffer_snapshot, diff_bases_change) in diff_bases_changes_by_buffer {
 923                    let Some(OpenBuffer::Complete {
 924                        change_set_state, ..
 925                    }) = this.opened_buffers.get_mut(&buffer_snapshot.remote_id())
 926                    else {
 927                        continue;
 928                    };
 929                    let Some(diff_bases_change) = diff_bases_change else {
 930                        continue;
 931                    };
 932
 933                    change_set_state.update(cx, |change_set_state, cx| {
 934                        use proto::update_diff_bases::Mode;
 935
 936                        if let Some((client, project_id)) = this.downstream_client.as_ref() {
 937                            let buffer_id = buffer_snapshot.remote_id().to_proto();
 938                            let (staged_text, committed_text, mode) = match diff_bases_change
 939                                .clone()
 940                            {
 941                                DiffBasesChange::SetIndex(index) => (index, None, Mode::IndexOnly),
 942                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
 943                                DiffBasesChange::SetEach { index, head } => {
 944                                    (index, head, Mode::IndexAndHead)
 945                                }
 946                                DiffBasesChange::SetBoth(text) => {
 947                                    (None, text, Mode::IndexMatchesHead)
 948                                }
 949                            };
 950                            let message = proto::UpdateDiffBases {
 951                                project_id: *project_id,
 952                                buffer_id,
 953                                staged_text,
 954                                committed_text,
 955                                mode: mode as i32,
 956                            };
 957
 958                            client.send(message).log_err();
 959                        }
 960
 961                        let _ = change_set_state.diff_bases_changed(
 962                            buffer_snapshot,
 963                            diff_bases_change,
 964                            cx,
 965                        );
 966                    });
 967                }
 968            })
 969        })
 970        .detach_and_log_err(cx);
 971    }
 972
 973    fn local_worktree_entry_changed(
 974        this: &mut BufferStore,
 975        entry_id: ProjectEntryId,
 976        path: &Arc<Path>,
 977        worktree: &Entity<worktree::Worktree>,
 978        snapshot: &worktree::Snapshot,
 979        cx: &mut Context<BufferStore>,
 980    ) -> Option<()> {
 981        let project_path = ProjectPath {
 982            worktree_id: snapshot.id(),
 983            path: path.clone(),
 984        };
 985
 986        let buffer_id = {
 987            let local = this.as_local_mut()?;
 988            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
 989                Some(&buffer_id) => buffer_id,
 990                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
 991            }
 992        };
 993
 994        let buffer = if let Some(buffer) = this.get(buffer_id) {
 995            Some(buffer)
 996        } else {
 997            this.opened_buffers.remove(&buffer_id);
 998            None
 999        };
1000
1001        let buffer = if let Some(buffer) = buffer {
1002            buffer
1003        } else {
1004            let this = this.as_local_mut()?;
1005            this.local_buffer_ids_by_path.remove(&project_path);
1006            this.local_buffer_ids_by_entry_id.remove(&entry_id);
1007            return None;
1008        };
1009
1010        let events = buffer.update(cx, |buffer, cx| {
1011            let local = this.as_local_mut()?;
1012            let file = buffer.file()?;
1013            let old_file = File::from_dyn(Some(file))?;
1014            if old_file.worktree != *worktree {
1015                return None;
1016            }
1017
1018            let snapshot_entry = old_file
1019                .entry_id
1020                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
1021                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
1022
1023            let new_file = if let Some(entry) = snapshot_entry {
1024                File {
1025                    disk_state: match entry.mtime {
1026                        Some(mtime) => DiskState::Present { mtime },
1027                        None => old_file.disk_state,
1028                    },
1029                    is_local: true,
1030                    entry_id: Some(entry.id),
1031                    path: entry.path.clone(),
1032                    worktree: worktree.clone(),
1033                    is_private: entry.is_private,
1034                }
1035            } else {
1036                File {
1037                    disk_state: DiskState::Deleted,
1038                    is_local: true,
1039                    entry_id: old_file.entry_id,
1040                    path: old_file.path.clone(),
1041                    worktree: worktree.clone(),
1042                    is_private: old_file.is_private,
1043                }
1044            };
1045
1046            if new_file == *old_file {
1047                return None;
1048            }
1049
1050            let mut events = Vec::new();
1051            if new_file.path != old_file.path {
1052                local.local_buffer_ids_by_path.remove(&ProjectPath {
1053                    path: old_file.path.clone(),
1054                    worktree_id: old_file.worktree_id(cx),
1055                });
1056                local.local_buffer_ids_by_path.insert(
1057                    ProjectPath {
1058                        worktree_id: new_file.worktree_id(cx),
1059                        path: new_file.path.clone(),
1060                    },
1061                    buffer_id,
1062                );
1063                events.push(BufferStoreEvent::BufferChangedFilePath {
1064                    buffer: cx.entity(),
1065                    old_file: buffer.file().cloned(),
1066                });
1067            }
1068
1069            if new_file.entry_id != old_file.entry_id {
1070                if let Some(entry_id) = old_file.entry_id {
1071                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
1072                }
1073                if let Some(entry_id) = new_file.entry_id {
1074                    local
1075                        .local_buffer_ids_by_entry_id
1076                        .insert(entry_id, buffer_id);
1077                }
1078            }
1079
1080            if let Some((client, project_id)) = &this.downstream_client {
1081                client
1082                    .send(proto::UpdateBufferFile {
1083                        project_id: *project_id,
1084                        buffer_id: buffer_id.to_proto(),
1085                        file: Some(new_file.to_proto(cx)),
1086                    })
1087                    .ok();
1088            }
1089
1090            buffer.file_updated(Arc::new(new_file), cx);
1091            Some(events)
1092        })?;
1093
1094        for event in events {
1095            cx.emit(event);
1096        }
1097
1098        None
1099    }
1100
1101    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1102        let file = File::from_dyn(buffer.read(cx).file())?;
1103
1104        let remote_id = buffer.read(cx).remote_id();
1105        if let Some(entry_id) = file.entry_id {
1106            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1107                Some(_) => {
1108                    return None;
1109                }
1110                None => {
1111                    self.local_buffer_ids_by_entry_id
1112                        .insert(entry_id, remote_id);
1113                }
1114            }
1115        };
1116        self.local_buffer_ids_by_path.insert(
1117            ProjectPath {
1118                worktree_id: file.worktree_id(cx),
1119                path: file.path.clone(),
1120            },
1121            remote_id,
1122        );
1123
1124        Some(())
1125    }
1126
1127    fn save_buffer(
1128        &self,
1129        buffer: Entity<Buffer>,
1130        cx: &mut Context<BufferStore>,
1131    ) -> Task<Result<()>> {
1132        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1133            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
1134        };
1135        let worktree = file.worktree.clone();
1136        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
1137    }
1138
1139    fn save_buffer_as(
1140        &self,
1141        buffer: Entity<Buffer>,
1142        path: ProjectPath,
1143        cx: &mut Context<BufferStore>,
1144    ) -> Task<Result<()>> {
1145        let Some(worktree) = self
1146            .worktree_store
1147            .read(cx)
1148            .worktree_for_id(path.worktree_id, cx)
1149        else {
1150            return Task::ready(Err(anyhow!("no such worktree")));
1151        };
1152        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
1153    }
1154
1155    fn open_buffer(
1156        &self,
1157        path: Arc<Path>,
1158        worktree: Entity<Worktree>,
1159        cx: &mut Context<BufferStore>,
1160    ) -> Task<Result<Entity<Buffer>>> {
1161        let load_buffer = worktree.update(cx, |worktree, cx| {
1162            let load_file = worktree.load_file(path.as_ref(), cx);
1163            let reservation = cx.reserve_entity();
1164            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
1165            cx.spawn(move |_, mut cx| async move {
1166                let loaded = load_file.await?;
1167                let text_buffer = cx
1168                    .background_executor()
1169                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
1170                    .await;
1171                cx.insert_entity(reservation, |_| {
1172                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
1173                })
1174            })
1175        });
1176
1177        cx.spawn(move |this, mut cx| async move {
1178            let buffer = match load_buffer.await {
1179                Ok(buffer) => Ok(buffer),
1180                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
1181                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
1182                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
1183                    Buffer::build(
1184                        text_buffer,
1185                        Some(Arc::new(File {
1186                            worktree,
1187                            path,
1188                            disk_state: DiskState::New,
1189                            entry_id: None,
1190                            is_local: true,
1191                            is_private: false,
1192                        })),
1193                        Capability::ReadWrite,
1194                    )
1195                }),
1196                Err(e) => Err(e),
1197            }?;
1198            this.update(&mut cx, |this, cx| {
1199                this.add_buffer(buffer.clone(), cx)?;
1200                let buffer_id = buffer.read(cx).remote_id();
1201                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1202                    let this = this.as_local_mut().unwrap();
1203                    this.local_buffer_ids_by_path.insert(
1204                        ProjectPath {
1205                            worktree_id: file.worktree_id(cx),
1206                            path: file.path.clone(),
1207                        },
1208                        buffer_id,
1209                    );
1210
1211                    if let Some(entry_id) = file.entry_id {
1212                        this.local_buffer_ids_by_entry_id
1213                            .insert(entry_id, buffer_id);
1214                    }
1215                }
1216
1217                anyhow::Ok(())
1218            })??;
1219
1220            Ok(buffer)
1221        })
1222    }
1223
1224    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
1225        cx.spawn(|buffer_store, mut cx| async move {
1226            let buffer =
1227                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
1228            buffer_store.update(&mut cx, |buffer_store, cx| {
1229                buffer_store.add_buffer(buffer.clone(), cx).log_err();
1230            })?;
1231            Ok(buffer)
1232        })
1233    }
1234
1235    fn reload_buffers(
1236        &self,
1237        buffers: HashSet<Entity<Buffer>>,
1238        push_to_history: bool,
1239        cx: &mut Context<BufferStore>,
1240    ) -> Task<Result<ProjectTransaction>> {
1241        cx.spawn(move |_, mut cx| async move {
1242            let mut project_transaction = ProjectTransaction::default();
1243            for buffer in buffers {
1244                let transaction = buffer
1245                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1246                    .await?;
1247                buffer.update(&mut cx, |buffer, cx| {
1248                    if let Some(transaction) = transaction {
1249                        if !push_to_history {
1250                            buffer.forget_transaction(transaction.id);
1251                        }
1252                        project_transaction.0.insert(cx.entity(), transaction);
1253                    }
1254                })?;
1255            }
1256
1257            Ok(project_transaction)
1258        })
1259    }
1260}
1261
1262impl BufferStore {
1263    pub fn init(client: &AnyProtoClient) {
1264        client.add_entity_message_handler(Self::handle_buffer_reloaded);
1265        client.add_entity_message_handler(Self::handle_buffer_saved);
1266        client.add_entity_message_handler(Self::handle_update_buffer_file);
1267        client.add_entity_request_handler(Self::handle_save_buffer);
1268        client.add_entity_request_handler(Self::handle_blame_buffer);
1269        client.add_entity_request_handler(Self::handle_reload_buffers);
1270        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
1271        client.add_entity_request_handler(Self::handle_open_unstaged_changes);
1272        client.add_entity_request_handler(Self::handle_open_uncommitted_changes);
1273        client.add_entity_message_handler(Self::handle_update_diff_bases);
1274    }
1275
1276    /// Creates a buffer store, optionally retaining its buffers.
1277    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
1278        Self {
1279            state: BufferStoreState::Local(LocalBufferStore {
1280                local_buffer_ids_by_path: Default::default(),
1281                local_buffer_ids_by_entry_id: Default::default(),
1282                worktree_store: worktree_store.clone(),
1283                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
1284                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
1285                        let this = this.as_local_mut().unwrap();
1286                        this.subscribe_to_worktree(worktree, cx);
1287                    }
1288                }),
1289            }),
1290            downstream_client: None,
1291            opened_buffers: Default::default(),
1292            shared_buffers: Default::default(),
1293            loading_buffers: Default::default(),
1294            loading_change_sets: Default::default(),
1295            worktree_store,
1296        }
1297    }
1298
1299    pub fn remote(
1300        worktree_store: Entity<WorktreeStore>,
1301        upstream_client: AnyProtoClient,
1302        remote_id: u64,
1303        _cx: &mut Context<Self>,
1304    ) -> Self {
1305        Self {
1306            state: BufferStoreState::Remote(RemoteBufferStore {
1307                shared_with_me: Default::default(),
1308                loading_remote_buffers_by_id: Default::default(),
1309                remote_buffer_listeners: Default::default(),
1310                project_id: remote_id,
1311                upstream_client,
1312                worktree_store: worktree_store.clone(),
1313            }),
1314            downstream_client: None,
1315            opened_buffers: Default::default(),
1316            loading_buffers: Default::default(),
1317            loading_change_sets: Default::default(),
1318            shared_buffers: Default::default(),
1319            worktree_store,
1320        }
1321    }
1322
1323    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
1324        match &mut self.state {
1325            BufferStoreState::Local(state) => Some(state),
1326            _ => None,
1327        }
1328    }
1329
1330    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
1331        match &mut self.state {
1332            BufferStoreState::Remote(state) => Some(state),
1333            _ => None,
1334        }
1335    }
1336
1337    fn as_remote(&self) -> Option<&RemoteBufferStore> {
1338        match &self.state {
1339            BufferStoreState::Remote(state) => Some(state),
1340            _ => None,
1341        }
1342    }
1343
1344    pub fn open_buffer(
1345        &mut self,
1346        project_path: ProjectPath,
1347        cx: &mut Context<Self>,
1348    ) -> Task<Result<Entity<Buffer>>> {
1349        if let Some(buffer) = self.get_by_path(&project_path, cx) {
1350            return Task::ready(Ok(buffer));
1351        }
1352
1353        let task = match self.loading_buffers.entry(project_path.clone()) {
1354            hash_map::Entry::Occupied(e) => e.get().clone(),
1355            hash_map::Entry::Vacant(entry) => {
1356                let path = project_path.path.clone();
1357                let Some(worktree) = self
1358                    .worktree_store
1359                    .read(cx)
1360                    .worktree_for_id(project_path.worktree_id, cx)
1361                else {
1362                    return Task::ready(Err(anyhow!("no such worktree")));
1363                };
1364                let load_buffer = match &self.state {
1365                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
1366                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
1367                };
1368
1369                entry
1370                    .insert(
1371                        cx.spawn(move |this, mut cx| async move {
1372                            let load_result = load_buffer.await;
1373                            this.update(&mut cx, |this, _cx| {
1374                                // Record the fact that the buffer is no longer loading.
1375                                this.loading_buffers.remove(&project_path);
1376                            })
1377                            .ok();
1378                            load_result.map_err(Arc::new)
1379                        })
1380                        .shared(),
1381                    )
1382                    .clone()
1383            }
1384        };
1385
1386        cx.background_executor()
1387            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1388    }
1389
1390    pub fn open_unstaged_changes(
1391        &mut self,
1392        buffer: Entity<Buffer>,
1393        cx: &mut Context<Self>,
1394    ) -> Task<Result<Entity<BufferChangeSet>>> {
1395        let buffer_id = buffer.read(cx).remote_id();
1396        if let Some(change_set) = self.get_unstaged_changes(buffer_id, cx) {
1397            return Task::ready(Ok(change_set));
1398        }
1399
1400        let task = match self
1401            .loading_change_sets
1402            .entry((buffer_id, ChangeSetKind::Unstaged))
1403        {
1404            hash_map::Entry::Occupied(e) => e.get().clone(),
1405            hash_map::Entry::Vacant(entry) => {
1406                let staged_text = match &self.state {
1407                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
1408                    BufferStoreState::Remote(this) => this.open_unstaged_changes(buffer_id, cx),
1409                };
1410
1411                entry
1412                    .insert(
1413                        cx.spawn(move |this, cx| async move {
1414                            Self::open_change_set_internal(
1415                                this,
1416                                ChangeSetKind::Unstaged,
1417                                staged_text.await.map(DiffBasesChange::SetIndex),
1418                                buffer,
1419                                cx,
1420                            )
1421                            .await
1422                            .map_err(Arc::new)
1423                        })
1424                        .shared(),
1425                    )
1426                    .clone()
1427            }
1428        };
1429
1430        cx.background_executor()
1431            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1432    }
1433
1434    pub fn open_uncommitted_changes(
1435        &mut self,
1436        buffer: Entity<Buffer>,
1437        cx: &mut Context<Self>,
1438    ) -> Task<Result<Entity<BufferChangeSet>>> {
1439        let buffer_id = buffer.read(cx).remote_id();
1440        if let Some(change_set) = self.get_uncommitted_changes(buffer_id, cx) {
1441            return Task::ready(Ok(change_set));
1442        }
1443
1444        let task = match self
1445            .loading_change_sets
1446            .entry((buffer_id, ChangeSetKind::Uncommitted))
1447        {
1448            hash_map::Entry::Occupied(e) => e.get().clone(),
1449            hash_map::Entry::Vacant(entry) => {
1450                let changes = match &self.state {
1451                    BufferStoreState::Local(this) => {
1452                        let committed_text = this.load_committed_text(&buffer, cx);
1453                        let staged_text = this.load_staged_text(&buffer, cx);
1454                        cx.background_executor().spawn(async move {
1455                            let committed_text = committed_text.await?;
1456                            let staged_text = staged_text.await?;
1457                            let diff_bases_change = if committed_text == staged_text {
1458                                DiffBasesChange::SetBoth(committed_text)
1459                            } else {
1460                                DiffBasesChange::SetEach {
1461                                    index: staged_text,
1462                                    head: committed_text,
1463                                }
1464                            };
1465                            Ok(diff_bases_change)
1466                        })
1467                    }
1468                    BufferStoreState::Remote(this) => this.open_uncommitted_changes(buffer_id, cx),
1469                };
1470
1471                entry
1472                    .insert(
1473                        cx.spawn(move |this, cx| async move {
1474                            Self::open_change_set_internal(
1475                                this,
1476                                ChangeSetKind::Uncommitted,
1477                                changes.await,
1478                                buffer,
1479                                cx,
1480                            )
1481                            .await
1482                            .map_err(Arc::new)
1483                        })
1484                        .shared(),
1485                    )
1486                    .clone()
1487            }
1488        };
1489
1490        cx.background_executor()
1491            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1492    }
1493
1494    #[cfg(any(test, feature = "test-support"))]
1495    pub fn set_unstaged_change_set(
1496        &mut self,
1497        buffer_id: BufferId,
1498        change_set: Entity<BufferChangeSet>,
1499    ) {
1500        self.loading_change_sets.insert(
1501            (buffer_id, ChangeSetKind::Unstaged),
1502            Task::ready(Ok(change_set)).shared(),
1503        );
1504    }
1505
1506    async fn open_change_set_internal(
1507        this: WeakEntity<Self>,
1508        kind: ChangeSetKind,
1509        texts: Result<DiffBasesChange>,
1510        buffer: Entity<Buffer>,
1511        mut cx: AsyncApp,
1512    ) -> Result<Entity<BufferChangeSet>> {
1513        let diff_bases_change = match texts {
1514            Err(e) => {
1515                this.update(&mut cx, |this, cx| {
1516                    let buffer_id = buffer.read(cx).remote_id();
1517                    this.loading_change_sets.remove(&(buffer_id, kind));
1518                })?;
1519                return Err(e);
1520            }
1521            Ok(change) => change,
1522        };
1523
1524        this.update(&mut cx, |this, cx| {
1525            let buffer_id = buffer.read(cx).remote_id();
1526            this.loading_change_sets.remove(&(buffer_id, kind));
1527
1528            if let Some(OpenBuffer::Complete {
1529                change_set_state, ..
1530            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1531            {
1532                change_set_state.update(cx, |change_set_state, cx| {
1533                    let buffer_id = buffer.read(cx).remote_id();
1534                    change_set_state.buffer_subscription.get_or_insert_with(|| {
1535                        cx.subscribe(&buffer, |this, buffer, event, cx| match event {
1536                            BufferEvent::LanguageChanged => {
1537                                this.buffer_language_changed(buffer, cx)
1538                            }
1539                            _ => {}
1540                        })
1541                    });
1542
1543                    let change_set = cx.new(|cx| BufferChangeSet {
1544                        buffer_id,
1545                        base_text: None,
1546                        diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
1547                        unstaged_change_set: None,
1548                    });
1549                    match kind {
1550                        ChangeSetKind::Unstaged => {
1551                            change_set_state.unstaged_changes = Some(change_set.downgrade())
1552                        }
1553                        ChangeSetKind::Uncommitted => {
1554                            let unstaged_change_set =
1555                                if let Some(change_set) = change_set_state.unstaged_changes() {
1556                                    change_set
1557                                } else {
1558                                    let unstaged_change_set = cx.new(|cx| BufferChangeSet {
1559                                        buffer_id,
1560                                        base_text: None,
1561                                        diff_to_buffer: BufferDiff::new(
1562                                            &buffer.read(cx).text_snapshot(),
1563                                        ),
1564                                        unstaged_change_set: None,
1565                                    });
1566                                    change_set_state.unstaged_changes =
1567                                        Some(unstaged_change_set.downgrade());
1568                                    unstaged_change_set
1569                                };
1570
1571                            change_set.update(cx, |change_set, _| {
1572                                change_set.unstaged_change_set = Some(unstaged_change_set);
1573                            });
1574                            change_set_state.uncommitted_changes = Some(change_set.downgrade())
1575                        }
1576                    };
1577
1578                    let buffer = buffer.read(cx).text_snapshot();
1579                    let rx = change_set_state.diff_bases_changed(buffer, diff_bases_change, cx);
1580
1581                    Ok(async move {
1582                        rx.await.ok();
1583                        Ok(change_set)
1584                    })
1585                })
1586            } else {
1587                Err(anyhow!("buffer was closed"))
1588            }
1589        })??
1590        .await
1591    }
1592
1593    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1594        match &self.state {
1595            BufferStoreState::Local(this) => this.create_buffer(cx),
1596            BufferStoreState::Remote(this) => this.create_buffer(cx),
1597        }
1598    }
1599
1600    pub fn save_buffer(
1601        &mut self,
1602        buffer: Entity<Buffer>,
1603        cx: &mut Context<Self>,
1604    ) -> Task<Result<()>> {
1605        match &mut self.state {
1606            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1607            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1608        }
1609    }
1610
1611    pub fn save_buffer_as(
1612        &mut self,
1613        buffer: Entity<Buffer>,
1614        path: ProjectPath,
1615        cx: &mut Context<Self>,
1616    ) -> Task<Result<()>> {
1617        let old_file = buffer.read(cx).file().cloned();
1618        let task = match &self.state {
1619            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1620            BufferStoreState::Remote(this) => {
1621                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1622            }
1623        };
1624        cx.spawn(|this, mut cx| async move {
1625            task.await?;
1626            this.update(&mut cx, |_, cx| {
1627                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1628            })
1629        })
1630    }
1631
1632    pub fn blame_buffer(
1633        &self,
1634        buffer: &Entity<Buffer>,
1635        version: Option<clock::Global>,
1636        cx: &App,
1637    ) -> Task<Result<Option<Blame>>> {
1638        let buffer = buffer.read(cx);
1639        let Some(file) = File::from_dyn(buffer.file()) else {
1640            return Task::ready(Err(anyhow!("buffer has no file")));
1641        };
1642
1643        match file.worktree.clone().read(cx) {
1644            Worktree::Local(worktree) => {
1645                let worktree = worktree.snapshot();
1646                let blame_params = maybe!({
1647                    let local_repo = match worktree.local_repo_for_path(&file.path) {
1648                        Some(repo_for_path) => repo_for_path,
1649                        None => return Ok(None),
1650                    };
1651
1652                    let relative_path = local_repo
1653                        .relativize(&file.path)
1654                        .context("failed to relativize buffer path")?;
1655
1656                    let repo = local_repo.repo().clone();
1657
1658                    let content = match version {
1659                        Some(version) => buffer.rope_for_version(&version).clone(),
1660                        None => buffer.as_rope().clone(),
1661                    };
1662
1663                    anyhow::Ok(Some((repo, relative_path, content)))
1664                });
1665
1666                cx.background_executor().spawn(async move {
1667                    let Some((repo, relative_path, content)) = blame_params? else {
1668                        return Ok(None);
1669                    };
1670                    repo.blame(&relative_path, content)
1671                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1672                        .map(Some)
1673                })
1674            }
1675            Worktree::Remote(worktree) => {
1676                let buffer_id = buffer.remote_id();
1677                let version = buffer.version();
1678                let project_id = worktree.project_id();
1679                let client = worktree.client();
1680                cx.spawn(|_| async move {
1681                    let response = client
1682                        .request(proto::BlameBuffer {
1683                            project_id,
1684                            buffer_id: buffer_id.into(),
1685                            version: serialize_version(&version),
1686                        })
1687                        .await?;
1688                    Ok(deserialize_blame_buffer_response(response))
1689                })
1690            }
1691        }
1692    }
1693
1694    pub fn get_permalink_to_line(
1695        &self,
1696        buffer: &Entity<Buffer>,
1697        selection: Range<u32>,
1698        cx: &App,
1699    ) -> Task<Result<url::Url>> {
1700        let buffer = buffer.read(cx);
1701        let Some(file) = File::from_dyn(buffer.file()) else {
1702            return Task::ready(Err(anyhow!("buffer has no file")));
1703        };
1704
1705        match file.worktree.read(cx) {
1706            Worktree::Local(worktree) => {
1707                let worktree_path = worktree.abs_path().clone();
1708                let Some((repo_entry, repo)) =
1709                    worktree.repository_for_path(file.path()).and_then(|entry| {
1710                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
1711                        Some((entry, repo))
1712                    })
1713                else {
1714                    // If we're not in a Git repo, check whether this is a Rust source
1715                    // file in the Cargo registry (presumably opened with go-to-definition
1716                    // from a normal Rust file). If so, we can put together a permalink
1717                    // using crate metadata.
1718                    if !buffer
1719                        .language()
1720                        .is_some_and(|lang| lang.name() == "Rust".into())
1721                    {
1722                        return Task::ready(Err(anyhow!("no permalink available")));
1723                    }
1724                    let file_path = worktree_path.join(file.path());
1725                    return cx.spawn(|cx| async move {
1726                        let provider_registry =
1727                            cx.update(GitHostingProviderRegistry::default_global)?;
1728                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
1729                            .map_err(|_| anyhow!("no permalink available"))
1730                    });
1731                };
1732
1733                let path = match repo_entry.relativize(file.path()) {
1734                    Ok(RepoPath(path)) => path,
1735                    Err(e) => return Task::ready(Err(e)),
1736                };
1737
1738                cx.spawn(|cx| async move {
1739                    const REMOTE_NAME: &str = "origin";
1740                    let origin_url = repo
1741                        .remote_url(REMOTE_NAME)
1742                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;
1743
1744                    let sha = repo
1745                        .head_sha()
1746                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
1747
1748                    let provider_registry =
1749                        cx.update(GitHostingProviderRegistry::default_global)?;
1750
1751                    let (provider, remote) =
1752                        parse_git_remote_url(provider_registry, &origin_url)
1753                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
1754
1755                    let path = path
1756                        .to_str()
1757                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;
1758
1759                    Ok(provider.build_permalink(
1760                        remote,
1761                        BuildPermalinkParams {
1762                            sha: &sha,
1763                            path,
1764                            selection: Some(selection),
1765                        },
1766                    ))
1767                })
1768            }
1769            Worktree::Remote(worktree) => {
1770                let buffer_id = buffer.remote_id();
1771                let project_id = worktree.project_id();
1772                let client = worktree.client();
1773                cx.spawn(|_| async move {
1774                    let response = client
1775                        .request(proto::GetPermalinkToLine {
1776                            project_id,
1777                            buffer_id: buffer_id.into(),
1778                            selection: Some(proto::Range {
1779                                start: selection.start as u64,
1780                                end: selection.end as u64,
1781                            }),
1782                        })
1783                        .await?;
1784
1785                    url::Url::parse(&response.permalink).context("failed to parse permalink")
1786                })
1787            }
1788        }
1789    }
1790
1791    fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1792        let remote_id = buffer.read(cx).remote_id();
1793        let is_remote = buffer.read(cx).replica_id() != 0;
1794        let open_buffer = OpenBuffer::Complete {
1795            buffer: buffer.downgrade(),
1796            change_set_state: cx.new(|_| BufferChangeSetState::default()),
1797        };
1798
1799        let handle = cx.entity().downgrade();
1800        buffer.update(cx, move |_, cx| {
1801            cx.on_release(move |buffer, cx| {
1802                handle
1803                    .update(cx, |_, cx| {
1804                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1805                    })
1806                    .ok();
1807            })
1808            .detach()
1809        });
1810
1811        match self.opened_buffers.entry(remote_id) {
1812            hash_map::Entry::Vacant(entry) => {
1813                entry.insert(open_buffer);
1814            }
1815            hash_map::Entry::Occupied(mut entry) => {
1816                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1817                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1818                } else if entry.get().upgrade().is_some() {
1819                    if is_remote {
1820                        return Ok(());
1821                    } else {
1822                        debug_panic!("buffer {} was already registered", remote_id);
1823                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1824                    }
1825                }
1826                entry.insert(open_buffer);
1827            }
1828        }
1829
1830        cx.subscribe(&buffer, Self::on_buffer_event).detach();
1831        cx.emit(BufferStoreEvent::BufferAdded(buffer));
1832        Ok(())
1833    }
1834
1835    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1836        self.opened_buffers
1837            .values()
1838            .filter_map(|buffer| buffer.upgrade())
1839    }
1840
1841    pub fn loading_buffers(
1842        &self,
1843    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1844        self.loading_buffers.iter().map(|(path, task)| {
1845            let task = task.clone();
1846            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1847        })
1848    }
1849
1850    pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1851        self.buffers().find_map(|buffer| {
1852            let file = File::from_dyn(buffer.read(cx).file())?;
1853            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1854                Some(buffer)
1855            } else {
1856                None
1857            }
1858        })
1859    }
1860
1861    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1862        self.opened_buffers.get(&buffer_id)?.upgrade()
1863    }
1864
1865    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1866        self.get(buffer_id)
1867            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1868    }
1869
1870    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1871        self.get(buffer_id).or_else(|| {
1872            self.as_remote()
1873                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1874        })
1875    }
1876
1877    pub fn get_unstaged_changes(
1878        &self,
1879        buffer_id: BufferId,
1880        cx: &App,
1881    ) -> Option<Entity<BufferChangeSet>> {
1882        if let OpenBuffer::Complete {
1883            change_set_state, ..
1884        } = self.opened_buffers.get(&buffer_id)?
1885        {
1886            change_set_state
1887                .read(cx)
1888                .unstaged_changes
1889                .as_ref()?
1890                .upgrade()
1891        } else {
1892            None
1893        }
1894    }
1895
1896    pub fn get_uncommitted_changes(
1897        &self,
1898        buffer_id: BufferId,
1899        cx: &App,
1900    ) -> Option<Entity<BufferChangeSet>> {
1901        if let OpenBuffer::Complete {
1902            change_set_state, ..
1903        } = self.opened_buffers.get(&buffer_id)?
1904        {
1905            change_set_state
1906                .read(cx)
1907                .uncommitted_changes
1908                .as_ref()?
1909                .upgrade()
1910        } else {
1911            None
1912        }
1913    }
1914
1915    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1916        let buffers = self
1917            .buffers()
1918            .map(|buffer| {
1919                let buffer = buffer.read(cx);
1920                proto::BufferVersion {
1921                    id: buffer.remote_id().into(),
1922                    version: language::proto::serialize_version(&buffer.version),
1923                }
1924            })
1925            .collect();
1926        let incomplete_buffer_ids = self
1927            .as_remote()
1928            .map(|remote| remote.incomplete_buffer_ids())
1929            .unwrap_or_default();
1930        (buffers, incomplete_buffer_ids)
1931    }
1932
1933    pub fn disconnected_from_host(&mut self, cx: &mut App) {
1934        for open_buffer in self.opened_buffers.values_mut() {
1935            if let Some(buffer) = open_buffer.upgrade() {
1936                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1937            }
1938        }
1939
1940        for buffer in self.buffers() {
1941            buffer.update(cx, |buffer, cx| {
1942                buffer.set_capability(Capability::ReadOnly, cx)
1943            });
1944        }
1945
1946        if let Some(remote) = self.as_remote_mut() {
1947            // Wake up all futures currently waiting on a buffer to get opened,
1948            // to give them a chance to fail now that we've disconnected.
1949            remote.remote_buffer_listeners.clear()
1950        }
1951    }
1952
1953    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1954        self.downstream_client = Some((downstream_client, remote_id));
1955    }
1956
1957    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1958        self.downstream_client.take();
1959        self.forget_shared_buffers();
1960    }
1961
1962    pub fn discard_incomplete(&mut self) {
1963        self.opened_buffers
1964            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1965    }
1966
1967    pub fn find_search_candidates(
1968        &mut self,
1969        query: &SearchQuery,
1970        mut limit: usize,
1971        fs: Arc<dyn Fs>,
1972        cx: &mut Context<Self>,
1973    ) -> Receiver<Entity<Buffer>> {
1974        let (tx, rx) = smol::channel::unbounded();
1975        let mut open_buffers = HashSet::default();
1976        let mut unnamed_buffers = Vec::new();
1977        for handle in self.buffers() {
1978            let buffer = handle.read(cx);
1979            if let Some(entry_id) = buffer.entry_id(cx) {
1980                open_buffers.insert(entry_id);
1981            } else {
1982                limit = limit.saturating_sub(1);
1983                unnamed_buffers.push(handle)
1984            };
1985        }
1986
1987        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
1988        let project_paths_rx = self
1989            .worktree_store
1990            .update(cx, |worktree_store, cx| {
1991                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
1992            })
1993            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
1994
1995        cx.spawn(|this, mut cx| async move {
1996            for buffer in unnamed_buffers {
1997                tx.send(buffer).await.ok();
1998            }
1999
2000            let mut project_paths_rx = pin!(project_paths_rx);
2001            while let Some(project_paths) = project_paths_rx.next().await {
2002                let buffers = this.update(&mut cx, |this, cx| {
2003                    project_paths
2004                        .into_iter()
2005                        .map(|project_path| this.open_buffer(project_path, cx))
2006                        .collect::<Vec<_>>()
2007                })?;
2008                for buffer_task in buffers {
2009                    if let Some(buffer) = buffer_task.await.log_err() {
2010                        if tx.send(buffer).await.is_err() {
2011                            return anyhow::Ok(());
2012                        }
2013                    }
2014                }
2015            }
2016            anyhow::Ok(())
2017        })
2018        .detach();
2019        rx
2020    }
2021
2022    pub fn recalculate_buffer_diffs(
2023        &mut self,
2024        buffers: Vec<Entity<Buffer>>,
2025        cx: &mut Context<Self>,
2026    ) -> impl Future<Output = ()> {
2027        let mut futures = Vec::new();
2028        for buffer in buffers {
2029            if let Some(OpenBuffer::Complete {
2030                change_set_state, ..
2031            }) = self.opened_buffers.get_mut(&buffer.read(cx).remote_id())
2032            {
2033                let buffer = buffer.read(cx).text_snapshot();
2034                futures.push(change_set_state.update(cx, |change_set_state, cx| {
2035                    change_set_state.recalculate_diffs(buffer, cx)
2036                }));
2037            }
2038        }
2039        async move {
2040            futures::future::join_all(futures).await;
2041        }
2042    }
2043
2044    fn on_buffer_event(
2045        &mut self,
2046        buffer: Entity<Buffer>,
2047        event: &BufferEvent,
2048        cx: &mut Context<Self>,
2049    ) {
2050        match event {
2051            BufferEvent::FileHandleChanged => {
2052                if let Some(local) = self.as_local_mut() {
2053                    local.buffer_changed_file(buffer, cx);
2054                }
2055            }
2056            BufferEvent::Reloaded => {
2057                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
2058                    return;
2059                };
2060                let buffer = buffer.read(cx);
2061                downstream_client
2062                    .send(proto::BufferReloaded {
2063                        project_id: *project_id,
2064                        buffer_id: buffer.remote_id().to_proto(),
2065                        version: serialize_version(&buffer.version()),
2066                        mtime: buffer.saved_mtime().map(|t| t.into()),
2067                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
2068                    })
2069                    .log_err();
2070            }
2071            _ => {}
2072        }
2073    }
2074
2075    pub async fn handle_update_buffer(
2076        this: Entity<Self>,
2077        envelope: TypedEnvelope<proto::UpdateBuffer>,
2078        mut cx: AsyncApp,
2079    ) -> Result<proto::Ack> {
2080        let payload = envelope.payload.clone();
2081        let buffer_id = BufferId::new(payload.buffer_id)?;
2082        let ops = payload
2083            .operations
2084            .into_iter()
2085            .map(language::proto::deserialize_operation)
2086            .collect::<Result<Vec<_>, _>>()?;
2087        this.update(&mut cx, |this, cx| {
2088            match this.opened_buffers.entry(buffer_id) {
2089                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
2090                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
2091                    OpenBuffer::Complete { buffer, .. } => {
2092                        if let Some(buffer) = buffer.upgrade() {
2093                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
2094                        }
2095                    }
2096                },
2097                hash_map::Entry::Vacant(e) => {
2098                    e.insert(OpenBuffer::Operations(ops));
2099                }
2100            }
2101            Ok(proto::Ack {})
2102        })?
2103    }
2104
2105    pub fn register_shared_lsp_handle(
2106        &mut self,
2107        peer_id: proto::PeerId,
2108        buffer_id: BufferId,
2109        handle: OpenLspBufferHandle,
2110    ) {
2111        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
2112            if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
2113                buffer.lsp_handle = Some(handle);
2114                return;
2115            }
2116        }
2117        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
2118    }
2119
2120    pub fn handle_synchronize_buffers(
2121        &mut self,
2122        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
2123        cx: &mut Context<Self>,
2124        client: Arc<Client>,
2125    ) -> Result<proto::SynchronizeBuffersResponse> {
2126        let project_id = envelope.payload.project_id;
2127        let mut response = proto::SynchronizeBuffersResponse {
2128            buffers: Default::default(),
2129        };
2130        let Some(guest_id) = envelope.original_sender_id else {
2131            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
2132        };
2133
2134        self.shared_buffers.entry(guest_id).or_default().clear();
2135        for buffer in envelope.payload.buffers {
2136            let buffer_id = BufferId::new(buffer.id)?;
2137            let remote_version = language::proto::deserialize_version(&buffer.version);
2138            if let Some(buffer) = self.get(buffer_id) {
2139                self.shared_buffers
2140                    .entry(guest_id)
2141                    .or_default()
2142                    .entry(buffer_id)
2143                    .or_insert_with(|| SharedBuffer {
2144                        buffer: buffer.clone(),
2145                        change_set: None,
2146                        lsp_handle: None,
2147                    });
2148
2149                let buffer = buffer.read(cx);
2150                response.buffers.push(proto::BufferVersion {
2151                    id: buffer_id.into(),
2152                    version: language::proto::serialize_version(&buffer.version),
2153                });
2154
2155                let operations = buffer.serialize_ops(Some(remote_version), cx);
2156                let client = client.clone();
2157                if let Some(file) = buffer.file() {
2158                    client
2159                        .send(proto::UpdateBufferFile {
2160                            project_id,
2161                            buffer_id: buffer_id.into(),
2162                            file: Some(file.to_proto(cx)),
2163                        })
2164                        .log_err();
2165                }
2166
2167                // TODO(max): do something
2168                // client
2169                //     .send(proto::UpdateStagedText {
2170                //         project_id,
2171                //         buffer_id: buffer_id.into(),
2172                //         diff_base: buffer.diff_base().map(ToString::to_string),
2173                //     })
2174                //     .log_err();
2175
2176                client
2177                    .send(proto::BufferReloaded {
2178                        project_id,
2179                        buffer_id: buffer_id.into(),
2180                        version: language::proto::serialize_version(buffer.saved_version()),
2181                        mtime: buffer.saved_mtime().map(|time| time.into()),
2182                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
2183                            as i32,
2184                    })
2185                    .log_err();
2186
2187                cx.background_executor()
2188                    .spawn(
2189                        async move {
2190                            let operations = operations.await;
2191                            for chunk in split_operations(operations) {
2192                                client
2193                                    .request(proto::UpdateBuffer {
2194                                        project_id,
2195                                        buffer_id: buffer_id.into(),
2196                                        operations: chunk,
2197                                    })
2198                                    .await?;
2199                            }
2200                            anyhow::Ok(())
2201                        }
2202                        .log_err(),
2203                    )
2204                    .detach();
2205            }
2206        }
2207        Ok(response)
2208    }
2209
2210    pub fn handle_create_buffer_for_peer(
2211        &mut self,
2212        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
2213        replica_id: u16,
2214        capability: Capability,
2215        cx: &mut Context<Self>,
2216    ) -> Result<()> {
2217        let Some(remote) = self.as_remote_mut() else {
2218            return Err(anyhow!("buffer store is not a remote"));
2219        };
2220
2221        if let Some(buffer) =
2222            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
2223        {
2224            self.add_buffer(buffer, cx)?;
2225        }
2226
2227        Ok(())
2228    }
2229
2230    pub async fn handle_update_buffer_file(
2231        this: Entity<Self>,
2232        envelope: TypedEnvelope<proto::UpdateBufferFile>,
2233        mut cx: AsyncApp,
2234    ) -> Result<()> {
2235        let buffer_id = envelope.payload.buffer_id;
2236        let buffer_id = BufferId::new(buffer_id)?;
2237
2238        this.update(&mut cx, |this, cx| {
2239            let payload = envelope.payload.clone();
2240            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2241                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
2242                let worktree = this
2243                    .worktree_store
2244                    .read(cx)
2245                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
2246                    .ok_or_else(|| anyhow!("no such worktree"))?;
2247                let file = File::from_proto(file, worktree, cx)?;
2248                let old_file = buffer.update(cx, |buffer, cx| {
2249                    let old_file = buffer.file().cloned();
2250                    let new_path = file.path.clone();
2251                    buffer.file_updated(Arc::new(file), cx);
2252                    if old_file
2253                        .as_ref()
2254                        .map_or(true, |old| *old.path() != new_path)
2255                    {
2256                        Some(old_file)
2257                    } else {
2258                        None
2259                    }
2260                });
2261                if let Some(old_file) = old_file {
2262                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
2263                }
2264            }
2265            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2266                downstream_client
2267                    .send(proto::UpdateBufferFile {
2268                        project_id: *project_id,
2269                        buffer_id: buffer_id.into(),
2270                        file: envelope.payload.file,
2271                    })
2272                    .log_err();
2273            }
2274            Ok(())
2275        })?
2276    }
2277
2278    pub async fn handle_save_buffer(
2279        this: Entity<Self>,
2280        envelope: TypedEnvelope<proto::SaveBuffer>,
2281        mut cx: AsyncApp,
2282    ) -> Result<proto::BufferSaved> {
2283        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2284        let (buffer, project_id) = this.update(&mut cx, |this, _| {
2285            anyhow::Ok((
2286                this.get_existing(buffer_id)?,
2287                this.downstream_client
2288                    .as_ref()
2289                    .map(|(_, project_id)| *project_id)
2290                    .context("project is not shared")?,
2291            ))
2292        })??;
2293        buffer
2294            .update(&mut cx, |buffer, _| {
2295                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
2296            })?
2297            .await?;
2298        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
2299
2300        if let Some(new_path) = envelope.payload.new_path {
2301            let new_path = ProjectPath::from_proto(new_path);
2302            this.update(&mut cx, |this, cx| {
2303                this.save_buffer_as(buffer.clone(), new_path, cx)
2304            })?
2305            .await?;
2306        } else {
2307            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
2308                .await?;
2309        }
2310
2311        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
2312            project_id,
2313            buffer_id: buffer_id.into(),
2314            version: serialize_version(buffer.saved_version()),
2315            mtime: buffer.saved_mtime().map(|time| time.into()),
2316        })
2317    }
2318
2319    pub async fn handle_close_buffer(
2320        this: Entity<Self>,
2321        envelope: TypedEnvelope<proto::CloseBuffer>,
2322        mut cx: AsyncApp,
2323    ) -> Result<()> {
2324        let peer_id = envelope.sender_id;
2325        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2326        this.update(&mut cx, |this, _| {
2327            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
2328                if shared.remove(&buffer_id).is_some() {
2329                    if shared.is_empty() {
2330                        this.shared_buffers.remove(&peer_id);
2331                    }
2332                    return;
2333                }
2334            }
2335            debug_panic!(
2336                "peer_id {} closed buffer_id {} which was either not open or already closed",
2337                peer_id,
2338                buffer_id
2339            )
2340        })
2341    }
2342
2343    pub async fn handle_buffer_saved(
2344        this: Entity<Self>,
2345        envelope: TypedEnvelope<proto::BufferSaved>,
2346        mut cx: AsyncApp,
2347    ) -> Result<()> {
2348        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2349        let version = deserialize_version(&envelope.payload.version);
2350        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2351        this.update(&mut cx, move |this, cx| {
2352            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2353                buffer.update(cx, |buffer, cx| {
2354                    buffer.did_save(version, mtime, cx);
2355                });
2356            }
2357
2358            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2359                downstream_client
2360                    .send(proto::BufferSaved {
2361                        project_id: *project_id,
2362                        buffer_id: buffer_id.into(),
2363                        mtime: envelope.payload.mtime,
2364                        version: envelope.payload.version,
2365                    })
2366                    .log_err();
2367            }
2368        })
2369    }
2370
2371    pub async fn handle_buffer_reloaded(
2372        this: Entity<Self>,
2373        envelope: TypedEnvelope<proto::BufferReloaded>,
2374        mut cx: AsyncApp,
2375    ) -> Result<()> {
2376        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2377        let version = deserialize_version(&envelope.payload.version);
2378        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
2379        let line_ending = deserialize_line_ending(
2380            proto::LineEnding::from_i32(envelope.payload.line_ending)
2381                .ok_or_else(|| anyhow!("missing line ending"))?,
2382        );
2383        this.update(&mut cx, |this, cx| {
2384            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
2385                buffer.update(cx, |buffer, cx| {
2386                    buffer.did_reload(version, line_ending, mtime, cx);
2387                });
2388            }
2389
2390            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
2391                downstream_client
2392                    .send(proto::BufferReloaded {
2393                        project_id: *project_id,
2394                        buffer_id: buffer_id.into(),
2395                        mtime: envelope.payload.mtime,
2396                        version: envelope.payload.version,
2397                        line_ending: envelope.payload.line_ending,
2398                    })
2399                    .log_err();
2400            }
2401        })
2402    }
2403
2404    pub async fn handle_blame_buffer(
2405        this: Entity<Self>,
2406        envelope: TypedEnvelope<proto::BlameBuffer>,
2407        mut cx: AsyncApp,
2408    ) -> Result<proto::BlameBufferResponse> {
2409        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2410        let version = deserialize_version(&envelope.payload.version);
2411        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2412        buffer
2413            .update(&mut cx, |buffer, _| {
2414                buffer.wait_for_version(version.clone())
2415            })?
2416            .await?;
2417        let blame = this
2418            .update(&mut cx, |this, cx| {
2419                this.blame_buffer(&buffer, Some(version), cx)
2420            })?
2421            .await?;
2422        Ok(serialize_blame_buffer_response(blame))
2423    }
2424
2425    pub async fn handle_get_permalink_to_line(
2426        this: Entity<Self>,
2427        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2428        mut cx: AsyncApp,
2429    ) -> Result<proto::GetPermalinkToLineResponse> {
2430        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2431        // let version = deserialize_version(&envelope.payload.version);
2432        let selection = {
2433            let proto_selection = envelope
2434                .payload
2435                .selection
2436                .context("no selection to get permalink for defined")?;
2437            proto_selection.start as u32..proto_selection.end as u32
2438        };
2439        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
2440        let permalink = this
2441            .update(&mut cx, |this, cx| {
2442                this.get_permalink_to_line(&buffer, selection, cx)
2443            })?
2444            .await?;
2445        Ok(proto::GetPermalinkToLineResponse {
2446            permalink: permalink.to_string(),
2447        })
2448    }
2449
2450    pub async fn handle_open_unstaged_changes(
2451        this: Entity<Self>,
2452        request: TypedEnvelope<proto::OpenUnstagedChanges>,
2453        mut cx: AsyncApp,
2454    ) -> Result<proto::OpenUnstagedChangesResponse> {
2455        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2456        let change_set = this
2457            .update(&mut cx, |this, cx| {
2458                let buffer = this.get(buffer_id)?;
2459                Some(this.open_unstaged_changes(buffer, cx))
2460            })?
2461            .ok_or_else(|| anyhow!("no such buffer"))?
2462            .await?;
2463        this.update(&mut cx, |this, _| {
2464            let shared_buffers = this
2465                .shared_buffers
2466                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2467                .or_default();
2468            debug_assert!(shared_buffers.contains_key(&buffer_id));
2469            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2470                shared.change_set = Some(change_set.clone());
2471            }
2472        })?;
2473        let staged_text = change_set.read_with(&cx, |change_set, _| {
2474            change_set.base_text.as_ref().map(|buffer| buffer.text())
2475        })?;
2476        Ok(proto::OpenUnstagedChangesResponse { staged_text })
2477    }
2478
2479    pub async fn handle_open_uncommitted_changes(
2480        this: Entity<Self>,
2481        request: TypedEnvelope<proto::OpenUncommittedChanges>,
2482        mut cx: AsyncApp,
2483    ) -> Result<proto::OpenUncommittedChangesResponse> {
2484        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2485        let change_set = this
2486            .update(&mut cx, |this, cx| {
2487                let buffer = this.get(buffer_id)?;
2488                Some(this.open_uncommitted_changes(buffer, cx))
2489            })?
2490            .ok_or_else(|| anyhow!("no such buffer"))?
2491            .await?;
2492        this.update(&mut cx, |this, _| {
2493            let shared_buffers = this
2494                .shared_buffers
2495                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2496                .or_default();
2497            debug_assert!(shared_buffers.contains_key(&buffer_id));
2498            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
2499                shared.change_set = Some(change_set.clone());
2500            }
2501        })?;
2502        change_set.read_with(&cx, |change_set, cx| {
2503            use proto::open_uncommitted_changes_response::Mode;
2504
2505            let staged_buffer = change_set
2506                .unstaged_change_set
2507                .as_ref()
2508                .and_then(|change_set| change_set.read(cx).base_text.as_ref());
2509
2510            let mode;
2511            let staged_text;
2512            let committed_text;
2513            if let Some(committed_buffer) = &change_set.base_text {
2514                committed_text = Some(committed_buffer.text());
2515                if let Some(staged_buffer) = staged_buffer {
2516                    if staged_buffer.remote_id() == committed_buffer.remote_id() {
2517                        mode = Mode::IndexMatchesHead;
2518                        staged_text = None;
2519                    } else {
2520                        mode = Mode::IndexAndHead;
2521                        staged_text = Some(staged_buffer.text());
2522                    }
2523                } else {
2524                    mode = Mode::IndexAndHead;
2525                    staged_text = None;
2526                }
2527            } else {
2528                mode = Mode::IndexAndHead;
2529                committed_text = None;
2530                staged_text = staged_buffer.as_ref().map(|buffer| buffer.text());
2531            }
2532
2533            proto::OpenUncommittedChangesResponse {
2534                committed_text,
2535                staged_text,
2536                mode: mode.into(),
2537            }
2538        })
2539    }
2540
2541    pub async fn handle_update_diff_bases(
2542        this: Entity<Self>,
2543        request: TypedEnvelope<proto::UpdateDiffBases>,
2544        mut cx: AsyncApp,
2545    ) -> Result<()> {
2546        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2547        this.update(&mut cx, |this, cx| {
2548            if let Some(OpenBuffer::Complete {
2549                change_set_state,
2550                buffer,
2551            }) = this.opened_buffers.get_mut(&buffer_id)
2552            {
2553                if let Some(buffer) = buffer.upgrade() {
2554                    let buffer = buffer.read(cx).text_snapshot();
2555                    change_set_state.update(cx, |change_set_state, cx| {
2556                        change_set_state.handle_base_texts_updated(buffer, request.payload, cx);
2557                    })
2558                }
2559            }
2560        })
2561    }
2562
2563    pub fn reload_buffers(
2564        &self,
2565        buffers: HashSet<Entity<Buffer>>,
2566        push_to_history: bool,
2567        cx: &mut Context<Self>,
2568    ) -> Task<Result<ProjectTransaction>> {
2569        if buffers.is_empty() {
2570            return Task::ready(Ok(ProjectTransaction::default()));
2571        }
2572        match &self.state {
2573            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2574            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2575        }
2576    }
2577
2578    async fn handle_reload_buffers(
2579        this: Entity<Self>,
2580        envelope: TypedEnvelope<proto::ReloadBuffers>,
2581        mut cx: AsyncApp,
2582    ) -> Result<proto::ReloadBuffersResponse> {
2583        let sender_id = envelope.original_sender_id().unwrap_or_default();
2584        let reload = this.update(&mut cx, |this, cx| {
2585            let mut buffers = HashSet::default();
2586            for buffer_id in &envelope.payload.buffer_ids {
2587                let buffer_id = BufferId::new(*buffer_id)?;
2588                buffers.insert(this.get_existing(buffer_id)?);
2589            }
2590            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2591        })??;
2592
2593        let project_transaction = reload.await?;
2594        let project_transaction = this.update(&mut cx, |this, cx| {
2595            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2596        })?;
2597        Ok(proto::ReloadBuffersResponse {
2598            transaction: Some(project_transaction),
2599        })
2600    }
2601
2602    pub fn create_buffer_for_peer(
2603        &mut self,
2604        buffer: &Entity<Buffer>,
2605        peer_id: proto::PeerId,
2606        cx: &mut Context<Self>,
2607    ) -> Task<Result<()>> {
2608        let buffer_id = buffer.read(cx).remote_id();
2609        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
2610        if shared_buffers.contains_key(&buffer_id) {
2611            return Task::ready(Ok(()));
2612        }
2613        shared_buffers.insert(
2614            buffer_id,
2615            SharedBuffer {
2616                buffer: buffer.clone(),
2617                change_set: None,
2618                lsp_handle: None,
2619            },
2620        );
2621
2622        let Some((client, project_id)) = self.downstream_client.clone() else {
2623            return Task::ready(Ok(()));
2624        };
2625
2626        cx.spawn(|this, mut cx| async move {
2627            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
2628                return anyhow::Ok(());
2629            };
2630
2631            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
2632            let operations = operations.await;
2633            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
2634
2635            let initial_state = proto::CreateBufferForPeer {
2636                project_id,
2637                peer_id: Some(peer_id),
2638                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
2639            };
2640
2641            if client.send(initial_state).log_err().is_some() {
2642                let client = client.clone();
2643                cx.background_executor()
2644                    .spawn(async move {
2645                        let mut chunks = split_operations(operations).peekable();
2646                        while let Some(chunk) = chunks.next() {
2647                            let is_last = chunks.peek().is_none();
2648                            client.send(proto::CreateBufferForPeer {
2649                                project_id,
2650                                peer_id: Some(peer_id),
2651                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
2652                                    proto::BufferChunk {
2653                                        buffer_id: buffer_id.into(),
2654                                        operations: chunk,
2655                                        is_last,
2656                                    },
2657                                )),
2658                            })?;
2659                        }
2660                        anyhow::Ok(())
2661                    })
2662                    .await
2663                    .log_err();
2664            }
2665            Ok(())
2666        })
2667    }
2668
2669    pub fn forget_shared_buffers(&mut self) {
2670        self.shared_buffers.clear();
2671    }
2672
2673    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2674        self.shared_buffers.remove(peer_id);
2675    }
2676
2677    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2678        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2679            self.shared_buffers.insert(new_peer_id, buffers);
2680        }
2681    }
2682
2683    pub fn has_shared_buffers(&self) -> bool {
2684        !self.shared_buffers.is_empty()
2685    }
2686
2687    pub fn create_local_buffer(
2688        &mut self,
2689        text: &str,
2690        language: Option<Arc<Language>>,
2691        cx: &mut Context<Self>,
2692    ) -> Entity<Buffer> {
2693        let buffer = cx.new(|cx| {
2694            Buffer::local(text, cx)
2695                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2696        });
2697
2698        self.add_buffer(buffer.clone(), cx).log_err();
2699        let buffer_id = buffer.read(cx).remote_id();
2700
2701        let this = self
2702            .as_local_mut()
2703            .expect("local-only method called in a non-local context");
2704        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2705            this.local_buffer_ids_by_path.insert(
2706                ProjectPath {
2707                    worktree_id: file.worktree_id(cx),
2708                    path: file.path.clone(),
2709                },
2710                buffer_id,
2711            );
2712
2713            if let Some(entry_id) = file.entry_id {
2714                this.local_buffer_ids_by_entry_id
2715                    .insert(entry_id, buffer_id);
2716            }
2717        }
2718        buffer
2719    }
2720
2721    pub fn deserialize_project_transaction(
2722        &mut self,
2723        message: proto::ProjectTransaction,
2724        push_to_history: bool,
2725        cx: &mut Context<Self>,
2726    ) -> Task<Result<ProjectTransaction>> {
2727        if let Some(this) = self.as_remote_mut() {
2728            this.deserialize_project_transaction(message, push_to_history, cx)
2729        } else {
2730            debug_panic!("not a remote buffer store");
2731            Task::ready(Err(anyhow!("not a remote buffer store")))
2732        }
2733    }
2734
2735    pub fn wait_for_remote_buffer(
2736        &mut self,
2737        id: BufferId,
2738        cx: &mut Context<BufferStore>,
2739    ) -> Task<Result<Entity<Buffer>>> {
2740        if let Some(this) = self.as_remote_mut() {
2741            this.wait_for_remote_buffer(id, cx)
2742        } else {
2743            debug_panic!("not a remote buffer store");
2744            Task::ready(Err(anyhow!("not a remote buffer store")))
2745        }
2746    }
2747
2748    pub fn serialize_project_transaction_for_peer(
2749        &mut self,
2750        project_transaction: ProjectTransaction,
2751        peer_id: proto::PeerId,
2752        cx: &mut Context<Self>,
2753    ) -> proto::ProjectTransaction {
2754        let mut serialized_transaction = proto::ProjectTransaction {
2755            buffer_ids: Default::default(),
2756            transactions: Default::default(),
2757        };
2758        for (buffer, transaction) in project_transaction.0 {
2759            self.create_buffer_for_peer(&buffer, peer_id, cx)
2760                .detach_and_log_err(cx);
2761            serialized_transaction
2762                .buffer_ids
2763                .push(buffer.read(cx).remote_id().into());
2764            serialized_transaction
2765                .transactions
2766                .push(language::proto::serialize_transaction(&transaction));
2767        }
2768        serialized_transaction
2769    }
2770}
2771
2772impl EventEmitter<BufferChangeSetEvent> for BufferChangeSet {}
2773
2774impl BufferChangeSet {
2775    fn set_state(
2776        &mut self,
2777        base_text: Option<language::BufferSnapshot>,
2778        diff: BufferDiff,
2779        buffer: &text::BufferSnapshot,
2780        cx: &mut Context<Self>,
2781    ) {
2782        if let Some(base_text) = base_text.as_ref() {
2783            let changed_range = if Some(base_text.remote_id())
2784                != self.base_text.as_ref().map(|buffer| buffer.remote_id())
2785            {
2786                Some(text::Anchor::MIN..text::Anchor::MAX)
2787            } else {
2788                diff.compare(&self.diff_to_buffer, buffer)
2789            };
2790            if let Some(changed_range) = changed_range {
2791                cx.emit(BufferChangeSetEvent::DiffChanged { changed_range });
2792            }
2793        }
2794        self.base_text = base_text;
2795        self.diff_to_buffer = diff;
2796    }
2797
2798    pub fn diff_hunks_intersecting_range<'a>(
2799        &'a self,
2800        range: Range<text::Anchor>,
2801        buffer_snapshot: &'a text::BufferSnapshot,
2802    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
2803        self.diff_to_buffer
2804            .hunks_intersecting_range(range, buffer_snapshot)
2805    }
2806
2807    pub fn diff_hunks_intersecting_range_rev<'a>(
2808        &'a self,
2809        range: Range<text::Anchor>,
2810        buffer_snapshot: &'a text::BufferSnapshot,
2811    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
2812        self.diff_to_buffer
2813            .hunks_intersecting_range_rev(range, buffer_snapshot)
2814    }
2815
2816    /// Used in cases where the change set isn't derived from git.
2817    pub fn set_base_text(
2818        &mut self,
2819        base_buffer: Entity<language::Buffer>,
2820        buffer: text::BufferSnapshot,
2821        cx: &mut Context<Self>,
2822    ) -> oneshot::Receiver<()> {
2823        let (tx, rx) = oneshot::channel();
2824        let this = cx.weak_entity();
2825        let base_buffer = base_buffer.read(cx).snapshot();
2826        cx.spawn(|_, mut cx| async move {
2827            let diff = cx
2828                .background_executor()
2829                .spawn({
2830                    let base_buffer = base_buffer.clone();
2831                    let buffer = buffer.clone();
2832                    async move { BufferDiff::build(Some(&base_buffer.text()), &buffer) }
2833                })
2834                .await;
2835            let Some(this) = this.upgrade() else {
2836                tx.send(()).ok();
2837                return;
2838            };
2839            this.update(&mut cx, |this, cx| {
2840                this.set_state(Some(base_buffer), diff, &buffer, cx);
2841            })
2842            .log_err();
2843            tx.send(()).ok();
2844        })
2845        .detach();
2846        rx
2847    }
2848
2849    #[cfg(any(test, feature = "test-support"))]
2850    pub fn base_text_string(&self) -> Option<String> {
2851        self.base_text.as_ref().map(|buffer| buffer.text())
2852    }
2853
2854    pub fn new(buffer: &Entity<Buffer>, cx: &mut App) -> Self {
2855        BufferChangeSet {
2856            buffer_id: buffer.read(cx).remote_id(),
2857            base_text: None,
2858            diff_to_buffer: BufferDiff::new(&buffer.read(cx).text_snapshot()),
2859            unstaged_change_set: None,
2860        }
2861    }
2862
2863    #[cfg(any(test, feature = "test-support"))]
2864    pub fn new_with_base_text(base_text: &str, buffer: &Entity<Buffer>, cx: &mut App) -> Self {
2865        let mut base_text = base_text.to_owned();
2866        text::LineEnding::normalize(&mut base_text);
2867        let diff_to_buffer = BufferDiff::build(Some(&base_text), &buffer.read(cx).text_snapshot());
2868        let base_text = language::Buffer::build_snapshot_sync(base_text.into(), None, None, cx);
2869        BufferChangeSet {
2870            buffer_id: buffer.read(cx).remote_id(),
2871            base_text: Some(base_text),
2872            diff_to_buffer,
2873            unstaged_change_set: None,
2874        }
2875    }
2876
2877    #[cfg(any(test, feature = "test-support"))]
2878    pub fn recalculate_diff_sync(
2879        &mut self,
2880        snapshot: text::BufferSnapshot,
2881        cx: &mut Context<Self>,
2882    ) {
2883        let mut base_text = self.base_text.as_ref().map(|buffer| buffer.text());
2884        if let Some(base_text) = base_text.as_mut() {
2885            text::LineEnding::normalize(base_text);
2886        }
2887        let diff_to_buffer = BufferDiff::build(base_text.as_deref(), &snapshot);
2888        self.set_state(self.base_text.clone(), diff_to_buffer, &snapshot, cx);
2889    }
2890}
2891
2892impl OpenBuffer {
2893    fn upgrade(&self) -> Option<Entity<Buffer>> {
2894        match self {
2895            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2896            OpenBuffer::Operations(_) => None,
2897        }
2898    }
2899}
2900
2901fn is_not_found_error(error: &anyhow::Error) -> bool {
2902    error
2903        .root_cause()
2904        .downcast_ref::<io::Error>()
2905        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2906}
2907
2908fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2909    let Some(blame) = blame else {
2910        return proto::BlameBufferResponse {
2911            blame_response: None,
2912        };
2913    };
2914
2915    let entries = blame
2916        .entries
2917        .into_iter()
2918        .map(|entry| proto::BlameEntry {
2919            sha: entry.sha.as_bytes().into(),
2920            start_line: entry.range.start,
2921            end_line: entry.range.end,
2922            original_line_number: entry.original_line_number,
2923            author: entry.author.clone(),
2924            author_mail: entry.author_mail.clone(),
2925            author_time: entry.author_time,
2926            author_tz: entry.author_tz.clone(),
2927            committer: entry.committer.clone(),
2928            committer_mail: entry.committer_mail.clone(),
2929            committer_time: entry.committer_time,
2930            committer_tz: entry.committer_tz.clone(),
2931            summary: entry.summary.clone(),
2932            previous: entry.previous.clone(),
2933            filename: entry.filename.clone(),
2934        })
2935        .collect::<Vec<_>>();
2936
2937    let messages = blame
2938        .messages
2939        .into_iter()
2940        .map(|(oid, message)| proto::CommitMessage {
2941            oid: oid.as_bytes().into(),
2942            message,
2943        })
2944        .collect::<Vec<_>>();
2945
2946    let permalinks = blame
2947        .permalinks
2948        .into_iter()
2949        .map(|(oid, url)| proto::CommitPermalink {
2950            oid: oid.as_bytes().into(),
2951            permalink: url.to_string(),
2952        })
2953        .collect::<Vec<_>>();
2954
2955    proto::BlameBufferResponse {
2956        blame_response: Some(proto::blame_buffer_response::BlameResponse {
2957            entries,
2958            messages,
2959            permalinks,
2960            remote_url: blame.remote_url,
2961        }),
2962    }
2963}
2964
2965fn deserialize_blame_buffer_response(
2966    response: proto::BlameBufferResponse,
2967) -> Option<git::blame::Blame> {
2968    let response = response.blame_response?;
2969    let entries = response
2970        .entries
2971        .into_iter()
2972        .filter_map(|entry| {
2973            Some(git::blame::BlameEntry {
2974                sha: git::Oid::from_bytes(&entry.sha).ok()?,
2975                range: entry.start_line..entry.end_line,
2976                original_line_number: entry.original_line_number,
2977                committer: entry.committer,
2978                committer_time: entry.committer_time,
2979                committer_tz: entry.committer_tz,
2980                committer_mail: entry.committer_mail,
2981                author: entry.author,
2982                author_mail: entry.author_mail,
2983                author_time: entry.author_time,
2984                author_tz: entry.author_tz,
2985                summary: entry.summary,
2986                previous: entry.previous,
2987                filename: entry.filename,
2988            })
2989        })
2990        .collect::<Vec<_>>();
2991
2992    let messages = response
2993        .messages
2994        .into_iter()
2995        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2996        .collect::<HashMap<_, _>>();
2997
2998    let permalinks = response
2999        .permalinks
3000        .into_iter()
3001        .filter_map(|permalink| {
3002            Some((
3003                git::Oid::from_bytes(&permalink.oid).ok()?,
3004                Url::from_str(&permalink.permalink).ok()?,
3005            ))
3006        })
3007        .collect::<HashMap<_, _>>();
3008
3009    Some(Blame {
3010        entries,
3011        permalinks,
3012        messages,
3013        remote_url: response.remote_url,
3014    })
3015}
3016
3017fn get_permalink_in_rust_registry_src(
3018    provider_registry: Arc<GitHostingProviderRegistry>,
3019    path: PathBuf,
3020    selection: Range<u32>,
3021) -> Result<url::Url> {
3022    #[derive(Deserialize)]
3023    struct CargoVcsGit {
3024        sha1: String,
3025    }
3026
3027    #[derive(Deserialize)]
3028    struct CargoVcsInfo {
3029        git: CargoVcsGit,
3030        path_in_vcs: String,
3031    }
3032
3033    #[derive(Deserialize)]
3034    struct CargoPackage {
3035        repository: String,
3036    }
3037
3038    #[derive(Deserialize)]
3039    struct CargoToml {
3040        package: CargoPackage,
3041    }
3042
3043    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
3044        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
3045        Some((dir, json))
3046    }) else {
3047        bail!("No .cargo_vcs_info.json found in parent directories")
3048    };
3049    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
3050    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
3051    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
3052    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
3053        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
3054    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
3055    let permalink = provider.build_permalink(
3056        remote,
3057        BuildPermalinkParams {
3058            sha: &cargo_vcs_info.git.sha1,
3059            path: &path.to_string_lossy(),
3060            selection: Some(selection),
3061        },
3062    );
3063    Ok(permalink)
3064}