buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    ProjectItem as _, ProjectPath,
   5};
   6use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
   7use anyhow::{anyhow, Context as _, Result};
   8use client::Client;
   9use collections::{hash_map, HashMap, HashSet};
  10use fs::Fs;
  11use futures::{channel::oneshot, future::Shared, Future, FutureExt as _, StreamExt};
  12use git::{blame::Blame, diff::BufferDiff};
  13use gpui::{
  14    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
  15    Task, WeakModel,
  16};
  17use http_client::Url;
  18use language::{
  19    proto::{
  20        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  21        split_operations,
  22    },
  23    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
  24};
  25use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  26use smol::channel::Receiver;
  27use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
  28use text::{BufferId, LineEnding, Rope};
  29use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  30use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
  31
  32/// A set of open buffers.
  33pub struct BufferStore {
  34    state: BufferStoreState,
  35    #[allow(clippy::type_complexity)]
  36    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Model<Buffer>, Arc<anyhow::Error>>>>>,
  37    #[allow(clippy::type_complexity)]
  38    loading_change_sets:
  39        HashMap<BufferId, Shared<Task<Result<Model<BufferChangeSet>, Arc<anyhow::Error>>>>>,
  40    worktree_store: Model<WorktreeStore>,
  41    opened_buffers: HashMap<BufferId, OpenBuffer>,
  42    downstream_client: Option<(AnyProtoClient, u64)>,
  43    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
  44}
  45
  46#[derive(Hash, Eq, PartialEq, Clone)]
  47struct SharedBuffer {
  48    buffer: Model<Buffer>,
  49    unstaged_changes: Option<Model<BufferChangeSet>>,
  50}
  51
  52pub struct BufferChangeSet {
  53    pub buffer_id: BufferId,
  54    pub base_text: Option<Model<Buffer>>,
  55    pub diff_to_buffer: git::diff::BufferDiff,
  56    pub recalculate_diff_task: Option<Task<Result<()>>>,
  57    pub diff_updated_futures: Vec<oneshot::Sender<()>>,
  58    pub base_text_version: usize,
  59}
  60
  61enum BufferStoreState {
  62    Local(LocalBufferStore),
  63    Remote(RemoteBufferStore),
  64}
  65
  66struct RemoteBufferStore {
  67    shared_with_me: HashSet<Model<Buffer>>,
  68    upstream_client: AnyProtoClient,
  69    project_id: u64,
  70    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  71    remote_buffer_listeners:
  72        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  73    worktree_store: Model<WorktreeStore>,
  74}
  75
  76struct LocalBufferStore {
  77    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  78    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  79    worktree_store: Model<WorktreeStore>,
  80    _subscription: Subscription,
  81}
  82
  83enum OpenBuffer {
  84    Complete {
  85        buffer: WeakModel<Buffer>,
  86        unstaged_changes: Option<WeakModel<BufferChangeSet>>,
  87    },
  88    Operations(Vec<Operation>),
  89}
  90
  91pub enum BufferStoreEvent {
  92    BufferAdded(Model<Buffer>),
  93    BufferDropped(BufferId),
  94    BufferChangedFilePath {
  95        buffer: Model<Buffer>,
  96        old_file: Option<Arc<dyn language::File>>,
  97    },
  98}
  99
 100#[derive(Default, Debug)]
 101pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
 102
 103impl EventEmitter<BufferStoreEvent> for BufferStore {}
 104
 105impl RemoteBufferStore {
 106    fn load_staged_text(
 107        &self,
 108        buffer_id: BufferId,
 109        cx: &AppContext,
 110    ) -> Task<Result<Option<String>>> {
 111        let project_id = self.project_id;
 112        let client = self.upstream_client.clone();
 113        cx.background_executor().spawn(async move {
 114            Ok(client
 115                .request(proto::GetStagedText {
 116                    project_id,
 117                    buffer_id: buffer_id.to_proto(),
 118                })
 119                .await?
 120                .staged_text)
 121        })
 122    }
 123    pub fn wait_for_remote_buffer(
 124        &mut self,
 125        id: BufferId,
 126        cx: &mut ModelContext<BufferStore>,
 127    ) -> Task<Result<Model<Buffer>>> {
 128        let (tx, rx) = oneshot::channel();
 129        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 130
 131        cx.spawn(|this, cx| async move {
 132            if let Some(buffer) = this
 133                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
 134                .ok()
 135                .flatten()
 136            {
 137                return Ok(buffer);
 138            }
 139
 140            cx.background_executor()
 141                .spawn(async move { rx.await? })
 142                .await
 143        })
 144    }
 145
 146    fn save_remote_buffer(
 147        &self,
 148        buffer_handle: Model<Buffer>,
 149        new_path: Option<proto::ProjectPath>,
 150        cx: &ModelContext<BufferStore>,
 151    ) -> Task<Result<()>> {
 152        let buffer = buffer_handle.read(cx);
 153        let buffer_id = buffer.remote_id().into();
 154        let version = buffer.version();
 155        let rpc = self.upstream_client.clone();
 156        let project_id = self.project_id;
 157        cx.spawn(move |_, mut cx| async move {
 158            let response = rpc
 159                .request(proto::SaveBuffer {
 160                    project_id,
 161                    buffer_id,
 162                    new_path,
 163                    version: serialize_version(&version),
 164                })
 165                .await?;
 166            let version = deserialize_version(&response.version);
 167            let mtime = response.mtime.map(|mtime| mtime.into());
 168
 169            buffer_handle.update(&mut cx, |buffer, cx| {
 170                buffer.did_save(version.clone(), mtime, cx);
 171            })?;
 172
 173            Ok(())
 174        })
 175    }
 176
 177    pub fn handle_create_buffer_for_peer(
 178        &mut self,
 179        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 180        replica_id: u16,
 181        capability: Capability,
 182        cx: &mut ModelContext<BufferStore>,
 183    ) -> Result<Option<Model<Buffer>>> {
 184        match envelope
 185            .payload
 186            .variant
 187            .ok_or_else(|| anyhow!("missing variant"))?
 188        {
 189            proto::create_buffer_for_peer::Variant::State(mut state) => {
 190                let buffer_id = BufferId::new(state.id)?;
 191
 192                let buffer_result = maybe!({
 193                    let mut buffer_file = None;
 194                    if let Some(file) = state.file.take() {
 195                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 196                        let worktree = self
 197                            .worktree_store
 198                            .read(cx)
 199                            .worktree_for_id(worktree_id, cx)
 200                            .ok_or_else(|| {
 201                                anyhow!("no worktree found for id {}", file.worktree_id)
 202                            })?;
 203                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 204                            as Arc<dyn language::File>);
 205                    }
 206                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 207                });
 208
 209                match buffer_result {
 210                    Ok(buffer) => {
 211                        let buffer = cx.new_model(|_| buffer);
 212                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 213                    }
 214                    Err(error) => {
 215                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 216                            for listener in listeners {
 217                                listener.send(Err(anyhow!(error.cloned()))).ok();
 218                            }
 219                        }
 220                    }
 221                }
 222            }
 223            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 224                let buffer_id = BufferId::new(chunk.buffer_id)?;
 225                let buffer = self
 226                    .loading_remote_buffers_by_id
 227                    .get(&buffer_id)
 228                    .cloned()
 229                    .ok_or_else(|| {
 230                        anyhow!(
 231                            "received chunk for buffer {} without initial state",
 232                            chunk.buffer_id
 233                        )
 234                    })?;
 235
 236                let result = maybe!({
 237                    let operations = chunk
 238                        .operations
 239                        .into_iter()
 240                        .map(language::proto::deserialize_operation)
 241                        .collect::<Result<Vec<_>>>()?;
 242                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 243                    anyhow::Ok(())
 244                });
 245
 246                if let Err(error) = result {
 247                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 248                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 249                        for listener in listeners {
 250                            listener.send(Err(error.cloned())).ok();
 251                        }
 252                    }
 253                } else if chunk.is_last {
 254                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 255                    if self.upstream_client.is_via_collab() {
 256                        // retain buffers sent by peers to avoid races.
 257                        self.shared_with_me.insert(buffer.clone());
 258                    }
 259
 260                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 261                        for sender in senders {
 262                            sender.send(Ok(buffer.clone())).ok();
 263                        }
 264                    }
 265                    return Ok(Some(buffer));
 266                }
 267            }
 268        }
 269        return Ok(None);
 270    }
 271
 272    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 273        self.loading_remote_buffers_by_id
 274            .keys()
 275            .copied()
 276            .collect::<Vec<_>>()
 277    }
 278
 279    pub fn deserialize_project_transaction(
 280        &self,
 281        message: proto::ProjectTransaction,
 282        push_to_history: bool,
 283        cx: &mut ModelContext<BufferStore>,
 284    ) -> Task<Result<ProjectTransaction>> {
 285        cx.spawn(|this, mut cx| async move {
 286            let mut project_transaction = ProjectTransaction::default();
 287            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 288            {
 289                let buffer_id = BufferId::new(buffer_id)?;
 290                let buffer = this
 291                    .update(&mut cx, |this, cx| {
 292                        this.wait_for_remote_buffer(buffer_id, cx)
 293                    })?
 294                    .await?;
 295                let transaction = language::proto::deserialize_transaction(transaction)?;
 296                project_transaction.0.insert(buffer, transaction);
 297            }
 298
 299            for (buffer, transaction) in &project_transaction.0 {
 300                buffer
 301                    .update(&mut cx, |buffer, _| {
 302                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 303                    })?
 304                    .await?;
 305
 306                if push_to_history {
 307                    buffer.update(&mut cx, |buffer, _| {
 308                        buffer.push_transaction(transaction.clone(), Instant::now());
 309                    })?;
 310                }
 311            }
 312
 313            Ok(project_transaction)
 314        })
 315    }
 316
 317    fn open_buffer(
 318        &self,
 319        path: Arc<Path>,
 320        worktree: Model<Worktree>,
 321        cx: &mut ModelContext<BufferStore>,
 322    ) -> Task<Result<Model<Buffer>>> {
 323        let worktree_id = worktree.read(cx).id().to_proto();
 324        let project_id = self.project_id;
 325        let client = self.upstream_client.clone();
 326        let path_string = path.clone().to_string_lossy().to_string();
 327        cx.spawn(move |this, mut cx| async move {
 328            let response = client
 329                .request(proto::OpenBufferByPath {
 330                    project_id,
 331                    worktree_id,
 332                    path: path_string,
 333                })
 334                .await?;
 335            let buffer_id = BufferId::new(response.buffer_id)?;
 336
 337            let buffer = this
 338                .update(&mut cx, {
 339                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 340                })?
 341                .await?;
 342
 343            Ok(buffer)
 344        })
 345    }
 346
 347    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 348        let create = self.upstream_client.request(proto::OpenNewBuffer {
 349            project_id: self.project_id,
 350        });
 351        cx.spawn(|this, mut cx| async move {
 352            let response = create.await?;
 353            let buffer_id = BufferId::new(response.buffer_id)?;
 354
 355            this.update(&mut cx, |this, cx| {
 356                this.wait_for_remote_buffer(buffer_id, cx)
 357            })?
 358            .await
 359        })
 360    }
 361
 362    fn reload_buffers(
 363        &self,
 364        buffers: HashSet<Model<Buffer>>,
 365        push_to_history: bool,
 366        cx: &mut ModelContext<BufferStore>,
 367    ) -> Task<Result<ProjectTransaction>> {
 368        let request = self.upstream_client.request(proto::ReloadBuffers {
 369            project_id: self.project_id,
 370            buffer_ids: buffers
 371                .iter()
 372                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 373                .collect(),
 374        });
 375
 376        cx.spawn(|this, mut cx| async move {
 377            let response = request
 378                .await?
 379                .transaction
 380                .ok_or_else(|| anyhow!("missing transaction"))?;
 381            this.update(&mut cx, |this, cx| {
 382                this.deserialize_project_transaction(response, push_to_history, cx)
 383            })?
 384            .await
 385        })
 386    }
 387}
 388
 389impl LocalBufferStore {
 390    fn load_staged_text(
 391        &self,
 392        buffer: &Model<Buffer>,
 393        cx: &AppContext,
 394    ) -> Task<Result<Option<String>>> {
 395        let Some(file) = buffer.read(cx).file() else {
 396            return Task::ready(Err(anyhow!("buffer has no file")));
 397        };
 398        let worktree_id = file.worktree_id(cx);
 399        let path = file.path().clone();
 400        let Some(worktree) = self
 401            .worktree_store
 402            .read(cx)
 403            .worktree_for_id(worktree_id, cx)
 404        else {
 405            return Task::ready(Err(anyhow!("no such worktree")));
 406        };
 407
 408        worktree.read(cx).load_staged_file(path.as_ref(), cx)
 409    }
 410
 411    fn save_local_buffer(
 412        &self,
 413        buffer_handle: Model<Buffer>,
 414        worktree: Model<Worktree>,
 415        path: Arc<Path>,
 416        mut has_changed_file: bool,
 417        cx: &mut ModelContext<BufferStore>,
 418    ) -> Task<Result<()>> {
 419        let buffer = buffer_handle.read(cx);
 420
 421        let text = buffer.as_rope().clone();
 422        let line_ending = buffer.line_ending();
 423        let version = buffer.version();
 424        let buffer_id = buffer.remote_id();
 425        if buffer
 426            .file()
 427            .is_some_and(|file| file.disk_state() == DiskState::New)
 428        {
 429            has_changed_file = true;
 430        }
 431
 432        let save = worktree.update(cx, |worktree, cx| {
 433            worktree.write_file(path.as_ref(), text, line_ending, cx)
 434        });
 435
 436        cx.spawn(move |this, mut cx| async move {
 437            let new_file = save.await?;
 438            let mtime = new_file.disk_state().mtime();
 439            this.update(&mut cx, |this, cx| {
 440                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 441                    if has_changed_file {
 442                        downstream_client
 443                            .send(proto::UpdateBufferFile {
 444                                project_id,
 445                                buffer_id: buffer_id.to_proto(),
 446                                file: Some(language::File::to_proto(&*new_file, cx)),
 447                            })
 448                            .log_err();
 449                    }
 450                    downstream_client
 451                        .send(proto::BufferSaved {
 452                            project_id,
 453                            buffer_id: buffer_id.to_proto(),
 454                            version: serialize_version(&version),
 455                            mtime: mtime.map(|time| time.into()),
 456                        })
 457                        .log_err();
 458                }
 459            })?;
 460            buffer_handle.update(&mut cx, |buffer, cx| {
 461                if has_changed_file {
 462                    buffer.file_updated(new_file, cx);
 463                }
 464                buffer.did_save(version.clone(), mtime, cx);
 465            })
 466        })
 467    }
 468
 469    fn subscribe_to_worktree(
 470        &mut self,
 471        worktree: &Model<Worktree>,
 472        cx: &mut ModelContext<BufferStore>,
 473    ) {
 474        cx.subscribe(worktree, |this, worktree, event, cx| {
 475            if worktree.read(cx).is_local() {
 476                match event {
 477                    worktree::Event::UpdatedEntries(changes) => {
 478                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 479                    }
 480                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 481                        Self::local_worktree_git_repos_changed(
 482                            this,
 483                            worktree.clone(),
 484                            updated_repos,
 485                            cx,
 486                        )
 487                    }
 488                    _ => {}
 489                }
 490            }
 491        })
 492        .detach();
 493    }
 494
 495    fn local_worktree_entries_changed(
 496        this: &mut BufferStore,
 497        worktree_handle: &Model<Worktree>,
 498        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 499        cx: &mut ModelContext<BufferStore>,
 500    ) {
 501        let snapshot = worktree_handle.read(cx).snapshot();
 502        for (path, entry_id, _) in changes {
 503            Self::local_worktree_entry_changed(
 504                this,
 505                *entry_id,
 506                path,
 507                worktree_handle,
 508                &snapshot,
 509                cx,
 510            );
 511        }
 512    }
 513
 514    fn local_worktree_git_repos_changed(
 515        this: &mut BufferStore,
 516        worktree_handle: Model<Worktree>,
 517        changed_repos: &UpdatedGitRepositoriesSet,
 518        cx: &mut ModelContext<BufferStore>,
 519    ) {
 520        debug_assert!(worktree_handle.read(cx).is_local());
 521
 522        let buffer_change_sets = this
 523            .opened_buffers
 524            .values()
 525            .filter_map(|buffer| {
 526                if let OpenBuffer::Complete {
 527                    buffer,
 528                    unstaged_changes,
 529                } = buffer
 530                {
 531                    let buffer = buffer.upgrade()?.read(cx);
 532                    let file = File::from_dyn(buffer.file())?;
 533                    if file.worktree != worktree_handle {
 534                        return None;
 535                    }
 536                    changed_repos
 537                        .iter()
 538                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 539                    let unstaged_changes = unstaged_changes.as_ref()?.upgrade()?;
 540                    let snapshot = buffer.text_snapshot();
 541                    Some((unstaged_changes, snapshot, file.path.clone()))
 542                } else {
 543                    None
 544                }
 545            })
 546            .collect::<Vec<_>>();
 547
 548        if buffer_change_sets.is_empty() {
 549            return;
 550        }
 551
 552        cx.spawn(move |this, mut cx| async move {
 553            let snapshot =
 554                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 555            let diff_bases_by_buffer = cx
 556                .background_executor()
 557                .spawn(async move {
 558                    buffer_change_sets
 559                        .into_iter()
 560                        .filter_map(|(change_set, buffer_snapshot, path)| {
 561                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 562                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 563                            let base_text = local_repo_entry.repo().load_index_text(&relative_path);
 564                            Some((change_set, buffer_snapshot, base_text))
 565                        })
 566                        .collect::<Vec<_>>()
 567                })
 568                .await;
 569
 570            this.update(&mut cx, |this, cx| {
 571                for (change_set, buffer_snapshot, staged_text) in diff_bases_by_buffer {
 572                    change_set.update(cx, |change_set, cx| {
 573                        if let Some(staged_text) = staged_text.clone() {
 574                            let _ =
 575                                change_set.set_base_text(staged_text, buffer_snapshot.clone(), cx);
 576                        } else {
 577                            change_set.unset_base_text(buffer_snapshot.clone(), cx);
 578                        }
 579                    });
 580
 581                    if let Some((client, project_id)) = &this.downstream_client.clone() {
 582                        client
 583                            .send(proto::UpdateDiffBase {
 584                                project_id: *project_id,
 585                                buffer_id: buffer_snapshot.remote_id().to_proto(),
 586                                staged_text,
 587                            })
 588                            .log_err();
 589                    }
 590                }
 591            })
 592        })
 593        .detach_and_log_err(cx);
 594    }
 595
 596    fn local_worktree_entry_changed(
 597        this: &mut BufferStore,
 598        entry_id: ProjectEntryId,
 599        path: &Arc<Path>,
 600        worktree: &Model<worktree::Worktree>,
 601        snapshot: &worktree::Snapshot,
 602        cx: &mut ModelContext<BufferStore>,
 603    ) -> Option<()> {
 604        let project_path = ProjectPath {
 605            worktree_id: snapshot.id(),
 606            path: path.clone(),
 607        };
 608
 609        let buffer_id = {
 610            let local = this.as_local_mut()?;
 611            match local.local_buffer_ids_by_entry_id.get(&entry_id) {
 612                Some(&buffer_id) => buffer_id,
 613                None => local.local_buffer_ids_by_path.get(&project_path).copied()?,
 614            }
 615        };
 616
 617        let buffer = if let Some(buffer) = this.get(buffer_id) {
 618            Some(buffer)
 619        } else {
 620            this.opened_buffers.remove(&buffer_id);
 621            None
 622        };
 623
 624        let buffer = if let Some(buffer) = buffer {
 625            buffer
 626        } else {
 627            let this = this.as_local_mut()?;
 628            this.local_buffer_ids_by_path.remove(&project_path);
 629            this.local_buffer_ids_by_entry_id.remove(&entry_id);
 630            return None;
 631        };
 632
 633        let events = buffer.update(cx, |buffer, cx| {
 634            let local = this.as_local_mut()?;
 635            let file = buffer.file()?;
 636            let old_file = File::from_dyn(Some(file))?;
 637            if old_file.worktree != *worktree {
 638                return None;
 639            }
 640
 641            let snapshot_entry = old_file
 642                .entry_id
 643                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 644                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
 645
 646            let new_file = if let Some(entry) = snapshot_entry {
 647                File {
 648                    disk_state: match entry.mtime {
 649                        Some(mtime) => DiskState::Present { mtime },
 650                        None => old_file.disk_state,
 651                    },
 652                    is_local: true,
 653                    entry_id: Some(entry.id),
 654                    path: entry.path.clone(),
 655                    worktree: worktree.clone(),
 656                    is_private: entry.is_private,
 657                }
 658            } else {
 659                File {
 660                    disk_state: DiskState::Deleted,
 661                    is_local: true,
 662                    entry_id: old_file.entry_id,
 663                    path: old_file.path.clone(),
 664                    worktree: worktree.clone(),
 665                    is_private: old_file.is_private,
 666                }
 667            };
 668
 669            if new_file == *old_file {
 670                return None;
 671            }
 672
 673            let mut events = Vec::new();
 674            if new_file.path != old_file.path {
 675                local.local_buffer_ids_by_path.remove(&ProjectPath {
 676                    path: old_file.path.clone(),
 677                    worktree_id: old_file.worktree_id(cx),
 678                });
 679                local.local_buffer_ids_by_path.insert(
 680                    ProjectPath {
 681                        worktree_id: new_file.worktree_id(cx),
 682                        path: new_file.path.clone(),
 683                    },
 684                    buffer_id,
 685                );
 686                events.push(BufferStoreEvent::BufferChangedFilePath {
 687                    buffer: cx.handle(),
 688                    old_file: buffer.file().cloned(),
 689                });
 690            }
 691
 692            if new_file.entry_id != old_file.entry_id {
 693                if let Some(entry_id) = old_file.entry_id {
 694                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
 695                }
 696                if let Some(entry_id) = new_file.entry_id {
 697                    local
 698                        .local_buffer_ids_by_entry_id
 699                        .insert(entry_id, buffer_id);
 700                }
 701            }
 702
 703            if let Some((client, project_id)) = &this.downstream_client {
 704                client
 705                    .send(proto::UpdateBufferFile {
 706                        project_id: *project_id,
 707                        buffer_id: buffer_id.to_proto(),
 708                        file: Some(new_file.to_proto(cx)),
 709                    })
 710                    .ok();
 711            }
 712
 713            buffer.file_updated(Arc::new(new_file), cx);
 714            Some(events)
 715        })?;
 716
 717        for event in events {
 718            cx.emit(event);
 719        }
 720
 721        None
 722    }
 723
 724    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 725        let file = File::from_dyn(buffer.read(cx).file())?;
 726
 727        let remote_id = buffer.read(cx).remote_id();
 728        if let Some(entry_id) = file.entry_id {
 729            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 730                Some(_) => {
 731                    return None;
 732                }
 733                None => {
 734                    self.local_buffer_ids_by_entry_id
 735                        .insert(entry_id, remote_id);
 736                }
 737            }
 738        };
 739        self.local_buffer_ids_by_path.insert(
 740            ProjectPath {
 741                worktree_id: file.worktree_id(cx),
 742                path: file.path.clone(),
 743            },
 744            remote_id,
 745        );
 746
 747        Some(())
 748    }
 749
 750    fn save_buffer(
 751        &self,
 752        buffer: Model<Buffer>,
 753        cx: &mut ModelContext<BufferStore>,
 754    ) -> Task<Result<()>> {
 755        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 756            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 757        };
 758        let worktree = file.worktree.clone();
 759        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 760    }
 761
 762    fn save_buffer_as(
 763        &self,
 764        buffer: Model<Buffer>,
 765        path: ProjectPath,
 766        cx: &mut ModelContext<BufferStore>,
 767    ) -> Task<Result<()>> {
 768        let Some(worktree) = self
 769            .worktree_store
 770            .read(cx)
 771            .worktree_for_id(path.worktree_id, cx)
 772        else {
 773            return Task::ready(Err(anyhow!("no such worktree")));
 774        };
 775        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
 776    }
 777
 778    fn open_buffer(
 779        &self,
 780        path: Arc<Path>,
 781        worktree: Model<Worktree>,
 782        cx: &mut ModelContext<BufferStore>,
 783    ) -> Task<Result<Model<Buffer>>> {
 784        let load_buffer = worktree.update(cx, |worktree, cx| {
 785            let load_file = worktree.load_file(path.as_ref(), cx);
 786            let reservation = cx.reserve_model();
 787            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 788            cx.spawn(move |_, mut cx| async move {
 789                let loaded = load_file.await?;
 790                let text_buffer = cx
 791                    .background_executor()
 792                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 793                    .await;
 794                cx.insert_model(reservation, |_| {
 795                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
 796                })
 797            })
 798        });
 799
 800        cx.spawn(move |this, mut cx| async move {
 801            let buffer = match load_buffer.await {
 802                Ok(buffer) => Ok(buffer),
 803                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 804                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 805                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 806                    Buffer::build(
 807                        text_buffer,
 808                        Some(Arc::new(File {
 809                            worktree,
 810                            path,
 811                            disk_state: DiskState::New,
 812                            entry_id: None,
 813                            is_local: true,
 814                            is_private: false,
 815                        })),
 816                        Capability::ReadWrite,
 817                    )
 818                }),
 819                Err(e) => Err(e),
 820            }?;
 821            this.update(&mut cx, |this, cx| {
 822                this.add_buffer(buffer.clone(), cx)?;
 823                let buffer_id = buffer.read(cx).remote_id();
 824                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 825                    let this = this.as_local_mut().unwrap();
 826                    this.local_buffer_ids_by_path.insert(
 827                        ProjectPath {
 828                            worktree_id: file.worktree_id(cx),
 829                            path: file.path.clone(),
 830                        },
 831                        buffer_id,
 832                    );
 833
 834                    if let Some(entry_id) = file.entry_id {
 835                        this.local_buffer_ids_by_entry_id
 836                            .insert(entry_id, buffer_id);
 837                    }
 838                }
 839
 840                anyhow::Ok(())
 841            })??;
 842
 843            Ok(buffer)
 844        })
 845    }
 846
 847    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 848        cx.spawn(|buffer_store, mut cx| async move {
 849            let buffer = cx.new_model(|cx| {
 850                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
 851            })?;
 852            buffer_store.update(&mut cx, |buffer_store, cx| {
 853                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 854            })?;
 855            Ok(buffer)
 856        })
 857    }
 858
 859    fn reload_buffers(
 860        &self,
 861        buffers: HashSet<Model<Buffer>>,
 862        push_to_history: bool,
 863        cx: &mut ModelContext<BufferStore>,
 864    ) -> Task<Result<ProjectTransaction>> {
 865        cx.spawn(move |_, mut cx| async move {
 866            let mut project_transaction = ProjectTransaction::default();
 867            for buffer in buffers {
 868                let transaction = buffer
 869                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
 870                    .await?;
 871                buffer.update(&mut cx, |buffer, cx| {
 872                    if let Some(transaction) = transaction {
 873                        if !push_to_history {
 874                            buffer.forget_transaction(transaction.id);
 875                        }
 876                        project_transaction.0.insert(cx.handle(), transaction);
 877                    }
 878                })?;
 879            }
 880
 881            Ok(project_transaction)
 882        })
 883    }
 884}
 885
 886impl BufferStore {
 887    pub fn init(client: &AnyProtoClient) {
 888        client.add_model_message_handler(Self::handle_buffer_reloaded);
 889        client.add_model_message_handler(Self::handle_buffer_saved);
 890        client.add_model_message_handler(Self::handle_update_buffer_file);
 891        client.add_model_request_handler(Self::handle_save_buffer);
 892        client.add_model_request_handler(Self::handle_blame_buffer);
 893        client.add_model_request_handler(Self::handle_reload_buffers);
 894        client.add_model_request_handler(Self::handle_get_permalink_to_line);
 895        client.add_model_request_handler(Self::handle_get_staged_text);
 896        client.add_model_message_handler(Self::handle_update_diff_base);
 897    }
 898
 899    /// Creates a buffer store, optionally retaining its buffers.
 900    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
 901        Self {
 902            state: BufferStoreState::Local(LocalBufferStore {
 903                local_buffer_ids_by_path: Default::default(),
 904                local_buffer_ids_by_entry_id: Default::default(),
 905                worktree_store: worktree_store.clone(),
 906                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
 907                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 908                        let this = this.as_local_mut().unwrap();
 909                        this.subscribe_to_worktree(worktree, cx);
 910                    }
 911                }),
 912            }),
 913            downstream_client: None,
 914            opened_buffers: Default::default(),
 915            shared_buffers: Default::default(),
 916            loading_buffers: Default::default(),
 917            loading_change_sets: Default::default(),
 918            worktree_store,
 919        }
 920    }
 921
 922    pub fn remote(
 923        worktree_store: Model<WorktreeStore>,
 924        upstream_client: AnyProtoClient,
 925        remote_id: u64,
 926        _cx: &mut ModelContext<Self>,
 927    ) -> Self {
 928        Self {
 929            state: BufferStoreState::Remote(RemoteBufferStore {
 930                shared_with_me: Default::default(),
 931                loading_remote_buffers_by_id: Default::default(),
 932                remote_buffer_listeners: Default::default(),
 933                project_id: remote_id,
 934                upstream_client,
 935                worktree_store: worktree_store.clone(),
 936            }),
 937            downstream_client: None,
 938            opened_buffers: Default::default(),
 939            loading_buffers: Default::default(),
 940            loading_change_sets: Default::default(),
 941            shared_buffers: Default::default(),
 942            worktree_store,
 943        }
 944    }
 945
 946    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
 947        match &mut self.state {
 948            BufferStoreState::Local(state) => Some(state),
 949            _ => None,
 950        }
 951    }
 952
 953    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
 954        match &mut self.state {
 955            BufferStoreState::Remote(state) => Some(state),
 956            _ => None,
 957        }
 958    }
 959
 960    fn as_remote(&self) -> Option<&RemoteBufferStore> {
 961        match &self.state {
 962            BufferStoreState::Remote(state) => Some(state),
 963            _ => None,
 964        }
 965    }
 966
 967    pub fn open_buffer(
 968        &mut self,
 969        project_path: ProjectPath,
 970        cx: &mut ModelContext<Self>,
 971    ) -> Task<Result<Model<Buffer>>> {
 972        if let Some(buffer) = self.get_by_path(&project_path, cx) {
 973            return Task::ready(Ok(buffer));
 974        }
 975
 976        let task = match self.loading_buffers.entry(project_path.clone()) {
 977            hash_map::Entry::Occupied(e) => e.get().clone(),
 978            hash_map::Entry::Vacant(entry) => {
 979                let path = project_path.path.clone();
 980                let Some(worktree) = self
 981                    .worktree_store
 982                    .read(cx)
 983                    .worktree_for_id(project_path.worktree_id, cx)
 984                else {
 985                    return Task::ready(Err(anyhow!("no such worktree")));
 986                };
 987                let load_buffer = match &self.state {
 988                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
 989                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
 990                };
 991
 992                entry
 993                    .insert(
 994                        cx.spawn(move |this, mut cx| async move {
 995                            let load_result = load_buffer.await;
 996                            this.update(&mut cx, |this, _cx| {
 997                                // Record the fact that the buffer is no longer loading.
 998                                this.loading_buffers.remove(&project_path);
 999                            })
1000                            .ok();
1001                            load_result.map_err(Arc::new)
1002                        })
1003                        .shared(),
1004                    )
1005                    .clone()
1006            }
1007        };
1008
1009        cx.background_executor()
1010            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1011    }
1012
1013    pub fn open_unstaged_changes(
1014        &mut self,
1015        buffer: Model<Buffer>,
1016        cx: &mut ModelContext<Self>,
1017    ) -> Task<Result<Model<BufferChangeSet>>> {
1018        let buffer_id = buffer.read(cx).remote_id();
1019        if let Some(change_set) = self.get_unstaged_changes(buffer_id) {
1020            return Task::ready(Ok(change_set));
1021        }
1022
1023        let task = match self.loading_change_sets.entry(buffer_id) {
1024            hash_map::Entry::Occupied(e) => e.get().clone(),
1025            hash_map::Entry::Vacant(entry) => {
1026                let load = match &self.state {
1027                    BufferStoreState::Local(this) => this.load_staged_text(&buffer, cx),
1028                    BufferStoreState::Remote(this) => this.load_staged_text(buffer_id, cx),
1029                };
1030
1031                entry
1032                    .insert(
1033                        cx.spawn(move |this, cx| async move {
1034                            Self::open_unstaged_changes_internal(this, load.await, buffer, cx)
1035                                .await
1036                                .map_err(Arc::new)
1037                        })
1038                        .shared(),
1039                    )
1040                    .clone()
1041            }
1042        };
1043
1044        cx.background_executor()
1045            .spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
1046    }
1047
1048    pub async fn open_unstaged_changes_internal(
1049        this: WeakModel<Self>,
1050        text: Result<Option<String>>,
1051        buffer: Model<Buffer>,
1052        mut cx: AsyncAppContext,
1053    ) -> Result<Model<BufferChangeSet>> {
1054        let text = match text {
1055            Err(e) => {
1056                this.update(&mut cx, |this, cx| {
1057                    let buffer_id = buffer.read(cx).remote_id();
1058                    this.loading_change_sets.remove(&buffer_id);
1059                })?;
1060                return Err(e);
1061            }
1062            Ok(text) => text,
1063        };
1064
1065        let change_set = buffer.update(&mut cx, |buffer, cx| {
1066            cx.new_model(|_| BufferChangeSet::new(buffer))
1067        })?;
1068
1069        if let Some(text) = text {
1070            change_set
1071                .update(&mut cx, |change_set, cx| {
1072                    let snapshot = buffer.read(cx).text_snapshot();
1073                    change_set.set_base_text(text, snapshot, cx)
1074                })?
1075                .await
1076                .ok();
1077        }
1078
1079        this.update(&mut cx, |this, cx| {
1080            let buffer_id = buffer.read(cx).remote_id();
1081            this.loading_change_sets.remove(&buffer_id);
1082            if let Some(OpenBuffer::Complete {
1083                unstaged_changes, ..
1084            }) = this.opened_buffers.get_mut(&buffer.read(cx).remote_id())
1085            {
1086                *unstaged_changes = Some(change_set.downgrade());
1087            }
1088        })?;
1089
1090        Ok(change_set)
1091    }
1092
1093    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
1094        match &self.state {
1095            BufferStoreState::Local(this) => this.create_buffer(cx),
1096            BufferStoreState::Remote(this) => this.create_buffer(cx),
1097        }
1098    }
1099
1100    pub fn save_buffer(
1101        &mut self,
1102        buffer: Model<Buffer>,
1103        cx: &mut ModelContext<Self>,
1104    ) -> Task<Result<()>> {
1105        match &mut self.state {
1106            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
1107            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
1108        }
1109    }
1110
1111    pub fn save_buffer_as(
1112        &mut self,
1113        buffer: Model<Buffer>,
1114        path: ProjectPath,
1115        cx: &mut ModelContext<Self>,
1116    ) -> Task<Result<()>> {
1117        let old_file = buffer.read(cx).file().cloned();
1118        let task = match &self.state {
1119            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
1120            BufferStoreState::Remote(this) => {
1121                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
1122            }
1123        };
1124        cx.spawn(|this, mut cx| async move {
1125            task.await?;
1126            this.update(&mut cx, |_, cx| {
1127                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1128            })
1129        })
1130    }
1131
1132    pub fn blame_buffer(
1133        &self,
1134        buffer: &Model<Buffer>,
1135        version: Option<clock::Global>,
1136        cx: &AppContext,
1137    ) -> Task<Result<Option<Blame>>> {
1138        let buffer = buffer.read(cx);
1139        let Some(file) = File::from_dyn(buffer.file()) else {
1140            return Task::ready(Err(anyhow!("buffer has no file")));
1141        };
1142
1143        match file.worktree.clone().read(cx) {
1144            Worktree::Local(worktree) => {
1145                let worktree = worktree.snapshot();
1146                let blame_params = maybe!({
1147                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
1148                        Some(repo_for_path) => repo_for_path,
1149                        None => return Ok(None),
1150                    };
1151
1152                    let relative_path = repo_entry
1153                        .relativize(&worktree, &file.path)
1154                        .context("failed to relativize buffer path")?;
1155
1156                    let repo = local_repo_entry.repo().clone();
1157
1158                    let content = match version {
1159                        Some(version) => buffer.rope_for_version(&version).clone(),
1160                        None => buffer.as_rope().clone(),
1161                    };
1162
1163                    anyhow::Ok(Some((repo, relative_path, content)))
1164                });
1165
1166                cx.background_executor().spawn(async move {
1167                    let Some((repo, relative_path, content)) = blame_params? else {
1168                        return Ok(None);
1169                    };
1170                    repo.blame(&relative_path, content)
1171                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
1172                        .map(Some)
1173                })
1174            }
1175            Worktree::Remote(worktree) => {
1176                let buffer_id = buffer.remote_id();
1177                let version = buffer.version();
1178                let project_id = worktree.project_id();
1179                let client = worktree.client();
1180                cx.spawn(|_| async move {
1181                    let response = client
1182                        .request(proto::BlameBuffer {
1183                            project_id,
1184                            buffer_id: buffer_id.into(),
1185                            version: serialize_version(&version),
1186                        })
1187                        .await?;
1188                    Ok(deserialize_blame_buffer_response(response))
1189                })
1190            }
1191        }
1192    }
1193
1194    pub fn get_permalink_to_line(
1195        &self,
1196        buffer: &Model<Buffer>,
1197        selection: Range<u32>,
1198        cx: &AppContext,
1199    ) -> Task<Result<url::Url>> {
1200        let buffer = buffer.read(cx);
1201        let Some(file) = File::from_dyn(buffer.file()) else {
1202            return Task::ready(Err(anyhow!("buffer has no file")));
1203        };
1204
1205        match file.worktree.clone().read(cx) {
1206            Worktree::Local(worktree) => {
1207                let Some(repo) = worktree.local_git_repo(file.path()) else {
1208                    return Task::ready(Err(anyhow!("no repository for buffer found")));
1209                };
1210
1211                let path = file.path().clone();
1212
1213                cx.spawn(|cx| async move {
1214                    const REMOTE_NAME: &str = "origin";
1215                    let origin_url = repo
1216                        .remote_url(REMOTE_NAME)
1217                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;
1218
1219                    let sha = repo
1220                        .head_sha()
1221                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
1222
1223                    let provider_registry =
1224                        cx.update(GitHostingProviderRegistry::default_global)?;
1225
1226                    let (provider, remote) =
1227                        parse_git_remote_url(provider_registry, &origin_url)
1228                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
1229
1230                    let path = path
1231                        .to_str()
1232                        .context("failed to convert buffer path to string")?;
1233
1234                    Ok(provider.build_permalink(
1235                        remote,
1236                        BuildPermalinkParams {
1237                            sha: &sha,
1238                            path,
1239                            selection: Some(selection),
1240                        },
1241                    ))
1242                })
1243            }
1244            Worktree::Remote(worktree) => {
1245                let buffer_id = buffer.remote_id();
1246                let project_id = worktree.project_id();
1247                let client = worktree.client();
1248                cx.spawn(|_| async move {
1249                    let response = client
1250                        .request(proto::GetPermalinkToLine {
1251                            project_id,
1252                            buffer_id: buffer_id.into(),
1253                            selection: Some(proto::Range {
1254                                start: selection.start as u64,
1255                                end: selection.end as u64,
1256                            }),
1257                        })
1258                        .await?;
1259
1260                    url::Url::parse(&response.permalink).context("failed to parse permalink")
1261                })
1262            }
1263        }
1264    }
1265
1266    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
1267        let remote_id = buffer.read(cx).remote_id();
1268        let is_remote = buffer.read(cx).replica_id() != 0;
1269        let open_buffer = OpenBuffer::Complete {
1270            buffer: buffer.downgrade(),
1271            unstaged_changes: None,
1272        };
1273
1274        let handle = cx.handle().downgrade();
1275        buffer.update(cx, move |_, cx| {
1276            cx.on_release(move |buffer, cx| {
1277                handle
1278                    .update(cx, |_, cx| {
1279                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
1280                    })
1281                    .ok();
1282            })
1283            .detach()
1284        });
1285
1286        match self.opened_buffers.entry(remote_id) {
1287            hash_map::Entry::Vacant(entry) => {
1288                entry.insert(open_buffer);
1289            }
1290            hash_map::Entry::Occupied(mut entry) => {
1291                if let OpenBuffer::Operations(operations) = entry.get_mut() {
1292                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
1293                } else if entry.get().upgrade().is_some() {
1294                    if is_remote {
1295                        return Ok(());
1296                    } else {
1297                        debug_panic!("buffer {} was already registered", remote_id);
1298                        Err(anyhow!("buffer {} was already registered", remote_id))?;
1299                    }
1300                }
1301                entry.insert(open_buffer);
1302            }
1303        }
1304
1305        cx.subscribe(&buffer, Self::on_buffer_event).detach();
1306        cx.emit(BufferStoreEvent::BufferAdded(buffer));
1307        Ok(())
1308    }
1309
1310    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1311        self.opened_buffers
1312            .values()
1313            .filter_map(|buffer| buffer.upgrade())
1314    }
1315
1316    pub fn loading_buffers(
1317        &self,
1318    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Model<Buffer>>>)> {
1319        self.loading_buffers.iter().map(|(path, task)| {
1320            let task = task.clone();
1321            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
1322        })
1323    }
1324
1325    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1326        self.buffers().find_map(|buffer| {
1327            let file = File::from_dyn(buffer.read(cx).file())?;
1328            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1329                Some(buffer)
1330            } else {
1331                None
1332            }
1333        })
1334    }
1335
1336    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1337        self.opened_buffers.get(&buffer_id)?.upgrade()
1338    }
1339
1340    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1341        self.get(buffer_id)
1342            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1343    }
1344
1345    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1346        self.get(buffer_id).or_else(|| {
1347            self.as_remote()
1348                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1349        })
1350    }
1351
1352    pub fn get_unstaged_changes(&self, buffer_id: BufferId) -> Option<Model<BufferChangeSet>> {
1353        if let OpenBuffer::Complete {
1354            unstaged_changes, ..
1355        } = self.opened_buffers.get(&buffer_id)?
1356        {
1357            unstaged_changes.as_ref()?.upgrade()
1358        } else {
1359            None
1360        }
1361    }
1362
1363    pub fn buffer_version_info(
1364        &self,
1365        cx: &AppContext,
1366    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1367        let buffers = self
1368            .buffers()
1369            .map(|buffer| {
1370                let buffer = buffer.read(cx);
1371                proto::BufferVersion {
1372                    id: buffer.remote_id().into(),
1373                    version: language::proto::serialize_version(&buffer.version),
1374                }
1375            })
1376            .collect();
1377        let incomplete_buffer_ids = self
1378            .as_remote()
1379            .map(|remote| remote.incomplete_buffer_ids())
1380            .unwrap_or_default();
1381        (buffers, incomplete_buffer_ids)
1382    }
1383
1384    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1385        for open_buffer in self.opened_buffers.values_mut() {
1386            if let Some(buffer) = open_buffer.upgrade() {
1387                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1388            }
1389        }
1390
1391        for buffer in self.buffers() {
1392            buffer.update(cx, |buffer, cx| {
1393                buffer.set_capability(Capability::ReadOnly, cx)
1394            });
1395        }
1396
1397        if let Some(remote) = self.as_remote_mut() {
1398            // Wake up all futures currently waiting on a buffer to get opened,
1399            // to give them a chance to fail now that we've disconnected.
1400            remote.remote_buffer_listeners.clear()
1401        }
1402    }
1403
1404    pub fn shared(
1405        &mut self,
1406        remote_id: u64,
1407        downstream_client: AnyProtoClient,
1408        _cx: &mut AppContext,
1409    ) {
1410        self.downstream_client = Some((downstream_client, remote_id));
1411    }
1412
1413    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1414        self.downstream_client.take();
1415        self.forget_shared_buffers();
1416    }
1417
1418    pub fn discard_incomplete(&mut self) {
1419        self.opened_buffers
1420            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1421    }
1422
1423    pub fn find_search_candidates(
1424        &mut self,
1425        query: &SearchQuery,
1426        mut limit: usize,
1427        fs: Arc<dyn Fs>,
1428        cx: &mut ModelContext<Self>,
1429    ) -> Receiver<Model<Buffer>> {
1430        let (tx, rx) = smol::channel::unbounded();
1431        let mut open_buffers = HashSet::default();
1432        let mut unnamed_buffers = Vec::new();
1433        for handle in self.buffers() {
1434            let buffer = handle.read(cx);
1435            if let Some(entry_id) = buffer.entry_id(cx) {
1436                open_buffers.insert(entry_id);
1437            } else {
1438                limit = limit.saturating_sub(1);
1439                unnamed_buffers.push(handle)
1440            };
1441        }
1442
1443        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
1444        let mut project_paths_rx = self
1445            .worktree_store
1446            .update(cx, |worktree_store, cx| {
1447                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
1448            })
1449            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
1450
1451        cx.spawn(|this, mut cx| async move {
1452            for buffer in unnamed_buffers {
1453                tx.send(buffer).await.ok();
1454            }
1455
1456            while let Some(project_paths) = project_paths_rx.next().await {
1457                let buffers = this.update(&mut cx, |this, cx| {
1458                    project_paths
1459                        .into_iter()
1460                        .map(|project_path| this.open_buffer(project_path, cx))
1461                        .collect::<Vec<_>>()
1462                })?;
1463                for buffer_task in buffers {
1464                    if let Some(buffer) = buffer_task.await.log_err() {
1465                        if tx.send(buffer).await.is_err() {
1466                            return anyhow::Ok(());
1467                        }
1468                    }
1469                }
1470            }
1471            anyhow::Ok(())
1472        })
1473        .detach();
1474        rx
1475    }
1476
1477    pub fn recalculate_buffer_diffs(
1478        &mut self,
1479        buffers: Vec<Model<Buffer>>,
1480        cx: &mut ModelContext<Self>,
1481    ) -> impl Future<Output = ()> {
1482        let mut futures = Vec::new();
1483        for buffer in buffers {
1484            let buffer = buffer.read(cx).text_snapshot();
1485            if let Some(OpenBuffer::Complete {
1486                unstaged_changes, ..
1487            }) = self.opened_buffers.get_mut(&buffer.remote_id())
1488            {
1489                if let Some(unstaged_changes) = unstaged_changes
1490                    .as_ref()
1491                    .and_then(|changes| changes.upgrade())
1492                {
1493                    unstaged_changes.update(cx, |unstaged_changes, cx| {
1494                        futures.push(unstaged_changes.recalculate_diff(buffer.clone(), cx));
1495                    });
1496                } else {
1497                    unstaged_changes.take();
1498                }
1499            }
1500        }
1501        async move {
1502            futures::future::join_all(futures).await;
1503        }
1504    }
1505
1506    fn on_buffer_event(
1507        &mut self,
1508        buffer: Model<Buffer>,
1509        event: &BufferEvent,
1510        cx: &mut ModelContext<Self>,
1511    ) {
1512        match event {
1513            BufferEvent::FileHandleChanged => {
1514                if let Some(local) = self.as_local_mut() {
1515                    local.buffer_changed_file(buffer, cx);
1516                }
1517            }
1518            BufferEvent::Reloaded => {
1519                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1520                    return;
1521                };
1522                let buffer = buffer.read(cx);
1523                downstream_client
1524                    .send(proto::BufferReloaded {
1525                        project_id: *project_id,
1526                        buffer_id: buffer.remote_id().to_proto(),
1527                        version: serialize_version(&buffer.version()),
1528                        mtime: buffer.saved_mtime().map(|t| t.into()),
1529                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1530                    })
1531                    .log_err();
1532            }
1533            _ => {}
1534        }
1535    }
1536
1537    pub async fn handle_update_buffer(
1538        this: Model<Self>,
1539        envelope: TypedEnvelope<proto::UpdateBuffer>,
1540        mut cx: AsyncAppContext,
1541    ) -> Result<proto::Ack> {
1542        let payload = envelope.payload.clone();
1543        let buffer_id = BufferId::new(payload.buffer_id)?;
1544        let ops = payload
1545            .operations
1546            .into_iter()
1547            .map(language::proto::deserialize_operation)
1548            .collect::<Result<Vec<_>, _>>()?;
1549        this.update(&mut cx, |this, cx| {
1550            match this.opened_buffers.entry(buffer_id) {
1551                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1552                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1553                    OpenBuffer::Complete { buffer, .. } => {
1554                        if let Some(buffer) = buffer.upgrade() {
1555                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1556                        }
1557                    }
1558                },
1559                hash_map::Entry::Vacant(e) => {
1560                    e.insert(OpenBuffer::Operations(ops));
1561                }
1562            }
1563            Ok(proto::Ack {})
1564        })?
1565    }
1566
1567    pub fn handle_synchronize_buffers(
1568        &mut self,
1569        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1570        cx: &mut ModelContext<Self>,
1571        client: Arc<Client>,
1572    ) -> Result<proto::SynchronizeBuffersResponse> {
1573        let project_id = envelope.payload.project_id;
1574        let mut response = proto::SynchronizeBuffersResponse {
1575            buffers: Default::default(),
1576        };
1577        let Some(guest_id) = envelope.original_sender_id else {
1578            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1579        };
1580
1581        self.shared_buffers.entry(guest_id).or_default().clear();
1582        for buffer in envelope.payload.buffers {
1583            let buffer_id = BufferId::new(buffer.id)?;
1584            let remote_version = language::proto::deserialize_version(&buffer.version);
1585            if let Some(buffer) = self.get(buffer_id) {
1586                self.shared_buffers
1587                    .entry(guest_id)
1588                    .or_default()
1589                    .entry(buffer_id)
1590                    .or_insert_with(|| SharedBuffer {
1591                        buffer: buffer.clone(),
1592                        unstaged_changes: None,
1593                    });
1594
1595                let buffer = buffer.read(cx);
1596                response.buffers.push(proto::BufferVersion {
1597                    id: buffer_id.into(),
1598                    version: language::proto::serialize_version(&buffer.version),
1599                });
1600
1601                let operations = buffer.serialize_ops(Some(remote_version), cx);
1602                let client = client.clone();
1603                if let Some(file) = buffer.file() {
1604                    client
1605                        .send(proto::UpdateBufferFile {
1606                            project_id,
1607                            buffer_id: buffer_id.into(),
1608                            file: Some(file.to_proto(cx)),
1609                        })
1610                        .log_err();
1611                }
1612
1613                // todo!(max): do something
1614                // client
1615                //     .send(proto::UpdateStagedText {
1616                //         project_id,
1617                //         buffer_id: buffer_id.into(),
1618                //         diff_base: buffer.diff_base().map(ToString::to_string),
1619                //     })
1620                //     .log_err();
1621
1622                client
1623                    .send(proto::BufferReloaded {
1624                        project_id,
1625                        buffer_id: buffer_id.into(),
1626                        version: language::proto::serialize_version(buffer.saved_version()),
1627                        mtime: buffer.saved_mtime().map(|time| time.into()),
1628                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1629                            as i32,
1630                    })
1631                    .log_err();
1632
1633                cx.background_executor()
1634                    .spawn(
1635                        async move {
1636                            let operations = operations.await;
1637                            for chunk in split_operations(operations) {
1638                                client
1639                                    .request(proto::UpdateBuffer {
1640                                        project_id,
1641                                        buffer_id: buffer_id.into(),
1642                                        operations: chunk,
1643                                    })
1644                                    .await?;
1645                            }
1646                            anyhow::Ok(())
1647                        }
1648                        .log_err(),
1649                    )
1650                    .detach();
1651            }
1652        }
1653        Ok(response)
1654    }
1655
1656    pub fn handle_create_buffer_for_peer(
1657        &mut self,
1658        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1659        replica_id: u16,
1660        capability: Capability,
1661        cx: &mut ModelContext<Self>,
1662    ) -> Result<()> {
1663        let Some(remote) = self.as_remote_mut() else {
1664            return Err(anyhow!("buffer store is not a remote"));
1665        };
1666
1667        if let Some(buffer) =
1668            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1669        {
1670            self.add_buffer(buffer, cx)?;
1671        }
1672
1673        Ok(())
1674    }
1675
1676    pub async fn handle_update_buffer_file(
1677        this: Model<Self>,
1678        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1679        mut cx: AsyncAppContext,
1680    ) -> Result<()> {
1681        let buffer_id = envelope.payload.buffer_id;
1682        let buffer_id = BufferId::new(buffer_id)?;
1683
1684        this.update(&mut cx, |this, cx| {
1685            let payload = envelope.payload.clone();
1686            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1687                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1688                let worktree = this
1689                    .worktree_store
1690                    .read(cx)
1691                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1692                    .ok_or_else(|| anyhow!("no such worktree"))?;
1693                let file = File::from_proto(file, worktree, cx)?;
1694                let old_file = buffer.update(cx, |buffer, cx| {
1695                    let old_file = buffer.file().cloned();
1696                    let new_path = file.path.clone();
1697                    buffer.file_updated(Arc::new(file), cx);
1698                    if old_file
1699                        .as_ref()
1700                        .map_or(true, |old| *old.path() != new_path)
1701                    {
1702                        Some(old_file)
1703                    } else {
1704                        None
1705                    }
1706                });
1707                if let Some(old_file) = old_file {
1708                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1709                }
1710            }
1711            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1712                downstream_client
1713                    .send(proto::UpdateBufferFile {
1714                        project_id: *project_id,
1715                        buffer_id: buffer_id.into(),
1716                        file: envelope.payload.file,
1717                    })
1718                    .log_err();
1719            }
1720            Ok(())
1721        })?
1722    }
1723
1724    pub async fn handle_save_buffer(
1725        this: Model<Self>,
1726        envelope: TypedEnvelope<proto::SaveBuffer>,
1727        mut cx: AsyncAppContext,
1728    ) -> Result<proto::BufferSaved> {
1729        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1730        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1731            anyhow::Ok((
1732                this.get_existing(buffer_id)?,
1733                this.downstream_client
1734                    .as_ref()
1735                    .map(|(_, project_id)| *project_id)
1736                    .context("project is not shared")?,
1737            ))
1738        })??;
1739        buffer
1740            .update(&mut cx, |buffer, _| {
1741                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1742            })?
1743            .await?;
1744        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1745
1746        if let Some(new_path) = envelope.payload.new_path {
1747            let new_path = ProjectPath::from_proto(new_path);
1748            this.update(&mut cx, |this, cx| {
1749                this.save_buffer_as(buffer.clone(), new_path, cx)
1750            })?
1751            .await?;
1752        } else {
1753            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1754                .await?;
1755        }
1756
1757        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1758            project_id,
1759            buffer_id: buffer_id.into(),
1760            version: serialize_version(buffer.saved_version()),
1761            mtime: buffer.saved_mtime().map(|time| time.into()),
1762        })
1763    }
1764
1765    pub async fn handle_close_buffer(
1766        this: Model<Self>,
1767        envelope: TypedEnvelope<proto::CloseBuffer>,
1768        mut cx: AsyncAppContext,
1769    ) -> Result<()> {
1770        let peer_id = envelope.sender_id;
1771        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1772        this.update(&mut cx, |this, _| {
1773            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1774                if shared.remove(&buffer_id).is_some() {
1775                    if shared.is_empty() {
1776                        this.shared_buffers.remove(&peer_id);
1777                    }
1778                    return;
1779                }
1780            }
1781            debug_panic!(
1782                "peer_id {} closed buffer_id {} which was either not open or already closed",
1783                peer_id,
1784                buffer_id
1785            )
1786        })
1787    }
1788
1789    pub async fn handle_buffer_saved(
1790        this: Model<Self>,
1791        envelope: TypedEnvelope<proto::BufferSaved>,
1792        mut cx: AsyncAppContext,
1793    ) -> Result<()> {
1794        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1795        let version = deserialize_version(&envelope.payload.version);
1796        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1797        this.update(&mut cx, move |this, cx| {
1798            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1799                buffer.update(cx, |buffer, cx| {
1800                    buffer.did_save(version, mtime, cx);
1801                });
1802            }
1803
1804            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1805                downstream_client
1806                    .send(proto::BufferSaved {
1807                        project_id: *project_id,
1808                        buffer_id: buffer_id.into(),
1809                        mtime: envelope.payload.mtime,
1810                        version: envelope.payload.version,
1811                    })
1812                    .log_err();
1813            }
1814        })
1815    }
1816
1817    pub async fn handle_buffer_reloaded(
1818        this: Model<Self>,
1819        envelope: TypedEnvelope<proto::BufferReloaded>,
1820        mut cx: AsyncAppContext,
1821    ) -> Result<()> {
1822        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1823        let version = deserialize_version(&envelope.payload.version);
1824        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1825        let line_ending = deserialize_line_ending(
1826            proto::LineEnding::from_i32(envelope.payload.line_ending)
1827                .ok_or_else(|| anyhow!("missing line ending"))?,
1828        );
1829        this.update(&mut cx, |this, cx| {
1830            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1831                buffer.update(cx, |buffer, cx| {
1832                    buffer.did_reload(version, line_ending, mtime, cx);
1833                });
1834            }
1835
1836            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1837                downstream_client
1838                    .send(proto::BufferReloaded {
1839                        project_id: *project_id,
1840                        buffer_id: buffer_id.into(),
1841                        mtime: envelope.payload.mtime,
1842                        version: envelope.payload.version,
1843                        line_ending: envelope.payload.line_ending,
1844                    })
1845                    .log_err();
1846            }
1847        })
1848    }
1849
1850    pub async fn handle_blame_buffer(
1851        this: Model<Self>,
1852        envelope: TypedEnvelope<proto::BlameBuffer>,
1853        mut cx: AsyncAppContext,
1854    ) -> Result<proto::BlameBufferResponse> {
1855        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1856        let version = deserialize_version(&envelope.payload.version);
1857        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1858        buffer
1859            .update(&mut cx, |buffer, _| {
1860                buffer.wait_for_version(version.clone())
1861            })?
1862            .await?;
1863        let blame = this
1864            .update(&mut cx, |this, cx| {
1865                this.blame_buffer(&buffer, Some(version), cx)
1866            })?
1867            .await?;
1868        Ok(serialize_blame_buffer_response(blame))
1869    }
1870
1871    pub async fn handle_get_permalink_to_line(
1872        this: Model<Self>,
1873        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1874        mut cx: AsyncAppContext,
1875    ) -> Result<proto::GetPermalinkToLineResponse> {
1876        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1877        // let version = deserialize_version(&envelope.payload.version);
1878        let selection = {
1879            let proto_selection = envelope
1880                .payload
1881                .selection
1882                .context("no selection to get permalink for defined")?;
1883            proto_selection.start as u32..proto_selection.end as u32
1884        };
1885        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1886        let permalink = this
1887            .update(&mut cx, |this, cx| {
1888                this.get_permalink_to_line(&buffer, selection, cx)
1889            })?
1890            .await?;
1891        Ok(proto::GetPermalinkToLineResponse {
1892            permalink: permalink.to_string(),
1893        })
1894    }
1895
1896    pub async fn handle_get_staged_text(
1897        this: Model<Self>,
1898        request: TypedEnvelope<proto::GetStagedText>,
1899        mut cx: AsyncAppContext,
1900    ) -> Result<proto::GetStagedTextResponse> {
1901        let buffer_id = BufferId::new(request.payload.buffer_id)?;
1902        let change_set = this
1903            .update(&mut cx, |this, cx| {
1904                let buffer = this.get(buffer_id)?;
1905                Some(this.open_unstaged_changes(buffer, cx))
1906            })?
1907            .ok_or_else(|| anyhow!("no such buffer"))?
1908            .await?;
1909        this.update(&mut cx, |this, _| {
1910            let shared_buffers = this
1911                .shared_buffers
1912                .entry(request.original_sender_id.unwrap_or(request.sender_id))
1913                .or_default();
1914            debug_assert!(shared_buffers.contains_key(&buffer_id));
1915            if let Some(shared) = shared_buffers.get_mut(&buffer_id) {
1916                shared.unstaged_changes = Some(change_set.clone());
1917            }
1918        })?;
1919        let staged_text = change_set.read_with(&cx, |change_set, cx| {
1920            change_set
1921                .base_text
1922                .as_ref()
1923                .map(|buffer| buffer.read(cx).text())
1924        })?;
1925        Ok(proto::GetStagedTextResponse { staged_text })
1926    }
1927
1928    pub async fn handle_update_diff_base(
1929        this: Model<Self>,
1930        request: TypedEnvelope<proto::UpdateDiffBase>,
1931        mut cx: AsyncAppContext,
1932    ) -> Result<()> {
1933        let buffer_id = BufferId::new(request.payload.buffer_id)?;
1934        let Some((buffer, change_set)) = this.update(&mut cx, |this, _| {
1935            if let OpenBuffer::Complete {
1936                unstaged_changes,
1937                buffer,
1938            } = this.opened_buffers.get(&buffer_id)?
1939            {
1940                Some((buffer.upgrade()?, unstaged_changes.as_ref()?.upgrade()?))
1941            } else {
1942                None
1943            }
1944        })?
1945        else {
1946            return Ok(());
1947        };
1948        change_set.update(&mut cx, |change_set, cx| {
1949            if let Some(staged_text) = request.payload.staged_text {
1950                let _ = change_set.set_base_text(staged_text, buffer.read(cx).text_snapshot(), cx);
1951            } else {
1952                change_set.unset_base_text(buffer.read(cx).text_snapshot(), cx)
1953            }
1954        })?;
1955        Ok(())
1956    }
1957
1958    pub fn reload_buffers(
1959        &self,
1960        buffers: HashSet<Model<Buffer>>,
1961        push_to_history: bool,
1962        cx: &mut ModelContext<Self>,
1963    ) -> Task<Result<ProjectTransaction>> {
1964        if buffers.is_empty() {
1965            return Task::ready(Ok(ProjectTransaction::default()));
1966        }
1967        match &self.state {
1968            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1969            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1970        }
1971    }
1972
1973    async fn handle_reload_buffers(
1974        this: Model<Self>,
1975        envelope: TypedEnvelope<proto::ReloadBuffers>,
1976        mut cx: AsyncAppContext,
1977    ) -> Result<proto::ReloadBuffersResponse> {
1978        let sender_id = envelope.original_sender_id().unwrap_or_default();
1979        let reload = this.update(&mut cx, |this, cx| {
1980            let mut buffers = HashSet::default();
1981            for buffer_id in &envelope.payload.buffer_ids {
1982                let buffer_id = BufferId::new(*buffer_id)?;
1983                buffers.insert(this.get_existing(buffer_id)?);
1984            }
1985            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
1986        })??;
1987
1988        let project_transaction = reload.await?;
1989        let project_transaction = this.update(&mut cx, |this, cx| {
1990            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1991        })?;
1992        Ok(proto::ReloadBuffersResponse {
1993            transaction: Some(project_transaction),
1994        })
1995    }
1996
1997    pub fn create_buffer_for_peer(
1998        &mut self,
1999        buffer: &Model<Buffer>,
2000        peer_id: proto::PeerId,
2001        cx: &mut ModelContext<Self>,
2002    ) -> Task<Result<()>> {
2003        let buffer_id = buffer.read(cx).remote_id();
2004        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
2005        if shared_buffers.contains_key(&buffer_id) {
2006            return Task::ready(Ok(()));
2007        }
2008        shared_buffers.insert(
2009            buffer_id,
2010            SharedBuffer {
2011                buffer: buffer.clone(),
2012                unstaged_changes: None,
2013            },
2014        );
2015
2016        let Some((client, project_id)) = self.downstream_client.clone() else {
2017            return Task::ready(Ok(()));
2018        };
2019
2020        cx.spawn(|this, mut cx| async move {
2021            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
2022                return anyhow::Ok(());
2023            };
2024
2025            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
2026            let operations = operations.await;
2027            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
2028
2029            let initial_state = proto::CreateBufferForPeer {
2030                project_id,
2031                peer_id: Some(peer_id),
2032                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
2033            };
2034
2035            if client.send(initial_state).log_err().is_some() {
2036                let client = client.clone();
2037                cx.background_executor()
2038                    .spawn(async move {
2039                        let mut chunks = split_operations(operations).peekable();
2040                        while let Some(chunk) = chunks.next() {
2041                            let is_last = chunks.peek().is_none();
2042                            client.send(proto::CreateBufferForPeer {
2043                                project_id,
2044                                peer_id: Some(peer_id),
2045                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
2046                                    proto::BufferChunk {
2047                                        buffer_id: buffer_id.into(),
2048                                        operations: chunk,
2049                                        is_last,
2050                                    },
2051                                )),
2052                            })?;
2053                        }
2054                        anyhow::Ok(())
2055                    })
2056                    .await
2057                    .log_err();
2058            }
2059            Ok(())
2060        })
2061    }
2062
2063    pub fn forget_shared_buffers(&mut self) {
2064        self.shared_buffers.clear();
2065    }
2066
2067    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
2068        self.shared_buffers.remove(peer_id);
2069    }
2070
2071    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2072        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2073            self.shared_buffers.insert(new_peer_id, buffers);
2074        }
2075    }
2076
2077    pub fn has_shared_buffers(&self) -> bool {
2078        !self.shared_buffers.is_empty()
2079    }
2080
2081    pub fn create_local_buffer(
2082        &mut self,
2083        text: &str,
2084        language: Option<Arc<Language>>,
2085        cx: &mut ModelContext<Self>,
2086    ) -> Model<Buffer> {
2087        let buffer = cx.new_model(|cx| {
2088            Buffer::local(text, cx)
2089                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2090        });
2091
2092        self.add_buffer(buffer.clone(), cx).log_err();
2093        let buffer_id = buffer.read(cx).remote_id();
2094
2095        let this = self
2096            .as_local_mut()
2097            .expect("local-only method called in a non-local context");
2098        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2099            this.local_buffer_ids_by_path.insert(
2100                ProjectPath {
2101                    worktree_id: file.worktree_id(cx),
2102                    path: file.path.clone(),
2103                },
2104                buffer_id,
2105            );
2106
2107            if let Some(entry_id) = file.entry_id {
2108                this.local_buffer_ids_by_entry_id
2109                    .insert(entry_id, buffer_id);
2110            }
2111        }
2112        buffer
2113    }
2114
2115    pub fn deserialize_project_transaction(
2116        &mut self,
2117        message: proto::ProjectTransaction,
2118        push_to_history: bool,
2119        cx: &mut ModelContext<Self>,
2120    ) -> Task<Result<ProjectTransaction>> {
2121        if let Some(this) = self.as_remote_mut() {
2122            this.deserialize_project_transaction(message, push_to_history, cx)
2123        } else {
2124            debug_panic!("not a remote buffer store");
2125            Task::ready(Err(anyhow!("not a remote buffer store")))
2126        }
2127    }
2128
2129    pub fn wait_for_remote_buffer(
2130        &mut self,
2131        id: BufferId,
2132        cx: &mut ModelContext<BufferStore>,
2133    ) -> Task<Result<Model<Buffer>>> {
2134        if let Some(this) = self.as_remote_mut() {
2135            this.wait_for_remote_buffer(id, cx)
2136        } else {
2137            debug_panic!("not a remote buffer store");
2138            Task::ready(Err(anyhow!("not a remote buffer store")))
2139        }
2140    }
2141
2142    pub fn serialize_project_transaction_for_peer(
2143        &mut self,
2144        project_transaction: ProjectTransaction,
2145        peer_id: proto::PeerId,
2146        cx: &mut ModelContext<Self>,
2147    ) -> proto::ProjectTransaction {
2148        let mut serialized_transaction = proto::ProjectTransaction {
2149            buffer_ids: Default::default(),
2150            transactions: Default::default(),
2151        };
2152        for (buffer, transaction) in project_transaction.0 {
2153            self.create_buffer_for_peer(&buffer, peer_id, cx)
2154                .detach_and_log_err(cx);
2155            serialized_transaction
2156                .buffer_ids
2157                .push(buffer.read(cx).remote_id().into());
2158            serialized_transaction
2159                .transactions
2160                .push(language::proto::serialize_transaction(&transaction));
2161        }
2162        serialized_transaction
2163    }
2164}
2165
2166impl BufferChangeSet {
2167    pub fn new(buffer: &text::BufferSnapshot) -> Self {
2168        Self {
2169            buffer_id: buffer.remote_id(),
2170            base_text: None,
2171            diff_to_buffer: git::diff::BufferDiff::new(buffer),
2172            recalculate_diff_task: None,
2173            diff_updated_futures: Vec::new(),
2174            base_text_version: 0,
2175        }
2176    }
2177
2178    #[cfg(any(test, feature = "test-support"))]
2179    pub fn new_with_base_text(
2180        base_text: String,
2181        buffer: text::BufferSnapshot,
2182        cx: &mut ModelContext<Self>,
2183    ) -> Self {
2184        let mut this = Self::new(&buffer);
2185        let _ = this.set_base_text(base_text, buffer, cx);
2186        this
2187    }
2188
2189    pub fn diff_hunks_intersecting_range<'a>(
2190        &'a self,
2191        range: Range<text::Anchor>,
2192        buffer_snapshot: &'a text::BufferSnapshot,
2193    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
2194        self.diff_to_buffer
2195            .hunks_intersecting_range(range, buffer_snapshot)
2196    }
2197
2198    pub fn diff_hunks_intersecting_range_rev<'a>(
2199        &'a self,
2200        range: Range<text::Anchor>,
2201        buffer_snapshot: &'a text::BufferSnapshot,
2202    ) -> impl 'a + Iterator<Item = git::diff::DiffHunk> {
2203        self.diff_to_buffer
2204            .hunks_intersecting_range_rev(range, buffer_snapshot)
2205    }
2206
2207    #[cfg(any(test, feature = "test-support"))]
2208    pub fn base_text_string(&self, cx: &AppContext) -> Option<String> {
2209        self.base_text.as_ref().map(|buffer| buffer.read(cx).text())
2210    }
2211
2212    pub fn set_base_text(
2213        &mut self,
2214        mut base_text: String,
2215        buffer_snapshot: text::BufferSnapshot,
2216        cx: &mut ModelContext<Self>,
2217    ) -> oneshot::Receiver<()> {
2218        LineEnding::normalize(&mut base_text);
2219        self.recalculate_diff_internal(base_text, buffer_snapshot, true, cx)
2220    }
2221
2222    pub fn unset_base_text(
2223        &mut self,
2224        buffer_snapshot: text::BufferSnapshot,
2225        cx: &mut ModelContext<Self>,
2226    ) {
2227        if self.base_text.is_some() {
2228            self.base_text = None;
2229            self.diff_to_buffer = BufferDiff::new(&buffer_snapshot);
2230            self.recalculate_diff_task.take();
2231            self.base_text_version += 1;
2232            cx.notify();
2233        }
2234    }
2235
2236    pub fn recalculate_diff(
2237        &mut self,
2238        buffer_snapshot: text::BufferSnapshot,
2239        cx: &mut ModelContext<Self>,
2240    ) -> oneshot::Receiver<()> {
2241        if let Some(base_text) = self.base_text.clone() {
2242            self.recalculate_diff_internal(base_text.read(cx).text(), buffer_snapshot, false, cx)
2243        } else {
2244            oneshot::channel().1
2245        }
2246    }
2247
2248    fn recalculate_diff_internal(
2249        &mut self,
2250        base_text: String,
2251        buffer_snapshot: text::BufferSnapshot,
2252        base_text_changed: bool,
2253        cx: &mut ModelContext<Self>,
2254    ) -> oneshot::Receiver<()> {
2255        let (tx, rx) = oneshot::channel();
2256        self.diff_updated_futures.push(tx);
2257        self.recalculate_diff_task = Some(cx.spawn(|this, mut cx| async move {
2258            let (base_text, diff) = cx
2259                .background_executor()
2260                .spawn(async move {
2261                    let diff = BufferDiff::build(&base_text, &buffer_snapshot).await;
2262                    (base_text, diff)
2263                })
2264                .await;
2265            this.update(&mut cx, |this, cx| {
2266                if base_text_changed {
2267                    this.base_text_version += 1;
2268                    this.base_text = Some(cx.new_model(|cx| {
2269                        Buffer::local_normalized(Rope::from(base_text), LineEnding::default(), cx)
2270                    }));
2271                }
2272                this.diff_to_buffer = diff;
2273                this.recalculate_diff_task.take();
2274                for tx in this.diff_updated_futures.drain(..) {
2275                    tx.send(()).ok();
2276                }
2277                cx.notify();
2278            })?;
2279            Ok(())
2280        }));
2281        rx
2282    }
2283}
2284
2285impl OpenBuffer {
2286    fn upgrade(&self) -> Option<Model<Buffer>> {
2287        match self {
2288            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
2289            OpenBuffer::Operations(_) => None,
2290        }
2291    }
2292}
2293
2294fn is_not_found_error(error: &anyhow::Error) -> bool {
2295    error
2296        .root_cause()
2297        .downcast_ref::<io::Error>()
2298        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2299}
2300
2301fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2302    let Some(blame) = blame else {
2303        return proto::BlameBufferResponse {
2304            blame_response: None,
2305        };
2306    };
2307
2308    let entries = blame
2309        .entries
2310        .into_iter()
2311        .map(|entry| proto::BlameEntry {
2312            sha: entry.sha.as_bytes().into(),
2313            start_line: entry.range.start,
2314            end_line: entry.range.end,
2315            original_line_number: entry.original_line_number,
2316            author: entry.author.clone(),
2317            author_mail: entry.author_mail.clone(),
2318            author_time: entry.author_time,
2319            author_tz: entry.author_tz.clone(),
2320            committer: entry.committer.clone(),
2321            committer_mail: entry.committer_mail.clone(),
2322            committer_time: entry.committer_time,
2323            committer_tz: entry.committer_tz.clone(),
2324            summary: entry.summary.clone(),
2325            previous: entry.previous.clone(),
2326            filename: entry.filename.clone(),
2327        })
2328        .collect::<Vec<_>>();
2329
2330    let messages = blame
2331        .messages
2332        .into_iter()
2333        .map(|(oid, message)| proto::CommitMessage {
2334            oid: oid.as_bytes().into(),
2335            message,
2336        })
2337        .collect::<Vec<_>>();
2338
2339    let permalinks = blame
2340        .permalinks
2341        .into_iter()
2342        .map(|(oid, url)| proto::CommitPermalink {
2343            oid: oid.as_bytes().into(),
2344            permalink: url.to_string(),
2345        })
2346        .collect::<Vec<_>>();
2347
2348    proto::BlameBufferResponse {
2349        blame_response: Some(proto::blame_buffer_response::BlameResponse {
2350            entries,
2351            messages,
2352            permalinks,
2353            remote_url: blame.remote_url,
2354        }),
2355    }
2356}
2357
2358fn deserialize_blame_buffer_response(
2359    response: proto::BlameBufferResponse,
2360) -> Option<git::blame::Blame> {
2361    let response = response.blame_response?;
2362    let entries = response
2363        .entries
2364        .into_iter()
2365        .filter_map(|entry| {
2366            Some(git::blame::BlameEntry {
2367                sha: git::Oid::from_bytes(&entry.sha).ok()?,
2368                range: entry.start_line..entry.end_line,
2369                original_line_number: entry.original_line_number,
2370                committer: entry.committer,
2371                committer_time: entry.committer_time,
2372                committer_tz: entry.committer_tz,
2373                committer_mail: entry.committer_mail,
2374                author: entry.author,
2375                author_mail: entry.author_mail,
2376                author_time: entry.author_time,
2377                author_tz: entry.author_tz,
2378                summary: entry.summary,
2379                previous: entry.previous,
2380                filename: entry.filename,
2381            })
2382        })
2383        .collect::<Vec<_>>();
2384
2385    let messages = response
2386        .messages
2387        .into_iter()
2388        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2389        .collect::<HashMap<_, _>>();
2390
2391    let permalinks = response
2392        .permalinks
2393        .into_iter()
2394        .filter_map(|permalink| {
2395            Some((
2396                git::Oid::from_bytes(&permalink.oid).ok()?,
2397                Url::from_str(&permalink.permalink).ok()?,
2398            ))
2399        })
2400        .collect::<HashMap<_, _>>();
2401
2402    Some(Blame {
2403        entries,
2404        permalinks,
2405        messages,
2406        remote_url: response.remote_url,
2407    })
2408}