buffer_store.rs

   1use crate::{
   2    lsp_store::OpenLspBufferHandle,
   3    search::SearchQuery,
   4    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   5    ProjectItem as _, ProjectPath,
   6};
   7use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
   8use anyhow::{anyhow, bail, Context as _, Result};
   9use client::Client;
  10use collections::{hash_map, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
  13use git::{blame::Blame, diff::BufferDiff, repository::RepoPath};
  14use gpui::{
  15    App, AppContext as _, AsyncAppContext, Context, Entity, EventEmitter, Subscription, Task,
  16    WeakEntity,
  17};
  18use http_client::Url;
  19use language::{
  20    proto::{
  21        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  22        split_operations,
  23    },
  24    Buffer, BufferEvent, Capability, DiskState, File as _, Language, LanguageRegistry, Operation,
  25};
  26use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  27use serde::Deserialize;
  28use smol::channel::Receiver;
  29use std::{
  30    io,
  31    ops::Range,
  32    path::{Path, PathBuf},
  33    pin::pin,
  34    str::FromStr as _,
  35    sync::Arc,
  36    time::Instant,
  37};
  38use text::{BufferId, LineEnding, Rope};
  39use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  40use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
  41
  42/// A set of open buffers.
  43pub struct BufferStore {
  44    state: BufferStoreState,
  45    #[allow(clippy::type_complexity)]
  46    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
  47    #[allow(clippy::type_complexity)]
  48    loading_change_sets:
  49        HashMap<BufferId, Shared<Task<Result<Entity<BufferChangeSet>, Arc<anyhow::Error>>>>>,
  50    worktree_store: Entity<WorktreeStore>,
  51    opened_buffers: HashMap<BufferId, OpenBuffer>,
  52    downstream_client: Option<(AnyProtoClient, u64)>,
  53    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
  54}
  55
  56#[derive(Hash, Eq, PartialEq, Clone)]
  57struct SharedBuffer {
  58    buffer: Entity<Buffer>,
  59    unstaged_changes: Option<Entity<BufferChangeSet>>,
  60    lsp_handle: Option<OpenLspBufferHandle>,
  61}
  62
  63pub struct BufferChangeSet {
  64    pub buffer_id: BufferId,
  65    pub base_text: Option<language::BufferSnapshot>,
  66    pub language: Option<Arc<Language>>,
  67    pub diff_to_buffer: git::diff::BufferDiff,
  68    pub recalculate_diff_task: Option<Task<Result<()>>>,
  69    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
  70    pub language_registry: Option<Arc<LanguageRegistry>>,
  71}
  72
  73enum BufferStoreState {
  74    Local(LocalBufferStore),
  75    Remote(RemoteBufferStore),
  76}
  77
  78struct RemoteBufferStore {
  79    shared_with_me: HashSet<Entity<Buffer>>,
  80    upstream_client: AnyProtoClient,
  81    project_id: u64,
  82    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
  83    remote_buffer_listeners:
  84        HashMap<BufferId, Vec<oneshot::Sender<Result<Entity<Buffer>, anyhow::Error>>>>,
  85    worktree_store: Entity<WorktreeStore>,
  86}
  87
  88struct LocalBufferStore {
  89    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  90    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  91    worktree_store: Entity<WorktreeStore>,
  92    _subscription: Subscription,
  93}
  94
  95enum OpenBuffer {
  96    Complete {
  97        buffer: WeakEntity<Buffer>,
  98        unstaged_changes: Option<WeakEntity<BufferChangeSet>>,
  99    },
 100    Operations(Vec<Operation>),
 101}
 102
 103pub enum BufferStoreEvent {
 104    BufferAdded(Entity<Buffer>),
 105    BufferDropped(BufferId),
 106    BufferChangedFilePath {
 107        buffer: Entity<Buffer>,
 108        old_file: Option<Arc<dyn language::File>>,
 109    },
 110}
 111
 112#[derive(Default, Debug)]
 113pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
 114
 115impl EventEmitter<BufferStoreEvent> for BufferStore {}
 116
 117impl RemoteBufferStore {
 118    fn load_staged_text(&self, buffer_id: BufferId, cx: &App) -> Task<Result<Option<String>>> {
 119        let project_id = self.project_id;
 120        let client = self.upstream_client.clone();
 121        cx.background_executor().spawn(async move {
 122            Ok(client
 123                .request(proto::GetStagedText {
 124                    project_id,
 125                    buffer_id: buffer_id.to_proto(),
 126                })
 127                .await?
 128                .staged_text)
 129        })
 130    }
 131    pub fn wait_for_remote_buffer(
 132        &mut self,
 133        id: BufferId,
 134        cx: &mut Context<BufferStore>,
 135    ) -> Task<Result<Entity<Buffer>>> {
 136        let (tx, rx) = oneshot::channel();
 137        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 138
 139        cx.spawn(|this, cx| async move {
 140            if let Some(buffer) = this
 141                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
 142                .ok()
 143                .flatten()
 144            {
 145                return Ok(buffer);
 146            }
 147
 148            cx.background_executor()
 149                .spawn(async move { rx.await? })
 150                .await
 151        })
 152    }
 153
 154    fn save_remote_buffer(
 155        &self,
 156        buffer_handle: Entity<Buffer>,
 157        new_path: Option<proto::ProjectPath>,
 158        cx: &Context<BufferStore>,
 159    ) -> Task<Result<()>> {
 160        let buffer = buffer_handle.read(cx);
 161        let buffer_id = buffer.remote_id().into();
 162        let version = buffer.version();
 163        let rpc = self.upstream_client.clone();
 164        let project_id = self.project_id;
 165        cx.spawn(move |_, mut cx| async move {
 166            let response = rpc
 167                .request(proto::SaveBuffer {
 168                    project_id,
 169                    buffer_id,
 170                    new_path,
 171                    version: serialize_version(&version),
 172                })
 173                .await?;
 174            let version = deserialize_version(&response.version);
 175            let mtime = response.mtime.map(|mtime| mtime.into());
 176
 177            buffer_handle.update(&mut cx, |buffer, cx| {
 178                buffer.did_save(version.clone(), mtime, cx);
 179            })?;
 180
 181            Ok(())
 182        })
 183    }
 184
 185    pub fn handle_create_buffer_for_peer(
 186        &mut self,
 187        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 188        replica_id: u16,
 189        capability: Capability,
 190        cx: &mut Context<BufferStore>,
 191    ) -> Result<Option<Entity<Buffer>>> {
 192        match envelope
 193            .payload
 194            .variant
 195            .ok_or_else(|| anyhow!("missing variant"))?
 196        {
 197            proto::create_buffer_for_peer::Variant::State(mut state) => {
 198                let buffer_id = BufferId::new(state.id)?;
 199
 200                let buffer_result = maybe!({
 201                    let mut buffer_file = None;
 202                    if let Some(file) = state.file.take() {
 203                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 204                        let worktree = self
 205                            .worktree_store
 206                            .read(cx)
 207                            .worktree_for_id(worktree_id, cx)
 208                            .ok_or_else(|| {
 209                                anyhow!("no worktree found for id {}", file.worktree_id)
 210                            })?;
 211                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 212                            as Arc<dyn language::File>);
 213                    }
 214                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 215                });
 216
 217                match buffer_result {
 218                    Ok(buffer) => {
 219                        let buffer = cx.new(|_| buffer);
 220                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 221                    }
 222                    Err(error) => {
 223                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 224                            for listener in listeners {
 225                                listener.send(Err(anyhow!(error.cloned()))).ok();
 226                            }
 227                        }
 228                    }
 229                }
 230            }
 231            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 232                let buffer_id = BufferId::new(chunk.buffer_id)?;
 233                let buffer = self
 234                    .loading_remote_buffers_by_id
 235                    .get(&buffer_id)
 236                    .cloned()
 237                    .ok_or_else(|| {
 238                        anyhow!(
 239                            "received chunk for buffer {} without initial state",
 240                            chunk.buffer_id
 241                        )
 242                    })?;
 243
 244                let result = maybe!({
 245                    let operations = chunk
 246                        .operations
 247                        .into_iter()
 248                        .map(language::proto::deserialize_operation)
 249                        .collect::<Result<Vec<_>>>()?;
 250                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 251                    anyhow::Ok(())
 252                });
 253
 254                if let Err(error) = result {
 255                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 256                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 257                        for listener in listeners {
 258                            listener.send(Err(error.cloned())).ok();
 259                        }
 260                    }
 261                } else if chunk.is_last {
 262                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 263                    if self.upstream_client.is_via_collab() {
 264                        // retain buffers sent by peers to avoid races.
 265                        self.shared_with_me.insert(buffer.clone());
 266                    }
 267
 268                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 269                        for sender in senders {
 270                            sender.send(Ok(buffer.clone())).ok();
 271                        }
 272                    }
 273                    return Ok(Some(buffer));
 274                }
 275            }
 276        }
 277        return Ok(None);
 278    }
 279
 280    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 281        self.loading_remote_buffers_by_id
 282            .keys()
 283            .copied()
 284            .collect::<Vec<_>>()
 285    }
 286
 287    pub fn deserialize_project_transaction(
 288        &self,
 289        message: proto::ProjectTransaction,
 290        push_to_history: bool,
 291        cx: &mut Context<BufferStore>,
 292    ) -> Task<Result<ProjectTransaction>> {
 293        cx.spawn(|this, mut cx| async move {
 294            let mut project_transaction = ProjectTransaction::default();
 295            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 296            {
 297                let buffer_id = BufferId::new(buffer_id)?;
 298                let buffer = this
 299                    .update(&mut cx, |this, cx| {
 300                        this.wait_for_remote_buffer(buffer_id, cx)
 301                    })?
 302                    .await?;
 303                let transaction = language::proto::deserialize_transaction(transaction)?;
 304                project_transaction.0.insert(buffer, transaction);
 305            }
 306
 307            for (buffer, transaction) in &project_transaction.0 {
 308                buffer
 309                    .update(&mut cx, |buffer, _| {
 310                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 311                    })?
 312                    .await?;
 313
 314                if push_to_history {
 315                    buffer.update(&mut cx, |buffer, _| {
 316                        buffer.push_transaction(transaction.clone(), Instant::now());
 317                    })?;
 318                }
 319            }
 320
 321            Ok(project_transaction)
 322        })
 323    }
 324
 325    fn open_buffer(
 326        &self,
 327        path: Arc<Path>,
 328        worktree: Entity<Worktree>,
 329        cx: &mut Context<BufferStore>,
 330    ) -> Task<Result<Entity<Buffer>>> {
 331        let worktree_id = worktree.read(cx).id().to_proto();
 332        let project_id = self.project_id;
 333        let client = self.upstream_client.clone();
 334        let path_string = path.clone().to_string_lossy().to_string();
 335        cx.spawn(move |this, mut cx| async move {
 336            let response = client
 337                .request(proto::OpenBufferByPath {
 338                    project_id,
 339                    worktree_id,
 340                    path: path_string,
 341                })
 342                .await?;
 343            let buffer_id = BufferId::new(response.buffer_id)?;
 344
 345            let buffer = this
 346                .update(&mut cx, {
 347                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 348                })?
 349                .await?;
 350
 351            Ok(buffer)
 352        })
 353    }
 354
 355    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
 356        let create = self.upstream_client.request(proto::OpenNewBuffer {
 357            project_id: self.project_id,
 358        });
 359        cx.spawn(|this, mut cx| async move {
 360            let response = create.await?;
 361            let buffer_id = BufferId::new(response.buffer_id)?;
 362
 363            this.update(&mut cx, |this, cx| {
 364                this.wait_for_remote_buffer(buffer_id, cx)
 365            })?
 366            .await
 367        })
 368    }
 369
 370    fn reload_buffers(
 371        &self,
 372        buffers: HashSet<Entity<Buffer>>,
 373        push_to_history: bool,
 374        cx: &mut Context<BufferStore>,
 375    ) -> Task<Result<ProjectTransaction>> {
 376        let request = self.upstream_client.request(proto::ReloadBuffers {
 377            project_id: self.project_id,
 378            buffer_ids: buffers
 379                .iter()
 380                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 381                .collect(),
 382        });
 383
 384        cx.spawn(|this, mut cx| async move {
 385            let response = request
 386                .await?
 387                .transaction
 388                .ok_or_else(|| anyhow!("missing transaction"))?;
 389            this.update(&mut cx, |this, cx| {
 390                this.deserialize_project_transaction(response, push_to_history, cx)
 391            })?
 392            .await
 393        })
 394    }
 395}
 396
 397impl LocalBufferStore {
 398    fn load_staged_text(&self, buffer: &Entity<Buffer>, cx: &App) -> Task<Result<Option<String>>> {
 399        let Some(file) = buffer.read(cx).file() else {
 400            return Task::ready(Ok(None));
 401        };
 402        let worktree_id = file.worktree_id(cx);
 403        let path = file.path().clone();
 404        let Some(worktree) = self
 405            .worktree_store
 406            .read(cx)
 407            .worktree_for_id(worktree_id, cx)
 408        else {
 409            return Task::ready(Err(anyhow!("no such worktree")));
 410        };
 411
 412        worktree.read(cx).load_staged_file(path.as_ref(), cx)
 413    }
 414
 415    fn save_local_buffer(
 416        &self,
 417        buffer_handle: Entity<Buffer>,
 418        worktree: Entity<Worktree>,
 419        path: Arc<Path>,
 420        mut has_changed_file: bool,
 421        cx: &mut Context<BufferStore>,
 422    ) -> Task<Result<()>> {
 423        let buffer = buffer_handle.read(cx);
 424
 425        let text = buffer.as_rope().clone();
 426        let line_ending = buffer.line_ending();
 427        let version = buffer.version();
 428        let buffer_id = buffer.remote_id();
 429        if buffer
 430            .file()
 431            .is_some_and(|file| file.disk_state() == DiskState::New)
 432        {
 433            has_changed_file = true;
 434        }
 435
 436        let save = worktree.update(cx, |worktree, cx| {
 437            worktree.write_file(path.as_ref(), text, line_ending, cx)
 438        });
 439
 440        cx.spawn(move |this, mut cx| async move {
 441            let new_file = save.await?;
 442            let mtime = new_file.disk_state().mtime();
 443            this.update(&mut cx, |this, cx| {
 444                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 445                    if has_changed_file {
 446                        downstream_client
 447                            .send(proto::UpdateBufferFile {
 448                                project_id,
 449                                buffer_id: buffer_id.to_proto(),
 450                                file: Some(language::File::to_proto(&*new_file, cx)),
 451                            })
 452                            .log_err();
 453                    }
 454                    downstream_client
 455                        .send(proto::BufferSaved {
 456                            project_id,
 457                            buffer_id: buffer_id.to_proto(),
 458                            version: serialize_version(&version),
 459                            mtime: mtime.map(|time| time.into()),
 460                        })
 461                        .log_err();
 462                }
 463            })?;
 464            buffer_handle.update(&mut cx, |buffer, cx| {
 465                if has_changed_file {
 466                    buffer.file_updated(new_file, cx);
 467                }
 468                buffer.did_save(version.clone(), mtime, cx);
 469            })
 470        })
 471    }
 472
 473    fn subscribe_to_worktree(
 474        &mut self,
 475        worktree: &Entity<Worktree>,
 476        cx: &mut Context<BufferStore>,
 477    ) {
 478        cx.subscribe(worktree, |this, worktree, event, cx| {
 479            if worktree.read(cx).is_local() {
 480                match event {
 481                    worktree::Event::UpdatedEntries(changes) => {
 482                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 483                    }
 484                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 485                        Self::local_worktree_git_repos_changed(
 486                            this,
 487                            worktree.clone(),
 488                            updated_repos,
 489                            cx,
 490                        )
 491                    }
 492                    _ => {}
 493                }
 494            }
 495        })
 496        .detach();
 497    }
 498
 499    fn local_worktree_entries_changed(
 500        this: &mut BufferStore,
 501        worktree_handle: &Entity<Worktree>,
 502        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 503        cx: &mut Context<BufferStore>,
 504    ) {
 505        let snapshot = worktree_handle.read(cx).snapshot();
 506        for (path, entry_id, _) in changes {
 507            Self::local_worktree_entry_changed(
 508                this,
 509                *entry_id,
 510                path,
 511                worktree_handle,
 512                &snapshot,
 513                cx,
 514            );
 515        }
 516    }
 517
 518    fn local_worktree_git_repos_changed(
 519        this: &mut BufferStore,
 520        worktree_handle: Entity<Worktree>,
 521        changed_repos: &UpdatedGitRepositoriesSet,
 522        cx: &mut Context<BufferStore>,
 523    ) {
 524        debug_assert!(worktree_handle.read(cx).is_local());
 525
 526        let buffer_change_sets = this
 527            .opened_buffers
 528            .values()
 529            .filter_map(|buffer| {
 530                if let OpenBuffer::Complete {
 531                    buffer,
 532                    unstaged_changes,
 533                } = buffer
 534                {
 535                    let buffer = buffer.upgrade()?.read(cx);
 536                    let file = File::from_dyn(buffer.file())?;
 537                    if file.worktree != worktree_handle {
 538                        return None;
 539                    }
 540                    changed_repos
 541                        .iter()
 542                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 543                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
 544                    let snapshot = buffer.text_snapshot();
 545                    Some((unstaged_changes, snapshot, file.path.clone()))
 546                } else {
 547                    None
 548                }
 549            })
 550            .collect::<Vec<_>>();
 551
 552        if buffer_change_sets.is_empty() {
 553            return;
 554        }
 555
 556        cx.spawn(move |this, mut cx| async move {
 557            let snapshot =
 558                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 559            let diff_bases_by_buffer = cx
 560                .background_executor()
 561                .spawn(async move {
 562                    buffer_change_sets
 563                        .into_iter()
 564                        .filter_map(|(change_set, buffer_snapshot, path)| {
 565                            let local_repo = snapshot.local_repo_for_path(&path)?;
 566                            let relative_path = local_repo.relativize(&path).ok()?;
 567                            let base_text = local_repo.repo().load_index_text(&relative_path);
 568                            Some((change_set, buffer_snapshot, base_text))
 569                        })
 570                        .collect::<Vec<_>>()
 571                })
 572                .await;
 573
 574            this.update(&mut cx, |this, cx| {
 575                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
 576                    change_set.update(cx, |change_set, cx| {
 577                        if let Some(staged_text) = staged_text.clone() {
 578                            let _ =
 579                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
 580                        } else {
 581                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
 582                        }
 583                    });
 584
 585                    if let Some((client, project_id)) = &this.downstream_client.clone() {
 586                        client
 587                            .send(proto::UpdateDiffBase {
 588                                project_id: *project_id,
 589                                buffer_id: buffer_snapshot.remote_id().to_proto(),
 590                                staged_text,
 591                            })
 592                            .log_err();
 593                    }
 594                }
 595            })
 596        })
 597        .detach_and_log_err(cx);
 598    }
 599
 600    fn local_worktree_entry_changed(
 601        this: &mut BufferStore,
 602        entry_id: ProjectEntryId,
 603        path: &Arc<Path>,
 604        worktree: &Entity<worktree::Worktree>,
 605        snapshot: &worktree::Snapshot,
 606        cx: &mut Context<BufferStore>,
 607    ) -> Option<()> {
 608        let project_path = ProjectPath {
 609            worktree_id: snapshot.id(),
 610            path: path.clone(),
 611        };
 612
 613        let buffer_id = {
 614            let local = this.as_local_mut()?;
 615            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
 616                Some(&buffer_id) => buffer_id,
 617                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
 618            }
 619        };
 620
 621        let buffer = if let Some(buffer) = this.get(buffer_id) {
 622            Some(buffer)
 623        } else {
 624            this.opened_buffers.remove(&buffer_id);
 625            None
 626        };
 627
 628        let buffer = if let Some(buffer) = buffer {
 629            buffer
 630        } else {
 631            let this = this.as_local_mut()?;
 632            this.local_buffer_ids_by_path.remove(&project_path);
 633            this.local_buffer_ids_by_entry_id.remove(&entry_id);
 634            return None;
 635        };
 636
 637        let events = buffer.update(cx, |buffer, cx| {
 638            let local = this.as_local_mut()?;
 639            let file = buffer.file()?;
 640            let old_file = File::from_dyn(Some(file))?;
 641            if old_file.worktree != *worktree {
 642                return None;
 643            }
 644
 645            let snapshot_entry = old_file
 646                .entry_id
 647                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 648                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
 649
 650            let new_file = if let Some(entry) = snapshot_entry {
 651                File {
 652                    disk_state: match entry.mtime {
 653                        Some(mtime) => DiskState::Present { mtime },
 654                        None => old_file.disk_state,
 655                    },
 656                    is_local: true,
 657                    entry_id: Some(entry.id),
 658                    path: entry.path.clone(),
 659                    worktree: worktree.clone(),
 660                    is_private: entry.is_private,
 661                }
 662            } else {
 663                File {
 664                    disk_state: DiskState::Deleted,
 665                    is_local: true,
 666                    entry_id: old_file.entry_id,
 667                    path: old_file.path.clone(),
 668                    worktree: worktree.clone(),
 669                    is_private: old_file.is_private,
 670                }
 671            };
 672
 673            if new_file == *old_file {
 674                return None;
 675            }
 676
 677            let mut events = Vec::new();
 678            if new_file.path != old_file.path {
 679                local.local_buffer_ids_by_path.remove(&ProjectPath {
 680                    path: old_file.path.clone(),
 681                    worktree_id: old_file.worktree_id(cx),
 682                });
 683                local.local_buffer_ids_by_path.insert(
 684                    ProjectPath {
 685                        worktree_id: new_file.worktree_id(cx),
 686                        path: new_file.path.clone(),
 687                    },
 688                    buffer_id,
 689                );
 690                events.push(BufferStoreEvent::BufferChangedFilePath {
 691                    buffer: cx.model(),
 692                    old_file: buffer.file().cloned(),
 693                });
 694            }
 695
 696            if new_file.entry_id != old_file.entry_id {
 697                if let Some(entry_id) = old_file.entry_id {
 698                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
 699                }
 700                if let Some(entry_id) = new_file.entry_id {
 701                    local
 702                        .local_buffer_ids_by_entry_id
 703                        .insert(entry_id, buffer_id);
 704                }
 705            }
 706
 707            if let Some((client, project_id)) = &this.downstream_client {
 708                client
 709                    .send(proto::UpdateBufferFile {
 710                        project_id: *project_id,
 711                        buffer_id: buffer_id.to_proto(),
 712                        file: Some(new_file.to_proto(cx)),
 713                    })
 714                    .ok();
 715            }
 716
 717            buffer.file_updated(Arc::new(new_file), cx);
 718            Some(events)
 719        })?;
 720
 721        for event in events {
 722            cx.emit(event);
 723        }
 724
 725        None
 726    }
 727
 728    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
 729        let file = File::from_dyn(buffer.read(cx).file())?;
 730
 731        let remote_id = buffer.read(cx).remote_id();
 732        if let Some(entry_id) = file.entry_id {
 733            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 734                Some(_) => {
 735                    return None;
 736                }
 737                None => {
 738                    self.local_buffer_ids_by_entry_id
 739                        .insert(entry_id, remote_id);
 740                }
 741            }
 742        };
 743        self.local_buffer_ids_by_path.insert(
 744            ProjectPath {
 745                worktree_id: file.worktree_id(cx),
 746                path: file.path.clone(),
 747            },
 748            remote_id,
 749        );
 750
 751        Some(())
 752    }
 753
 754    fn save_buffer(
 755        &self,
 756        buffer: Entity<Buffer>,
 757        cx: &mut Context<BufferStore>,
 758    ) -> Task<Result<()>> {
 759        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 760            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 761        };
 762        let worktree = file.worktree.clone();
 763        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 764    }
 765
 766    fn save_buffer_as(
 767        &self,
 768        buffer: Entity<Buffer>,
 769        path: ProjectPath,
 770        cx: &mut Context<BufferStore>,
 771    ) -> Task<Result<()>> {
 772        let Some(worktree) = self
 773            .worktree_store
 774            .read(cx)
 775            .worktree_for_id(path.worktree_id, cx)
 776        else {
 777            return Task::ready(Err(anyhow!("no such worktree")));
 778        };
 779        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
 780    }
 781
 782    fn open_buffer(
 783        &self,
 784        path: Arc<Path>,
 785        worktree: Entity<Worktree>,
 786        cx: &mut Context<BufferStore>,
 787    ) -> Task<Result<Entity<Buffer>>> {
 788        let load_buffer = worktree.update(cx, |worktree, cx| {
 789            let load_file = worktree.load_file(path.as_ref(), cx);
 790            let reservation = cx.reserve_model();
 791            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 792            cx.spawn(move |_, mut cx| async move {
 793                let loaded = load_file.await?;
 794                let text_buffer = cx
 795                    .background_executor()
 796                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 797                    .await;
 798                cx.insert_model(reservation, |_| {
 799                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
 800                })
 801            })
 802        });
 803
 804        cx.spawn(move |this, mut cx| async move {
 805            let buffer = match load_buffer.await {
 806                Ok(buffer) => Ok(buffer),
 807                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
 808                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 809                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 810                    Buffer::build(
 811                        text_buffer,
 812                        Some(Arc::new(File {
 813                            worktree,
 814                            path,
 815                            disk_state: DiskState::New,
 816                            entry_id: None,
 817                            is_local: true,
 818                            is_private: false,
 819                        })),
 820                        Capability::ReadWrite,
 821                    )
 822                }),
 823                Err(e) => Err(e),
 824            }?;
 825            this.update(&mut cx, |this, cx| {
 826                this.add_buffer(buffer.clone(), cx)?;
 827                let buffer_id = buffer.read(cx).remote_id();
 828                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 829                    let this = this.as_local_mut().unwrap();
 830                    this.local_buffer_ids_by_path.insert(
 831                        ProjectPath {
 832                            worktree_id: file.worktree_id(cx),
 833                            path: file.path.clone(),
 834                        },
 835                        buffer_id,
 836                    );
 837
 838                    if let Some(entry_id) = file.entry_id {
 839                        this.local_buffer_ids_by_entry_id
 840                            .insert(entry_id, buffer_id);
 841                    }
 842                }
 843
 844                anyhow::Ok(())
 845            })??;
 846
 847            Ok(buffer)
 848        })
 849    }
 850
 851    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
 852        cx.spawn(|buffer_store, mut cx| async move {
 853            let buffer =
 854                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
 855            buffer_store.update(&mut cx, |buffer_store, cx| {
 856                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 857            })?;
 858            Ok(buffer)
 859        })
 860    }
 861
 862    fn reload_buffers(
 863        &self,
 864        buffers: HashSet<Entity<Buffer>>,
 865        push_to_history: bool,
 866        cx: &mut Context<BufferStore>,
 867    ) -> Task<Result<ProjectTransaction>> {
 868        cx.spawn(move |_, mut cx| async move {
 869            let mut project_transaction = ProjectTransaction::default();
 870            for buffer in buffers {
 871                let transaction = buffer
 872                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
 873                    .await?;
 874                buffer.update(&mut cx, |buffer, cx| {
 875                    if let Some(transaction) = transaction {
 876                        if !push_to_history {
 877                            buffer.forget_transaction(transaction.id);
 878                        }
 879                        project_transaction.0.insert(cx.model(), transaction);
 880                    }
 881                })?;
 882            }
 883
 884            Ok(project_transaction)
 885        })
 886    }
 887}
 888
 889impl BufferStore {
    /// Registers the buffer store's message and request handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
        client.add_model_request_handler(Self::handle_get_staged_text);
        client.add_model_message_handler(Self::handle_update_diff_base);
    }
 901
 902    /// Creates a buffer store, optionally retaining its buffers.
 903    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
 904        Self {
 905            state: BufferStoreState::Local(LocalBufferStore {
 906                local_buffer_ids_by_path: Default::default(),
 907                local_buffer_ids_by_entry_id: Default::default(),
 908                worktree_store: worktree_store.clone(),
 909                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
 910                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 911                        let this = this.as_local_mut().unwrap();
 912                        this.subscribe_to_worktree(worktree, cx);
 913                    }
 914                }),
 915            }),
 916            downstream_client: None,
 917            opened_buffers: Default::default(),
 918            shared_buffers: Default::default(),
 919            loading_buffers: Default::default(),
 920            loading_change_sets: Default::default(),
 921            worktree_store,
 922        }
 923    }
 924
 925    pub fn remote(
 926        worktree_store: Entity<WorktreeStore>,
 927        upstream_client: AnyProtoClient,
 928        remote_id: u64,
 929        _cx: &mut Context<Self>,
 930    ) -> Self {
 931        Self {
 932            state: BufferStoreState::Remote(RemoteBufferStore {
 933                shared_with_me: Default::default(),
 934                loading_remote_buffers_by_id: Default::default(),
 935                remote_buffer_listeners: Default::default(),
 936                project_id: remote_id,
 937                upstream_client,
 938                worktree_store: worktree_store.clone(),
 939            }),
 940            downstream_client: None,
 941            opened_buffers: Default::default(),
 942            loading_buffers: Default::default(),
 943            loading_change_sets: Default::default(),
 944            shared_buffers: Default::default(),
 945            worktree_store,
 946        }
 947    }
 948
 949    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
 950        match &mut self.state {
 951            BufferStoreState::Local(state) => Some(state),
 952            _ => None,
 953        }
 954    }
 955
 956    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
 957        match &mut self.state {
 958            BufferStoreState::Remote(state) => Some(state),
 959            _ => None,
 960        }
 961    }
 962
 963    fn as_remote(&self) -> Option<&RemoteBufferStore> {
 964        match &self.state {
 965            BufferStoreState::Remote(state) => Some(state),
 966            _ => None,
 967        }
 968    }
 969
 970    pub fn open_buffer(
 971        &mut self,
 972        project_path: ProjectPath,
 973        cx: &mut Context<Self>,
 974    ) -> Task<Result<Entity<Buffer>>> {
 975        if let Some(buffer) = self.get_by_path(&project_path, cx) {
 976            return Task::ready(Ok(buffer));
 977        }
 978
 979        let task = match self.loading_buffers.entry(project_path.clone()) {
 980            hash_map::Entry::Occupied(e) => e.get().clone(),
 981            hash_map::Entry::Vacant(entry) => {
 982                let path = project_path.path.clone();
 983                let Some(worktree) = self
 984                    .worktree_store
 985                    .read(cx)
 986                    .worktree_for_id(project_path.worktree_id, cx)
 987                else {
 988                    return Task::ready(Err(anyhow!("no such worktree")));
 989                };
 990                let load_buffer = match &self.state {
 991                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
 992                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
 993                };
 994
 995                entry
 996                    .insert(
 997                        cx.spawn(move |this, mut cx| async move {
 998                            let load_result = load_buffer.await;
 999                            this.update(&mut cx, |this, _cx| {
1000                                // Record the fact that the buffer is no longer loading.
1001                                this.loading_buffers.remove(&project_path);
1002                            })
1003                            .ok();
1004                            load_result.map_err(Arc::new)
1005                        })
1006                        .shared(),
1007                    )
1008                    .clone()
1009            }
1010        };
1011
1012        cx.background_executor()
1013            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1014    }
1015
1016    pub fn open_unstaged_changes(
1017        &mut self,
1018        buffer: Entity<Buffer>,
1019        cx: &mut Context<Self>,
1020    ) -> Task<Result<Entity<BufferChangeSet>>> {
1021        let buffer_id = buffer.read(cx).remote_id();
1022        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
1023            return Task::ready(Ok(change_set));
1024        }
1025
1026        let task = match self.loading_change_sets.entry(buffer_id) {
1027            hash_map::Entry::Occupied(e) => e.get().clone(),
1028            hash_map::Entry::Vacant(entry) => {
1029                let load = match &self.state {
1030                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
1031                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
1032                };
1033
1034                entry
1035                    .insert(
1036                        cx.spawn(move |this, cx| async move {
1037                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
1038                                .await
1039                                .map_err(Arc::new)
1040                        })
1041                        .shared(),
1042                    )
1043                    .clone()
1044            }
1045        };
1046
1047        cx.background_executor()
1048            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1049    }
1050
1051    #[cfg(any(test, feature = "test-support"))]
1052    pub fn set_change_set(&mut self, buffer_id: BufferId, change_set: Entity<BufferChangeSet>) {
1053        self.loading_change_sets
1054            .insert(buffer_id, Task::ready(Ok(change_set)).shared());
1055    }
1056
1057    pub async fn open_unstaged_changes_internal(
1058        this: WeakEntity<Self>,
1059        text: Result<Option<String>>,
1060        buffer: Entity<Buffer>,
1061        mut cx: AsyncAppContext,
1062    ) -> Result<Entity<BufferChangeSet>> {
1063        let text = match text {
1064            Err(e) => {
1065                this.update(&mut cx, |this, cx| {
1066                    let buffer_id = buffer.read(cx).remote_id();
1067                    this.loading_change_sets.remove(&buffer_id);
1068                })?;
1069                return Err(e);
1070            }
1071            Ok(text) => text,
1072        };
1073
1074        let change_set = cx.new(|cx| BufferChangeSet::new(&buffer, cx)).unwrap();
1075
1076        if let Some(text) = text {
1077            change_set
1078                .update(&mut cx, |change_set, cx| {
1079                    let snapshot = buffer.read(cx).text_snapshot();
1080                    change_set.set_base_text(text, snapshot, cx)
1081                })?
1082                .await
1083                .ok();
1084        }
1085
1086        this.update(&mut cx, |this, cx| {
1087            let buffer_id = buffer.read(cx).remote_id();
1088            this.loading_change_sets.remove(&buffer_id);
1089            if let Some(OpenBuffer::Complete {
1090                unstaged_changes, ..
1091            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1092            {
1093                *unstaged_changes = Some(change_set.downgrade());
1094            }
1095        })?;
1096
1097        Ok(change_set)
1098    }
1099
1100    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
1101        match &self.state {
1102            BufferStoreState::Local(this) => this.create_buffer(cx),
1103            BufferStoreState::Remote(this) => this.create_buffer(cx),
1104        }
1105    }
1106
1107    pub fn save_buffer(
1108        &mut self,
1109        buffer: Entity<Buffer>,
1110        cx: &mut Context<Self>,
1111    ) -> Task<Result<()>> {
1112        match &mut self.state {
1113            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1114            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1115        }
1116    }
1117
1118    pub fn save_buffer_as(
1119        &mut self,
1120        buffer: Entity<Buffer>,
1121        path: ProjectPath,
1122        cx: &mut Context<Self>,
1123    ) -> Task<Result<()>> {
1124        let old_file = buffer.read(cx).file().cloned();
1125        let task = match &self.state {
1126            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1127            BufferStoreState::Remote(this) => {
1128                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1129            }
1130        };
1131        cx.spawn(|this, mut cx| async move {
1132            task.await?;
1133            this.update(&mut cx, |_, cx| {
1134                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1135            })
1136        })
1137    }
1138
1139    pub fn blame_buffer(
1140        &self,
1141        buffer: &Entity<Buffer>,
1142        version: Option<clock::Global>,
1143        cx: &App,
1144    ) -> Task<Result<Option<Blame>>> {
1145        let buffer = buffer.read(cx);
1146        let Some(file) = File::from_dyn(buffer.file()) else {
1147            return Task::ready(Err(anyhow!("buffer has no file")));
1148        };
1149
1150        match file.worktree.clone().read(cx) {
1151            Worktree::Local(worktree) => {
1152                let worktree = worktree.snapshot();
1153                let blame_params = maybe!({
1154                    let local_repo = match worktree.local_repo_for_path(&file.path) {
1155                        Some(repo_for_path) => repo_for_path,
1156                        None => return Ok(None),
1157                    };
1158
1159                    let relative_path = local_repo
1160                        .relativize(&file.path)
1161                        .context("failed to relativize buffer path")?;
1162
1163                    let repo = local_repo.repo().clone();
1164
1165                    let content = match version {
1166                        Some(version) => buffer.rope_for_version(&version).clone(),
1167                        None => buffer.as_rope().clone(),
1168                    };
1169
1170                    anyhow::Ok(Some((repo, relative_path, content)))
1171                });
1172
1173                cx.background_executor().spawn(async move {
1174                    let Some((repo, relative_path, content)) = blame_params? else {
1175                        return Ok(None);
1176                    };
1177                    repo.blame(&relative_path, content)
1178                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1179                        .map(Some)
1180                })
1181            }
1182            Worktree::Remote(worktree) => {
1183                let buffer_id = buffer.remote_id();
1184                let version = buffer.version();
1185                let project_id = worktree.project_id();
1186                let client = worktree.client();
1187                cx.spawn(|_| async move {
1188                    let response = client
1189                        .request(proto::BlameBuffer {
1190                            project_id,
1191                            buffer_id: buffer_id.into(),
1192                            version: serialize_version(&version),
1193                        })
1194                        .await?;
1195                    Ok(deserialize_blame_buffer_response(response))
1196                })
1197            }
1198        }
1199    }
1200
    /// Builds a permalink URL pointing at the lines `selection` of `buffer`.
    ///
    /// For a local worktree the link is built from the "origin" remote URL
    /// and the HEAD SHA of the repository containing the file. When the file
    /// is not inside a repository, a link is attempted only for buffers whose
    /// language is Rust, via `get_permalink_in_rust_registry_src`. For a
    /// remote worktree a `proto::GetPermalinkToLine` request is sent through
    /// the worktree's client and the returned string is parsed as a URL.
    ///
    /// # Errors
    ///
    /// Fails if the buffer has no file, if no permalink can be produced, if
    /// the remote or HEAD SHA cannot be read, if the remote URL or file path
    /// cannot be parsed or converted, or if the remote request fails.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &App,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.read(cx) {
            Worktree::Local(worktree) => {
                let worktree_path = worktree.abs_path().clone();
                let Some((repo_entry, repo)) =
                    worktree.repository_for_path(file.path()).and_then(|entry| {
                        let repo = worktree.get_local_repo(&entry)?.repo().clone();
                        Some((entry, repo))
                    })
                else {
                    // If we're not in a Git repo, check whether this is a Rust source
                    // file in the Cargo registry (presumably opened with go-to-definition
                    // from a normal Rust file). If so, we can put together a permalink
                    // using crate metadata.
                    if !buffer
                        .language()
                        .is_some_and(|lang| lang.name() == "Rust".into())
                    {
                        return Task::ready(Err(anyhow!("no permalink available")));
                    }
                    let file_path = worktree_path.join(file.path());
                    return cx.spawn(|cx| async move {
                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;
                        // The underlying error is replaced with a generic message.
                        get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                            .map_err(|_| anyhow!("no permalink available"))
                    });
                };

                // Path of the file relative to the repository it belongs to.
                let path = match repo_entry.relativize(file.path()) {
                    Ok(RepoPath(path)) => path,
                    Err(e) => return Task::ready(Err(e)),
                };

                cx.spawn(|cx| async move {
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1297
1298    fn add_buffer(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
1299        let remote_id = buffer.read(cx).remote_id();
1300        let is_remote = buffer.read(cx).replica_id() != 0;
1301        let open_buffer = OpenBuffer::Complete {
1302            buffer: buffer.downgrade(),
1303            unstaged_changes: None,
1304        };
1305
1306        let handle = cx.model().downgrade();
1307        buffer.update(cx, move |_, cx| {
1308            cx.on_release(move |buffer, cx| {
1309                handle
1310                    .update(cx, |_, cx| {
1311                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1312                    })
1313                    .ok();
1314            })
1315            .detach()
1316        });
1317
1318        match self.opened_buffers.entry(remote_id) {
1319            hash_map::Entry::Vacant(entry) => {
1320                entry.insert(open_buffer);
1321            }
1322            hash_map::Entry::Occupied(mut entry) => {
1323                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1324                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1325                } else if entry.get().upgrade().is_some() {
1326                    if is_remote {
1327                        return Ok(());
1328                    } else {
1329                        debug_panic!("buffer {} was already registered", remote_id);
1330                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1331                    }
1332                }
1333                entry.insert(open_buffer);
1334            }
1335        }
1336
1337        cx.subscribe(&buffer, Self::on_buffer_event).detach();
1338        cx.emit(BufferStoreEvent::BufferAdded(buffer));
1339        Ok(())
1340    }
1341
1342    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1343        self.opened_buffers
1344            .values()
1345            .filter_map(|buffer| buffer.upgrade())
1346    }
1347
1348    pub fn loading_buffers(
1349        &self,
1350    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1351        self.loading_buffers.iter().map(|(path, task)| {
1352            let task = task.clone();
1353            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1354        })
1355    }
1356
1357    pub fn get_by_path(&self, path: &ProjectPath, cx: &App) -> Option<Entity<Buffer>> {
1358        self.buffers().find_map(|buffer| {
1359            let file = File::from_dyn(buffer.read(cx).file())?;
1360            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1361                Some(buffer)
1362            } else {
1363                None
1364            }
1365        })
1366    }
1367
1368    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1369        self.opened_buffers.get(&buffer_id)?.upgrade()
1370    }
1371
1372    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1373        self.get(buffer_id)
1374            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1375    }
1376
1377    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1378        self.get(buffer_id).or_else(|| {
1379            self.as_remote()
1380                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1381        })
1382    }
1383
1384    pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Entity<BufferChangeSet>> {
1385        if let OpenBuffer::Complete {
1386            unstaged_changes, ..
1387        } = self.opened_buffers.get(&buffer_id)?
1388        {
1389            unstaged_changes.as_ref()?.upgrade()
1390        } else {
1391            None
1392        }
1393    }
1394
1395    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1396        let buffers = self
1397            .buffers()
1398            .map(|buffer| {
1399                let buffer = buffer.read(cx);
1400                proto::BufferVersion {
1401                    id: buffer.remote_id().into(),
1402                    version: language::proto::serialize_version(&buffer.version),
1403                }
1404            })
1405            .collect();
1406        let incomplete_buffer_ids = self
1407            .as_remote()
1408            .map(|remote| remote.incomplete_buffer_ids())
1409            .unwrap_or_default();
1410        (buffers, incomplete_buffer_ids)
1411    }
1412
1413    pub fn disconnected_from_host(&mut self, cx: &mut App) {
1414        for open_buffer in self.opened_buffers.values_mut() {
1415            if let Some(buffer) = open_buffer.upgrade() {
1416                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1417            }
1418        }
1419
1420        for buffer in self.buffers() {
1421            buffer.update(cx, |buffer, cx| {
1422                buffer.set_capability(Capability::ReadOnly, cx)
1423            });
1424        }
1425
1426        if let Some(remote) = self.as_remote_mut() {
1427            // Wake up all futures currently waiting on a buffer to get opened,
1428            // to give them a chance to fail now that we've disconnected.
1429            remote.remote_buffer_listeners.clear()
1430        }
1431    }
1432
1433    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1434        self.downstream_client = Some((downstream_client, remote_id));
1435    }
1436
1437    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1438        self.downstream_client.take();
1439        self.forget_shared_buffers();
1440    }
1441
1442    pub fn discard_incomplete(&mut self) {
1443        self.opened_buffers
1444            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1445    }
1446
1447    pub fn find_search_candidates(
1448        &mut self,
1449        query: &SearchQuery,
1450        mut limit: usize,
1451        fs: Arc<dyn Fs>,
1452        cx: &mut Context<Self>,
1453    ) -> Receiver<Entity<Buffer>> {
1454        let (tx, rx) = smol::channel::unbounded();
1455        let mut open_buffers = HashSet::default();
1456        let mut unnamed_buffers = Vec::new();
1457        for handle in self.buffers() {
1458            let buffer = handle.read(cx);
1459            if let Some(entry_id) = buffer.entry_id(cx) {
1460                open_buffers.insert(entry_id);
1461            } else {
1462                limit = limit.saturating_sub(1);
1463                unnamed_buffers.push(handle)
1464            };
1465        }
1466
1467        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
1468        let project_paths_rx = self
1469            .worktree_store
1470            .update(cx, |worktree_store, cx| {
1471                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
1472            })
1473            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
1474
1475        cx.spawn(|this, mut cx| async move {
1476            for buffer in unnamed_buffers {
1477                tx.send(buffer).await.ok();
1478            }
1479
1480            let mut project_paths_rx = pin!(project_paths_rx);
1481            while let Some(project_paths) = project_paths_rx.next().await {
1482                let buffers = this.update(&mut cx, |this, cx| {
1483                    project_paths
1484                        .into_iter()
1485                        .map(|project_path| this.open_buffer(project_path, cx))
1486                        .collect::<Vec<_>>()
1487                })?;
1488                for buffer_task in buffers {
1489                    if let Some(buffer) = buffer_task.await.log_err() {
1490                        if tx.send(buffer).await.is_err() {
1491                            return anyhow::Ok(());
1492                        }
1493                    }
1494                }
1495            }
1496            anyhow::Ok(())
1497        })
1498        .detach();
1499        rx
1500    }
1501
1502    pub fn recalculate_buffer_diffs(
1503        &mut self,
1504        buffers: Vec<Entity<Buffer>>,
1505        cx: &mut Context<Self>,
1506    ) -> impl Future<Output = ()> {
1507        let mut futures = Vec::new();
1508        for buffer in buffers {
1509            let buffer = buffer.read(cx).text_snapshot();
1510            if let Some(OpenBuffer::Complete {
1511                unstaged_changes, ..
1512            }) = self.opened_buffers.get_mut(&buffer.remote_id())
1513            {
1514                if let Some(unstaged_changes) = unstaged_changes
1515                    .as_ref()
1516                    .and_then(|changes| changes.upgrade())
1517                {
1518                    unstaged_changes.update(cx, |unstaged_changes, cx| {
1519                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
1520                    });
1521                } else {
1522                    unstaged_changes.take();
1523                }
1524            }
1525        }
1526        async move {
1527            futures::future::join_all(futures).await;
1528        }
1529    }
1530
1531    fn on_buffer_event(
1532        &mut self,
1533        buffer: Entity<Buffer>,
1534        event: &BufferEvent,
1535        cx: &mut Context<Self>,
1536    ) {
1537        match event {
1538            BufferEvent::FileHandleChanged => {
1539                if let Some(local) = self.as_local_mut() {
1540                    local.buffer_changed_file(buffer, cx);
1541                }
1542            }
1543            BufferEvent::Reloaded => {
1544                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1545                    return;
1546                };
1547                let buffer = buffer.read(cx);
1548                downstream_client
1549                    .send(proto::BufferReloaded {
1550                        project_id: *project_id,
1551                        buffer_id: buffer.remote_id().to_proto(),
1552                        version: serialize_version(&buffer.version()),
1553                        mtime: buffer.saved_mtime().map(|t| t.into()),
1554                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1555                    })
1556                    .log_err();
1557            }
1558            _ => {}
1559        }
1560    }
1561
1562    pub async fn handle_update_buffer(
1563        this: Entity<Self>,
1564        envelope: TypedEnvelope<proto::UpdateBuffer>,
1565        mut cx: AsyncAppContext,
1566    ) -> Result<proto::Ack> {
1567        let payload = envelope.payload.clone();
1568        let buffer_id = BufferId::new(payload.buffer_id)?;
1569        let ops = payload
1570            .operations
1571            .into_iter()
1572            .map(language::proto::deserialize_operation)
1573            .collect::<Result<Vec<_>, _>>()?;
1574        this.update(&mut cx, |this, cx| {
1575            match this.opened_buffers.entry(buffer_id) {
1576                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1577                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1578                    OpenBuffer::Complete { buffer, .. } => {
1579                        if let Some(buffer) = buffer.upgrade() {
1580                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1581                        }
1582                    }
1583                },
1584                hash_map::Entry::Vacant(e) => {
1585                    e.insert(OpenBuffer::Operations(ops));
1586                }
1587            }
1588            Ok(proto::Ack {})
1589        })?
1590    }
1591
1592    pub fn register_shared_lsp_handle(
1593        &mut self,
1594        peer_id: proto::PeerId,
1595        buffer_id: BufferId,
1596        handle: OpenLspBufferHandle,
1597    ) {
1598        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id) {
1599            if let Some(buffer) = shared_buffers.get_mut(&buffer_id) {
1600                buffer.lsp_handle = Some(handle);
1601                return;
1602            }
1603        }
1604        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1605    }
1606
    /// Handles a `proto::SynchronizeBuffers` request from a guest.
    ///
    /// The set of buffers shared with the sender is cleared and rebuilt from the buffers named
    /// in the request that this store still has. For each such buffer this:
    /// 1. records the buffer as shared with the guest,
    /// 2. adds the buffer's current version to the response,
    /// 3. sends `UpdateBufferFile` through `client` when the buffer has a file,
    /// 4. sends `BufferReloaded` with the saved version, mtime and line ending,
    /// 5. spawns a detached background task that sends the serialized operations to the
    ///    guest in chunks via `UpdateBuffer` requests.
    ///
    /// Buffers named in the request that are unknown to this store are skipped. Failures while
    /// sending are logged rather than returned.
    ///
    /// # Errors
    ///
    /// Fails when the envelope has no `original_sender_id` or a buffer id is invalid.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: only buffers named in this request stay shared.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        unstaged_changes: None,
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // NOTE(review): presumably serializes only the operations the guest has not
                // seen yet, given its reported version — confirm against `serialize_ops`.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // The operations are sent off the main thread; each chunk is awaited before
                // the next one is sent, and the first failure stops the task (and is logged).
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1696
1697    pub fn handle_create_buffer_for_peer(
1698        &mut self,
1699        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1700        replica_id: u16,
1701        capability: Capability,
1702        cx: &mut Context<Self>,
1703    ) -> Result<()> {
1704        let Some(remote) = self.as_remote_mut() else {
1705            return Err(anyhow!("buffer store is not a remote"));
1706        };
1707
1708        if let Some(buffer) =
1709            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1710        {
1711            self.add_buffer(buffer, cx)?;
1712        }
1713
1714        Ok(())
1715    }
1716
1717    pub async fn handle_update_buffer_file(
1718        this: Entity<Self>,
1719        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1720        mut cx: AsyncAppContext,
1721    ) -> Result<()> {
1722        let buffer_id = envelope.payload.buffer_id;
1723        let buffer_id = BufferId::new(buffer_id)?;
1724
1725        this.update(&mut cx, |this, cx| {
1726            let payload = envelope.payload.clone();
1727            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1728                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1729                let worktree = this
1730                    .worktree_store
1731                    .read(cx)
1732                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1733                    .ok_or_else(|| anyhow!("no such worktree"))?;
1734                let file = File::from_proto(file, worktree, cx)?;
1735                let old_file = buffer.update(cx, |buffer, cx| {
1736                    let old_file = buffer.file().cloned();
1737                    let new_path = file.path.clone();
1738                    buffer.file_updated(Arc::new(file), cx);
1739                    if old_file
1740                        .as_ref()
1741                        .map_or(true, |old| *old.path() != new_path)
1742                    {
1743                        Some(old_file)
1744                    } else {
1745                        None
1746                    }
1747                });
1748                if let Some(old_file) = old_file {
1749                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1750                }
1751            }
1752            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1753                downstream_client
1754                    .send(proto::UpdateBufferFile {
1755                        project_id: *project_id,
1756                        buffer_id: buffer_id.into(),
1757                        file: envelope.payload.file,
1758                    })
1759                    .log_err();
1760            }
1761            Ok(())
1762        })?
1763    }
1764
1765    pub async fn handle_save_buffer(
1766        this: Entity<Self>,
1767        envelope: TypedEnvelope<proto::SaveBuffer>,
1768        mut cx: AsyncAppContext,
1769    ) -> Result<proto::BufferSaved> {
1770        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1771        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1772            anyhow::Ok((
1773                this.get_existing(buffer_id)?,
1774                this.downstream_client
1775                    .as_ref()
1776                    .map(|(_, project_id)| *project_id)
1777                    .context("project is not shared")?,
1778            ))
1779        })??;
1780        buffer
1781            .update(&mut cx, |buffer, _| {
1782                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1783            })?
1784            .await?;
1785        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1786
1787        if let Some(new_path) = envelope.payload.new_path {
1788            let new_path = ProjectPath::from_proto(new_path);
1789            this.update(&mut cx, |this, cx| {
1790                this.save_buffer_as(buffer.clone(), new_path, cx)
1791            })?
1792            .await?;
1793        } else {
1794            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1795                .await?;
1796        }
1797
1798        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1799            project_id,
1800            buffer_id: buffer_id.into(),
1801            version: serialize_version(buffer.saved_version()),
1802            mtime: buffer.saved_mtime().map(|time| time.into()),
1803        })
1804    }
1805
1806    pub async fn handle_close_buffer(
1807        this: Entity<Self>,
1808        envelope: TypedEnvelope<proto::CloseBuffer>,
1809        mut cx: AsyncAppContext,
1810    ) -> Result<()> {
1811        let peer_id = envelope.sender_id;
1812        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1813        this.update(&mut cx, |this, _| {
1814            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1815                if shared.remove(&buffer_id).is_some() {
1816                    if shared.is_empty() {
1817                        this.shared_buffers.remove(&peer_id);
1818                    }
1819                    return;
1820                }
1821            }
1822            debug_panic!(
1823                "peer_id {} closed buffer_id {} which was either not open or already closed",
1824                peer_id,
1825                buffer_id
1826            )
1827        })
1828    }
1829
1830    pub async fn handle_buffer_saved(
1831        this: Entity<Self>,
1832        envelope: TypedEnvelope<proto::BufferSaved>,
1833        mut cx: AsyncAppContext,
1834    ) -> Result<()> {
1835        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1836        let version = deserialize_version(&envelope.payload.version);
1837        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1838        this.update(&mut cx, move |this, cx| {
1839            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1840                buffer.update(cx, |buffer, cx| {
1841                    buffer.did_save(version, mtime, cx);
1842                });
1843            }
1844
1845            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1846                downstream_client
1847                    .send(proto::BufferSaved {
1848                        project_id: *project_id,
1849                        buffer_id: buffer_id.into(),
1850                        mtime: envelope.payload.mtime,
1851                        version: envelope.payload.version,
1852                    })
1853                    .log_err();
1854            }
1855        })
1856    }
1857
1858    pub async fn handle_buffer_reloaded(
1859        this: Entity<Self>,
1860        envelope: TypedEnvelope<proto::BufferReloaded>,
1861        mut cx: AsyncAppContext,
1862    ) -> Result<()> {
1863        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1864        let version = deserialize_version(&envelope.payload.version);
1865        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1866        let line_ending = deserialize_line_ending(
1867            proto::LineEnding::from_i32(envelope.payload.line_ending)
1868                .ok_or_else(|| anyhow!("missing line ending"))?,
1869        );
1870        this.update(&mut cx, |this, cx| {
1871            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1872                buffer.update(cx, |buffer, cx| {
1873                    buffer.did_reload(version, line_ending, mtime, cx);
1874                });
1875            }
1876
1877            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1878                downstream_client
1879                    .send(proto::BufferReloaded {
1880                        project_id: *project_id,
1881                        buffer_id: buffer_id.into(),
1882                        mtime: envelope.payload.mtime,
1883                        version: envelope.payload.version,
1884                        line_ending: envelope.payload.line_ending,
1885                    })
1886                    .log_err();
1887            }
1888        })
1889    }
1890
1891    pub async fn handle_blame_buffer(
1892        this: Entity<Self>,
1893        envelope: TypedEnvelope<proto::BlameBuffer>,
1894        mut cx: AsyncAppContext,
1895    ) -> Result<proto::BlameBufferResponse> {
1896        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1897        let version = deserialize_version(&envelope.payload.version);
1898        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1899        buffer
1900            .update(&mut cx, |buffer, _| {
1901                buffer.wait_for_version(version.clone())
1902            })?
1903            .await?;
1904        let blame = this
1905            .update(&mut cx, |this, cx| {
1906                this.blame_buffer(&buffer, Some(version), cx)
1907            })?
1908            .await?;
1909        Ok(serialize_blame_buffer_response(blame))
1910    }
1911
1912    pub async fn handle_get_permalink_to_line(
1913        this: Entity<Self>,
1914        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1915        mut cx: AsyncAppContext,
1916    ) -> Result<proto::GetPermalinkToLineResponse> {
1917        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1918        // let version = deserialize_version(&envelope.payload.version);
1919        let selection = {
1920            let proto_selection = envelope
1921                .payload
1922                .selection
1923                .context("no selection to get permalink for defined")?;
1924            proto_selection.start as u32..proto_selection.end as u32
1925        };
1926        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1927        let permalink = this
1928            .update(&mut cx, |this, cx| {
1929                this.get_permalink_to_line(&buffer, selection, cx)
1930            })?
1931            .await?;
1932        Ok(proto::GetPermalinkToLineResponse {
1933            permalink: permalink.to_string(),
1934        })
1935    }
1936
1937    pub async fn handle_get_staged_text(
1938        this: Entity<Self>,
1939        request: TypedEnvelope<proto::GetStagedText>,
1940        mut cx: AsyncAppContext,
1941    ) -> Result<proto::GetStagedTextResponse> {
1942        let buffer_id = BufferId::new(request.payload.buffer_id)?;
1943        let change_set = this
1944            .update(&mut cx, |this, cx| {
1945                let buffer = this.get(buffer_id)?;
1946                Some(this.open_unstaged_changes(buffer, cx))
1947            })?
1948            .ok_or_else(|| anyhow!("no such buffer"))?
1949            .await?;
1950        this.update(&mut cx, |this, _| {
1951            let shared_buffers = this
1952                .shared_buffers
1953                .entry(request.original_sender_id.unwrap_or(request.sender_id))
1954                .or_default();
1955            debug_assert!(shared_buffers.contains_key(&buffer_id));
1956            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
1957                shared.unstaged_changes = Some(change_set.clone());
1958            }
1959        })?;
1960        let staged_text = change_set.read_with(&cx, |change_set, _| {
1961            change_set.base_text.as_ref().map(|buffer| buffer.text())
1962        })?;
1963        Ok(proto::GetStagedTextResponse { staged_text })
1964    }
1965
1966    pub async fn handle_update_diff_base(
1967        this: Entity<Self>,
1968        request: TypedEnvelope<proto::UpdateDiffBase>,
1969        mut cx: AsyncAppContext,
1970    ) -> Result<()> {
1971        let buffer_id = BufferId::new(request.payload.buffer_id)?;
1972        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
1973            if let OpenBuffer::Complete {
1974                unstaged_changes,
1975                buffer,
1976            } = this.opened_buffers.get(&buffer_id)?
1977            {
1978                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
1979            } else {
1980                None
1981            }
1982        })?
1983        else {
1984            return Ok(());
1985        };
1986        change_set.update(&mut cx, |change_set, cx| {
1987            if let Some(staged_text) = request.payload.staged_text {
1988                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
1989            } else {
1990                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
1991            }
1992        })?;
1993        Ok(())
1994    }
1995
1996    pub fn reload_buffers(
1997        &self,
1998        buffers: HashSet<Entity<Buffer>>,
1999        push_to_history: bool,
2000        cx: &mut Context<Self>,
2001    ) -> Task<Result<ProjectTransaction>> {
2002        if buffers.is_empty() {
2003            return Task::ready(Ok(ProjectTransaction::default()));
2004        }
2005        match &self.state {
2006            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
2007            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
2008        }
2009    }
2010
2011    async fn handle_reload_buffers(
2012        this: Entity<Self>,
2013        envelope: TypedEnvelope<proto::ReloadBuffers>,
2014        mut cx: AsyncAppContext,
2015    ) -> Result<proto::ReloadBuffersResponse> {
2016        let sender_id = envelope.original_sender_id().unwrap_or_default();
2017        let reload = this.update(&mut cx, |this, cx| {
2018            let mut buffers = HashSet::default();
2019            for buffer_id in &envelope.payload.buffer_ids {
2020                let buffer_id = BufferId::new(*buffer_id)?;
2021                buffers.insert(this.get_existing(buffer_id)?);
2022            }
2023            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
2024        })??;
2025
2026        let project_transaction = reload.await?;
2027        let project_transaction = this.update(&mut cx, |this, cx| {
2028            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
2029        })?;
2030        Ok(proto::ReloadBuffersResponse {
2031            transaction: Some(project_transaction),
2032        })
2033    }
2034
    /// Shares `buffer` with `peer_id`, sending its state downstream if it was not shared yet.
    ///
    /// If the buffer is already recorded as shared with the peer, this is a no-op. Otherwise
    /// the buffer is recorded as shared and, when a downstream client exists, a task is spawned
    /// that sends the buffer's state as a `CreateBufferForPeer::State` message followed by its
    /// serialized operations in `CreateBufferForPeer::Chunk` messages, the final one flagged
    /// with `is_last`.
    ///
    /// Send failures are logged; the returned task only fails when the store or buffer entity
    /// can no longer be updated.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            return Task::ready(Ok(()));
        }
        // Record the buffer as shared even when there is no downstream client to notify.
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                unstaged_changes: None,
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped from the store in the meantime.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only send the operation chunks when the initial state went out successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        // Peekable so the last chunk can be flagged with `is_last`.
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
2101
    /// Drops every record of buffers shared with any peer.
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
2105
    /// Drops the record of all buffers shared with `peer_id`, if there is one.
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
2109
2110    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2111        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2112            self.shared_buffers.insert(new_peer_id, buffers);
2113        }
2114    }
2115
    /// Returns `true` when at least one peer has an entry in the shared-buffers map.
    pub fn has_shared_buffers(&self) -> bool {
        !self.shared_buffers.is_empty()
    }
2119
2120    pub fn create_local_buffer(
2121        &mut self,
2122        text: &str,
2123        language: Option<Arc<Language>>,
2124        cx: &mut Context<Self>,
2125    ) -> Entity<Buffer> {
2126        let buffer = cx.new(|cx| {
2127            Buffer::local(text, cx)
2128                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2129        });
2130
2131        self.add_buffer(buffer.clone(), cx).log_err();
2132        let buffer_id = buffer.read(cx).remote_id();
2133
2134        let this = self
2135            .as_local_mut()
2136            .expect("local-only method called in a non-local context");
2137        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2138            this.local_buffer_ids_by_path.insert(
2139                ProjectPath {
2140                    worktree_id: file.worktree_id(cx),
2141                    path: file.path.clone(),
2142                },
2143                buffer_id,
2144            );
2145
2146            if let Some(entry_id) = file.entry_id {
2147                this.local_buffer_ids_by_entry_id
2148                    .insert(entry_id, buffer_id);
2149            }
2150        }
2151        buffer
2152    }
2153
2154    pub fn deserialize_project_transaction(
2155        &mut self,
2156        message: proto::ProjectTransaction,
2157        push_to_history: bool,
2158        cx: &mut Context<Self>,
2159    ) -> Task<Result<ProjectTransaction>> {
2160        if let Some(this) = self.as_remote_mut() {
2161            this.deserialize_project_transaction(message, push_to_history, cx)
2162        } else {
2163            debug_panic!("not a remote buffer store");
2164            Task::ready(Err(anyhow!("not a remote buffer store")))
2165        }
2166    }
2167
2168    pub fn wait_for_remote_buffer(
2169        &mut self,
2170        id: BufferId,
2171        cx: &mut Context<BufferStore>,
2172    ) -> Task<Result<Entity<Buffer>>> {
2173        if let Some(this) = self.as_remote_mut() {
2174            this.wait_for_remote_buffer(id, cx)
2175        } else {
2176            debug_panic!("not a remote buffer store");
2177            Task::ready(Err(anyhow!("not a remote buffer store")))
2178        }
2179    }
2180
2181    pub fn serialize_project_transaction_for_peer(
2182        &mut self,
2183        project_transaction: ProjectTransaction,
2184        peer_id: proto::PeerId,
2185        cx: &mut Context<Self>,
2186    ) -> proto::ProjectTransaction {
2187        let mut serialized_transaction = proto::ProjectTransaction {
2188            buffer_ids: Default::default(),
2189            transactions: Default::default(),
2190        };
2191        for (buffer, transaction) in project_transaction.0 {
2192            self.create_buffer_for_peer(&buffer, peer_id, cx)
2193                .detach_and_log_err(cx);
2194            serialized_transaction
2195                .buffer_ids
2196                .push(buffer.read(cx).remote_id().into());
2197            serialized_transaction
2198                .transactions
2199                .push(language::proto::serialize_transaction(&transaction));
2200        }
2201        serialized_transaction
2202    }
2203}
2204
2205impl BufferChangeSet {
2206    pub fn new(buffer: &Entity<Buffer>, cx: &mut Context<Self>) -> Self {
2207        cx.subscribe(buffer, |this, buffer, event, cx| match event {
2208            BufferEvent::LanguageChanged => {
2209                this.language = buffer.read(cx).language().cloned();
2210                if let Some(base_text) = &this.base_text {
2211                    let snapshot = language::Buffer::build_snapshot(
2212                        base_text.as_rope().clone(),
2213                        this.language.clone(),
2214                        this.language_registry.clone(),
2215                        cx,
2216                    );
2217                    this.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
2218                        let base_text = cx.background_executor().spawn(snapshot).await;
2219                        this.update(&mut cx, |this, cx| {
2220                            this.base_text = Some(base_text);
2221                            cx.notify();
2222                        })
2223                    }));
2224                }
2225            }
2226            _ => {}
2227        })
2228        .detach();
2229
2230        let buffer = buffer.read(cx);
2231
2232        Self {
2233            buffer_id: buffer.remote_id(),
2234            base_text: None,
2235            diff_to_buffer: git::diff::BufferDiff::new(buffer),
2236            recalculate_diff_task: None,
2237            diff_updated_futures: Vec::new(),
2238            language: buffer.language().cloned(),
2239            language_registry: buffer.language_registry(),
2240        }
2241    }
2242
2243    #[cfg(any(test, feature = "test-support"))]
2244    pub fn new_with_base_text(
2245        base_text: String,
2246        buffer: &Entity<Buffer>,
2247        cx: &mut Context<Self>,
2248    ) -> Self {
2249        let mut this = Self::new(&buffer, cx);
2250        let _ = this.set_base_text(base_text, buffer.read(cx).text_snapshot(), cx);
2251        this
2252    }
2253
    /// Returns the hunks of the current diff that intersect `range`, resolved against
    /// `buffer_snapshot`.
    ///
    /// Delegates to `hunks_intersecting_range` on the stored diff.
    pub fn diff_hunks_intersecting_range<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range(range, buffer_snapshot)
    }
2262
    /// Like `diff_hunks_intersecting_range`, but delegates to `hunks_intersecting_range_rev`
    /// on the stored diff (presumably yielding the hunks in reverse order — confirm against
    /// `BufferDiff`).
    pub fn diff_hunks_intersecting_range_rev<'a>(
        &'a self,
        range: Range<text::Anchor>,
        buffer_snapshot: &'a text::BufferSnapshot,
    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
        self.diff_to_buffer
            .hunks_intersecting_range_rev(range, buffer_snapshot)
    }
2271
    /// Test helper: returns the base text as a `String`, or `None` when no base text is set.
    #[cfg(any(test, feature = "test-support"))]
    pub fn base_text_string(&self) -> Option<String> {
        self.base_text.as_ref().map(|buffer| buffer.text())
    }
2276
    /// Replaces the base text and schedules a recalculation of the diff against
    /// `buffer_snapshot`.
    ///
    /// Line endings in `base_text` are normalized first. The returned receiver is sent `()`
    /// once a diff recalculation has completed.
    pub fn set_base_text(
        &mut self,
        mut base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        LineEnding::normalize(&mut base_text);
        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
    }
2286
2287    pub fn unset_base_text(
2288        &mut self,
2289        buffer_snapshot: text::BufferSnapshot,
2290        cx: &mut Context<Self>,
2291    ) {
2292        if self.base_text.is_some() {
2293            self.base_text = None;
2294            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
2295            self.recalculate_diff_task.take();
2296            cx.notify();
2297        }
2298    }
2299
2300    pub fn recalculate_diff(
2301        &mut self,
2302        buffer_snapshot: text::BufferSnapshot,
2303        cx: &mut Context<Self>,
2304    ) -> oneshot::Receiver<()> {
2305        if let Some(base_text) = self.base_text.clone() {
2306            self.recalculate_diff_internal(base_text.text(), buffer_snapshot, false, cx)
2307        } else {
2308            oneshot::channel().1
2309        }
2310    }
2311
    /// Spawns the task that recomputes the diff of `base_text` against `buffer_snapshot`.
    ///
    /// When `base_text_changed` is set, a new base-text snapshot is built (using the stored
    /// language and language registry) and stored alongside the new diff. The spawned task
    /// replaces any previously stored recalculation task.
    ///
    /// The returned receiver is sent `()` when a recalculation completes. Senders accumulate
    /// in `diff_updated_futures` and are all drained by whichever recalculation finishes, so a
    /// receiver from an earlier call that was superseded is notified by the later one.
    fn recalculate_diff_internal(
        &mut self,
        base_text: String,
        buffer_snapshot: text::BufferSnapshot,
        base_text_changed: bool,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.diff_updated_futures.push(tx);
        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
            let new_base_text = if base_text_changed {
                let base_text_rope: Rope = base_text.as_str().into();
                // The snapshot future is created on the entity, then awaited in the background.
                let snapshot = this.update(&mut cx, |this, cx| {
                    language::Buffer::build_snapshot(
                        base_text_rope,
                        this.language.clone(),
                        this.language_registry.clone(),
                        cx,
                    )
                })?;
                Some(cx.background_executor().spawn(snapshot).await)
            } else {
                None
            };
            let diff = cx
                .background_executor()
                .spawn({
                    let buffer_snapshot = buffer_snapshot.clone();
                    async move { BufferDiff::build(&base_text, &buffer_snapshot) }
                })
                .await;
            // Publish the results together, then wake everyone waiting on a recalculation.
            this.update(&mut cx, |this, cx| {
                if let Some(new_base_text) = new_base_text {
                    this.base_text = Some(new_base_text)
                }
                this.diff_to_buffer = diff;
                this.recalculate_diff_task.take();
                for tx in this.diff_updated_futures.drain(..) {
                    tx.send(()).ok();
                }
                cx.notify();
            })?;
            Ok(())
        }));
        rx
    }
2358
2359    #[cfg(any(test, feature = "test-support"))]
2360    pub fn recalculate_diff_sync(
2361        &mut self,
2362        mut base_text: String,
2363        buffer_snapshot: text::BufferSnapshot,
2364        base_text_changed: bool,
2365        cx: &mut Context<Self>,
2366    ) {
2367        LineEnding::normalize(&mut base_text);
2368        let diff = BufferDiff::build(&base_text, &buffer_snapshot);
2369        if base_text_changed {
2370            self.base_text = Some(
2371                cx.background_executor()
2372                    .clone()
2373                    .block(Buffer::build_snapshot(
2374                        base_text.into(),
2375                        self.language.clone(),
2376                        self.language_registry.clone(),
2377                        cx,
2378                    )),
2379            );
2380        }
2381        self.diff_to_buffer = diff;
2382        self.recalculate_diff_task.take();
2383        cx.notify();
2384    }
2385}
2386
2387impl OpenBuffer {
2388    fn upgrade(&self) -> Option<Entity<Buffer>> {
2389        match self {
2390            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2391            OpenBuffer::Operations(_) => None,
2392        }
2393    }
2394}
2395
2396fn is_not_found_error(error: &anyhow::Error) -> bool {
2397    error
2398        .root_cause()
2399        .downcast_ref::<io::Error>()
2400        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2401}
2402
2403fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2404    let Some(blame) = blame else {
2405        return proto::BlameBufferResponse {
2406            blame_response: None,
2407        };
2408    };
2409
2410    let entries = blame
2411        .entries
2412        .into_iter()
2413        .map(|entry| proto::BlameEntry {
2414            sha: entry.sha.as_bytes().into(),
2415            start_line: entry.range.start,
2416            end_line: entry.range.end,
2417            original_line_number: entry.original_line_number,
2418            author: entry.author.clone(),
2419            author_mail: entry.author_mail.clone(),
2420            author_time: entry.author_time,
2421            author_tz: entry.author_tz.clone(),
2422            committer: entry.committer.clone(),
2423            committer_mail: entry.committer_mail.clone(),
2424            committer_time: entry.committer_time,
2425            committer_tz: entry.committer_tz.clone(),
2426            summary: entry.summary.clone(),
2427            previous: entry.previous.clone(),
2428            filename: entry.filename.clone(),
2429        })
2430        .collect::<Vec<_>>();
2431
2432    let messages = blame
2433        .messages
2434        .into_iter()
2435        .map(|(oid, message)| proto::CommitMessage {
2436            oid: oid.as_bytes().into(),
2437            message,
2438        })
2439        .collect::<Vec<_>>();
2440
2441    let permalinks = blame
2442        .permalinks
2443        .into_iter()
2444        .map(|(oid, url)| proto::CommitPermalink {
2445            oid: oid.as_bytes().into(),
2446            permalink: url.to_string(),
2447        })
2448        .collect::<Vec<_>>();
2449
2450    proto::BlameBufferResponse {
2451        blame_response: Some(proto::blame_buffer_response::BlameResponse {
2452            entries,
2453            messages,
2454            permalinks,
2455            remote_url: blame.remote_url,
2456        }),
2457    }
2458}
2459
2460fn deserialize_blame_buffer_response(
2461    response: proto::BlameBufferResponse,
2462) -> Option<git::blame::Blame> {
2463    let response = response.blame_response?;
2464    let entries = response
2465        .entries
2466        .into_iter()
2467        .filter_map(|entry| {
2468            Some(git::blame::BlameEntry {
2469                sha: git::Oid::from_bytes(&entry.sha).ok()?,
2470                range: entry.start_line..entry.end_line,
2471                original_line_number: entry.original_line_number,
2472                committer: entry.committer,
2473                committer_time: entry.committer_time,
2474                committer_tz: entry.committer_tz,
2475                committer_mail: entry.committer_mail,
2476                author: entry.author,
2477                author_mail: entry.author_mail,
2478                author_time: entry.author_time,
2479                author_tz: entry.author_tz,
2480                summary: entry.summary,
2481                previous: entry.previous,
2482                filename: entry.filename,
2483            })
2484        })
2485        .collect::<Vec<_>>();
2486
2487    let messages = response
2488        .messages
2489        .into_iter()
2490        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2491        .collect::<HashMap<_, _>>();
2492
2493    let permalinks = response
2494        .permalinks
2495        .into_iter()
2496        .filter_map(|permalink| {
2497            Some((
2498                git::Oid::from_bytes(&permalink.oid).ok()?,
2499                Url::from_str(&permalink.permalink).ok()?,
2500            ))
2501        })
2502        .collect::<HashMap<_, _>>();
2503
2504    Some(Blame {
2505        entries,
2506        permalinks,
2507        messages,
2508        remote_url: response.remote_url,
2509    })
2510}
2511
2512fn get_permalink_in_rust_registry_src(
2513    provider_registry: Arc<GitHostingProviderRegistry>,
2514    path: PathBuf,
2515    selection: Range<u32>,
2516) -> Result<url::Url> {
2517    #[derive(Deserialize)]
2518    struct CargoVcsGit {
2519        sha1: String,
2520    }
2521
2522    #[derive(Deserialize)]
2523    struct CargoVcsInfo {
2524        git: CargoVcsGit,
2525        path_in_vcs: String,
2526    }
2527
2528    #[derive(Deserialize)]
2529    struct CargoPackage {
2530        repository: String,
2531    }
2532
2533    #[derive(Deserialize)]
2534    struct CargoToml {
2535        package: CargoPackage,
2536    }
2537
2538    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
2539        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
2540        Some((dir, json))
2541    }) else {
2542        bail!("No .cargo_vcs_info.json found in parent directories")
2543    };
2544    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
2545    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
2546    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
2547    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
2548        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
2549    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
2550    let permalink = provider.build_permalink(
2551        remote,
2552        BuildPermalinkParams {
2553            sha: &cargo_vcs_info.git.sha1,
2554            path: &path.to_string_lossy(),
2555            selection: Some(selection),
2556        },
2557    );
2558    Ok(permalink)
2559}