buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, ProjectPath,
   5};
   6use ::git::{parse_git_remote_url, BuildPermalinkParams, GitHostingProviderRegistry};
   7use anyhow::{anyhow, Context as _, Result};
   8use client::Client;
   9use collections::{hash_map, HashMap, HashSet};
  10use fs::Fs;
  11use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
  12use git::blame::Blame;
  13use gpui::{
  14    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
  15    Task, WeakModel,
  16};
  17use http_client::Url;
  18use language::{
  19    proto::{
  20        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  21        split_operations,
  22    },
  23    Buffer, BufferEvent, Capability, File as _, Language, Operation,
  24};
  25use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  26use smol::channel::Receiver;
  27use std::{io, ops::Range, path::Path, str::FromStr as _, sync::Arc, time::Instant};
  28use text::BufferId;
  29use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  30use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
  31
  32trait BufferStoreImpl {
  33    fn open_buffer(
  34        &self,
  35        path: Arc<Path>,
  36        worktree: Model<Worktree>,
  37        cx: &mut ModelContext<BufferStore>,
  38    ) -> Task<Result<Model<Buffer>>>;
  39
  40    fn save_buffer(
  41        &self,
  42        buffer: Model<Buffer>,
  43        cx: &mut ModelContext<BufferStore>,
  44    ) -> Task<Result<()>>;
  45
  46    fn save_buffer_as(
  47        &self,
  48        buffer: Model<Buffer>,
  49        path: ProjectPath,
  50        cx: &mut ModelContext<BufferStore>,
  51    ) -> Task<Result<()>>;
  52
  53    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;
  54
  55    fn reload_buffers(
  56        &self,
  57        buffers: HashSet<Model<Buffer>>,
  58        push_to_history: bool,
  59        cx: &mut ModelContext<BufferStore>,
  60    ) -> Task<Result<ProjectTransaction>>;
  61
  62    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
  63    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
  64}
  65
  66struct RemoteBufferStore {
  67    shared_with_me: HashSet<Model<Buffer>>,
  68    upstream_client: AnyProtoClient,
  69    project_id: u64,
  70    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  71    remote_buffer_listeners:
  72        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  73    worktree_store: Model<WorktreeStore>,
  74    buffer_store: WeakModel<BufferStore>,
  75}
  76
  77struct LocalBufferStore {
  78    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  79    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  80    buffer_store: WeakModel<BufferStore>,
  81    worktree_store: Model<WorktreeStore>,
  82    _subscription: Subscription,
  83}
  84
  85/// A set of open buffers.
  86pub struct BufferStore {
  87    state: Box<dyn BufferStoreImpl>,
  88    #[allow(clippy::type_complexity)]
  89    loading_buffers_by_path: HashMap<
  90        ProjectPath,
  91        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  92    >,
  93    worktree_store: Model<WorktreeStore>,
  94    opened_buffers: HashMap<BufferId, OpenBuffer>,
  95    downstream_client: Option<(AnyProtoClient, u64)>,
  96    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
  97}
  98
  99enum OpenBuffer {
 100    Buffer(WeakModel<Buffer>),
 101    Operations(Vec<Operation>),
 102}
 103
 104pub enum BufferStoreEvent {
 105    BufferAdded(Model<Buffer>),
 106    BufferDropped(BufferId),
 107    BufferChangedFilePath {
 108        buffer: Model<Buffer>,
 109        old_file: Option<Arc<dyn language::File>>,
 110    },
 111}
 112
 113#[derive(Default, Debug)]
 114pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
 115
 116impl EventEmitter<BufferStoreEvent> for BufferStore {}
 117
 118impl RemoteBufferStore {
 119    pub fn wait_for_remote_buffer(
 120        &mut self,
 121        id: BufferId,
 122        cx: &mut AppContext,
 123    ) -> Task<Result<Model<Buffer>>> {
 124        let buffer_store = self.buffer_store.clone();
 125        let (tx, rx) = oneshot::channel();
 126        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 127
 128        cx.spawn(|cx| async move {
 129            if let Some(buffer) = buffer_store
 130                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
 131                .ok()
 132                .flatten()
 133            {
 134                return Ok(buffer);
 135            }
 136
 137            cx.background_executor()
 138                .spawn(async move { rx.await? })
 139                .await
 140        })
 141    }
 142
 143    fn save_remote_buffer(
 144        &self,
 145        buffer_handle: Model<Buffer>,
 146        new_path: Option<proto::ProjectPath>,
 147        cx: &ModelContext<Self>,
 148    ) -> Task<Result<()>> {
 149        let buffer = buffer_handle.read(cx);
 150        let buffer_id = buffer.remote_id().into();
 151        let version = buffer.version();
 152        let rpc = self.upstream_client.clone();
 153        let project_id = self.project_id;
 154        cx.spawn(move |_, mut cx| async move {
 155            let response = rpc
 156                .request(proto::SaveBuffer {
 157                    project_id,
 158                    buffer_id,
 159                    new_path,
 160                    version: serialize_version(&version),
 161                })
 162                .await?;
 163            let version = deserialize_version(&response.version);
 164            let mtime = response.mtime.map(|mtime| mtime.into());
 165
 166            buffer_handle.update(&mut cx, |buffer, cx| {
 167                buffer.did_save(version.clone(), mtime, cx);
 168            })?;
 169
 170            Ok(())
 171        })
 172    }
 173
 174    pub fn handle_create_buffer_for_peer(
 175        &mut self,
 176        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 177        replica_id: u16,
 178        capability: Capability,
 179        cx: &mut ModelContext<Self>,
 180    ) -> Result<Option<Model<Buffer>>> {
 181        match envelope
 182            .payload
 183            .variant
 184            .ok_or_else(|| anyhow!("missing variant"))?
 185        {
 186            proto::create_buffer_for_peer::Variant::State(mut state) => {
 187                let buffer_id = BufferId::new(state.id)?;
 188
 189                let buffer_result = maybe!({
 190                    let mut buffer_file = None;
 191                    if let Some(file) = state.file.take() {
 192                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 193                        let worktree = self
 194                            .worktree_store
 195                            .read(cx)
 196                            .worktree_for_id(worktree_id, cx)
 197                            .ok_or_else(|| {
 198                                anyhow!("no worktree found for id {}", file.worktree_id)
 199                            })?;
 200                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 201                            as Arc<dyn language::File>);
 202                    }
 203                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 204                });
 205
 206                match buffer_result {
 207                    Ok(buffer) => {
 208                        let buffer = cx.new_model(|_| buffer);
 209                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 210                    }
 211                    Err(error) => {
 212                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 213                            for listener in listeners {
 214                                listener.send(Err(anyhow!(error.cloned()))).ok();
 215                            }
 216                        }
 217                    }
 218                }
 219            }
 220            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 221                let buffer_id = BufferId::new(chunk.buffer_id)?;
 222                let buffer = self
 223                    .loading_remote_buffers_by_id
 224                    .get(&buffer_id)
 225                    .cloned()
 226                    .ok_or_else(|| {
 227                        anyhow!(
 228                            "received chunk for buffer {} without initial state",
 229                            chunk.buffer_id
 230                        )
 231                    })?;
 232
 233                let result = maybe!({
 234                    let operations = chunk
 235                        .operations
 236                        .into_iter()
 237                        .map(language::proto::deserialize_operation)
 238                        .collect::<Result<Vec<_>>>()?;
 239                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 240                    anyhow::Ok(())
 241                });
 242
 243                if let Err(error) = result {
 244                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 245                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 246                        for listener in listeners {
 247                            listener.send(Err(error.cloned())).ok();
 248                        }
 249                    }
 250                } else if chunk.is_last {
 251                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 252                    if self.upstream_client.is_via_collab() {
 253                        // retain buffers sent by peers to avoid races.
 254                        self.shared_with_me.insert(buffer.clone());
 255                    }
 256
 257                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 258                        for sender in senders {
 259                            sender.send(Ok(buffer.clone())).ok();
 260                        }
 261                    }
 262                    return Ok(Some(buffer));
 263                }
 264            }
 265        }
 266        return Ok(None);
 267    }
 268
 269    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 270        self.loading_remote_buffers_by_id
 271            .keys()
 272            .copied()
 273            .collect::<Vec<_>>()
 274    }
 275
 276    pub fn deserialize_project_transaction(
 277        &self,
 278        message: proto::ProjectTransaction,
 279        push_to_history: bool,
 280        cx: &mut ModelContext<Self>,
 281    ) -> Task<Result<ProjectTransaction>> {
 282        cx.spawn(|this, mut cx| async move {
 283            let mut project_transaction = ProjectTransaction::default();
 284            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 285            {
 286                let buffer_id = BufferId::new(buffer_id)?;
 287                let buffer = this
 288                    .update(&mut cx, |this, cx| {
 289                        this.wait_for_remote_buffer(buffer_id, cx)
 290                    })?
 291                    .await?;
 292                let transaction = language::proto::deserialize_transaction(transaction)?;
 293                project_transaction.0.insert(buffer, transaction);
 294            }
 295
 296            for (buffer, transaction) in &project_transaction.0 {
 297                buffer
 298                    .update(&mut cx, |buffer, _| {
 299                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 300                    })?
 301                    .await?;
 302
 303                if push_to_history {
 304                    buffer.update(&mut cx, |buffer, _| {
 305                        buffer.push_transaction(transaction.clone(), Instant::now());
 306                    })?;
 307                }
 308            }
 309
 310            Ok(project_transaction)
 311        })
 312    }
 313}
 314
 315impl BufferStoreImpl for Model<RemoteBufferStore> {
 316    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
 317        Some(self.clone())
 318    }
 319
 320    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
 321        None
 322    }
 323
 324    fn save_buffer(
 325        &self,
 326        buffer: Model<Buffer>,
 327        cx: &mut ModelContext<BufferStore>,
 328    ) -> Task<Result<()>> {
 329        self.update(cx, |this, cx| {
 330            this.save_remote_buffer(buffer.clone(), None, cx)
 331        })
 332    }
 333    fn save_buffer_as(
 334        &self,
 335        buffer: Model<Buffer>,
 336        path: ProjectPath,
 337        cx: &mut ModelContext<BufferStore>,
 338    ) -> Task<Result<()>> {
 339        self.update(cx, |this, cx| {
 340            this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
 341        })
 342    }
 343
 344    fn open_buffer(
 345        &self,
 346        path: Arc<Path>,
 347        worktree: Model<Worktree>,
 348        cx: &mut ModelContext<BufferStore>,
 349    ) -> Task<Result<Model<Buffer>>> {
 350        self.update(cx, |this, cx| {
 351            let worktree_id = worktree.read(cx).id().to_proto();
 352            let project_id = this.project_id;
 353            let client = this.upstream_client.clone();
 354            let path_string = path.clone().to_string_lossy().to_string();
 355            cx.spawn(move |this, mut cx| async move {
 356                let response = client
 357                    .request(proto::OpenBufferByPath {
 358                        project_id,
 359                        worktree_id,
 360                        path: path_string,
 361                    })
 362                    .await?;
 363                let buffer_id = BufferId::new(response.buffer_id)?;
 364
 365                let buffer = this
 366                    .update(&mut cx, {
 367                        |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 368                    })?
 369                    .await?;
 370
 371                Ok(buffer)
 372            })
 373        })
 374    }
 375
 376    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 377        self.update(cx, |this, cx| {
 378            let create = this.upstream_client.request(proto::OpenNewBuffer {
 379                project_id: this.project_id,
 380            });
 381            cx.spawn(|this, mut cx| async move {
 382                let response = create.await?;
 383                let buffer_id = BufferId::new(response.buffer_id)?;
 384
 385                this.update(&mut cx, |this, cx| {
 386                    this.wait_for_remote_buffer(buffer_id, cx)
 387                })?
 388                .await
 389            })
 390        })
 391    }
 392
 393    fn reload_buffers(
 394        &self,
 395        buffers: HashSet<Model<Buffer>>,
 396        push_to_history: bool,
 397        cx: &mut ModelContext<BufferStore>,
 398    ) -> Task<Result<ProjectTransaction>> {
 399        self.update(cx, |this, cx| {
 400            let request = this.upstream_client.request(proto::ReloadBuffers {
 401                project_id: this.project_id,
 402                buffer_ids: buffers
 403                    .iter()
 404                    .map(|buffer| buffer.read(cx).remote_id().to_proto())
 405                    .collect(),
 406            });
 407
 408            cx.spawn(|this, mut cx| async move {
 409                let response = request
 410                    .await?
 411                    .transaction
 412                    .ok_or_else(|| anyhow!("missing transaction"))?;
 413                this.update(&mut cx, |this, cx| {
 414                    this.deserialize_project_transaction(response, push_to_history, cx)
 415                })?
 416                .await
 417            })
 418        })
 419    }
 420}
 421
 422impl LocalBufferStore {
 423    fn save_local_buffer(
 424        &self,
 425        buffer_handle: Model<Buffer>,
 426        worktree: Model<Worktree>,
 427        path: Arc<Path>,
 428        mut has_changed_file: bool,
 429        cx: &mut ModelContext<Self>,
 430    ) -> Task<Result<()>> {
 431        let buffer = buffer_handle.read(cx);
 432
 433        let text = buffer.as_rope().clone();
 434        let line_ending = buffer.line_ending();
 435        let version = buffer.version();
 436        let buffer_id = buffer.remote_id();
 437        if buffer.file().is_some_and(|file| !file.is_created()) {
 438            has_changed_file = true;
 439        }
 440
 441        let save = worktree.update(cx, |worktree, cx| {
 442            worktree.write_file(path.as_ref(), text, line_ending, cx)
 443        });
 444
 445        cx.spawn(move |this, mut cx| async move {
 446            let new_file = save.await?;
 447            let mtime = new_file.mtime;
 448            this.update(&mut cx, |this, cx| {
 449                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
 450                    if has_changed_file {
 451                        downstream_client
 452                            .send(proto::UpdateBufferFile {
 453                                project_id,
 454                                buffer_id: buffer_id.to_proto(),
 455                                file: Some(language::File::to_proto(&*new_file, cx)),
 456                            })
 457                            .log_err();
 458                    }
 459                    downstream_client
 460                        .send(proto::BufferSaved {
 461                            project_id,
 462                            buffer_id: buffer_id.to_proto(),
 463                            version: serialize_version(&version),
 464                            mtime: mtime.map(|time| time.into()),
 465                        })
 466                        .log_err();
 467                }
 468            })?;
 469            buffer_handle.update(&mut cx, |buffer, cx| {
 470                if has_changed_file {
 471                    buffer.file_updated(new_file, cx);
 472                }
 473                buffer.did_save(version.clone(), mtime, cx);
 474            })
 475        })
 476    }
 477
 478    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 479        cx.subscribe(worktree, |this, worktree, event, cx| {
 480            if worktree.read(cx).is_local() {
 481                match event {
 482                    worktree::Event::UpdatedEntries(changes) => {
 483                        this.local_worktree_entries_changed(&worktree, changes, cx);
 484                    }
 485                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 486                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 487                    }
 488                    _ => {}
 489                }
 490            }
 491        })
 492        .detach();
 493    }
 494
 495    fn local_worktree_entries_changed(
 496        &mut self,
 497        worktree_handle: &Model<Worktree>,
 498        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 499        cx: &mut ModelContext<Self>,
 500    ) {
 501        let snapshot = worktree_handle.read(cx).snapshot();
 502        for (path, entry_id, _) in changes {
 503            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 504        }
 505    }
 506
 507    fn local_worktree_git_repos_changed(
 508        &mut self,
 509        worktree_handle: Model<Worktree>,
 510        changed_repos: &UpdatedGitRepositoriesSet,
 511        cx: &mut ModelContext<Self>,
 512    ) {
 513        debug_assert!(worktree_handle.read(cx).is_local());
 514        let Some(buffer_store) = self.buffer_store.upgrade() else {
 515            return;
 516        };
 517
 518        // Identify the loading buffers whose containing repository that has changed.
 519        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
 520            let future_buffers = buffer_store
 521                .loading_buffers()
 522                .filter_map(|(project_path, receiver)| {
 523                    if project_path.worktree_id != worktree_handle.read(cx).id() {
 524                        return None;
 525                    }
 526                    let path = &project_path.path;
 527                    changed_repos
 528                        .iter()
 529                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
 530                    let path = path.clone();
 531                    Some(async move {
 532                        BufferStore::wait_for_loading_buffer(receiver)
 533                            .await
 534                            .ok()
 535                            .map(|buffer| (buffer, path))
 536                    })
 537                })
 538                .collect::<FuturesUnordered<_>>();
 539
 540            // Identify the current buffers whose containing repository has changed.
 541            let current_buffers = buffer_store
 542                .buffers()
 543                .filter_map(|buffer| {
 544                    let file = File::from_dyn(buffer.read(cx).file())?;
 545                    if file.worktree != worktree_handle {
 546                        return None;
 547                    }
 548                    changed_repos
 549                        .iter()
 550                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 551                    Some((buffer, file.path.clone()))
 552                })
 553                .collect::<Vec<_>>();
 554            (future_buffers, current_buffers)
 555        });
 556
 557        if future_buffers.len() + current_buffers.len() == 0 {
 558            return;
 559        }
 560
 561        cx.spawn(move |this, mut cx| async move {
 562            // Wait for all of the buffers to load.
 563            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 564
 565            // Reload the diff base for every buffer whose containing git repository has changed.
 566            let snapshot =
 567                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 568            let diff_bases_by_buffer = cx
 569                .background_executor()
 570                .spawn(async move {
 571                    let mut diff_base_tasks = future_buffers
 572                        .into_iter()
 573                        .flatten()
 574                        .chain(current_buffers)
 575                        .filter_map(|(buffer, path)| {
 576                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 577                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 578                            Some(async move {
 579                                let base_text =
 580                                    local_repo_entry.repo().load_index_text(&relative_path);
 581                                Some((buffer, base_text))
 582                            })
 583                        })
 584                        .collect::<FuturesUnordered<_>>();
 585
 586                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 587                    while let Some(diff_base) = diff_base_tasks.next().await {
 588                        if let Some(diff_base) = diff_base {
 589                            diff_bases.push(diff_base);
 590                        }
 591                    }
 592                    diff_bases
 593                })
 594                .await;
 595
 596            this.update(&mut cx, |this, cx| {
 597                // Assign the new diff bases on all of the buffers.
 598                for (buffer, diff_base) in diff_bases_by_buffer {
 599                    let buffer_id = buffer.update(cx, |buffer, cx| {
 600                        buffer.set_diff_base(diff_base.clone(), cx);
 601                        buffer.remote_id().to_proto()
 602                    });
 603                    if let Some((client, project_id)) = &this.downstream_client(cx) {
 604                        client
 605                            .send(proto::UpdateDiffBase {
 606                                project_id: *project_id,
 607                                buffer_id,
 608                                diff_base,
 609                            })
 610                            .log_err();
 611                    }
 612                }
 613            })
 614        })
 615        .detach_and_log_err(cx);
 616    }
 617
 618    fn local_worktree_entry_changed(
 619        &mut self,
 620        entry_id: ProjectEntryId,
 621        path: &Arc<Path>,
 622        worktree: &Model<worktree::Worktree>,
 623        snapshot: &worktree::Snapshot,
 624        cx: &mut ModelContext<Self>,
 625    ) -> Option<()> {
 626        let project_path = ProjectPath {
 627            worktree_id: snapshot.id(),
 628            path: path.clone(),
 629        };
 630        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 631            Some(&buffer_id) => buffer_id,
 632            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 633        };
 634        let buffer = self
 635            .buffer_store
 636            .update(cx, |buffer_store, _| {
 637                if let Some(buffer) = buffer_store.get(buffer_id) {
 638                    Some(buffer)
 639                } else {
 640                    buffer_store.opened_buffers.remove(&buffer_id);
 641                    None
 642                }
 643            })
 644            .ok()
 645            .flatten();
 646        let buffer = if let Some(buffer) = buffer {
 647            buffer
 648        } else {
 649            self.local_buffer_ids_by_path.remove(&project_path);
 650            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 651            return None;
 652        };
 653
 654        let events = buffer.update(cx, |buffer, cx| {
 655            let file = buffer.file()?;
 656            let old_file = File::from_dyn(Some(file))?;
 657            if old_file.worktree != *worktree {
 658                return None;
 659            }
 660
 661            let new_file = if let Some(entry) = old_file
 662                .entry_id
 663                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 664            {
 665                File {
 666                    is_local: true,
 667                    entry_id: Some(entry.id),
 668                    mtime: entry.mtime,
 669                    path: entry.path.clone(),
 670                    worktree: worktree.clone(),
 671                    is_deleted: false,
 672                    is_private: entry.is_private,
 673                }
 674            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 675                File {
 676                    is_local: true,
 677                    entry_id: Some(entry.id),
 678                    mtime: entry.mtime,
 679                    path: entry.path.clone(),
 680                    worktree: worktree.clone(),
 681                    is_deleted: false,
 682                    is_private: entry.is_private,
 683                }
 684            } else {
 685                File {
 686                    is_local: true,
 687                    entry_id: old_file.entry_id,
 688                    path: old_file.path.clone(),
 689                    mtime: old_file.mtime,
 690                    worktree: worktree.clone(),
 691                    is_deleted: true,
 692                    is_private: old_file.is_private,
 693                }
 694            };
 695
 696            if new_file == *old_file {
 697                return None;
 698            }
 699
 700            let mut events = Vec::new();
 701            if new_file.path != old_file.path {
 702                self.local_buffer_ids_by_path.remove(&ProjectPath {
 703                    path: old_file.path.clone(),
 704                    worktree_id: old_file.worktree_id(cx),
 705                });
 706                self.local_buffer_ids_by_path.insert(
 707                    ProjectPath {
 708                        worktree_id: new_file.worktree_id(cx),
 709                        path: new_file.path.clone(),
 710                    },
 711                    buffer_id,
 712                );
 713                events.push(BufferStoreEvent::BufferChangedFilePath {
 714                    buffer: cx.handle(),
 715                    old_file: buffer.file().cloned(),
 716                });
 717            }
 718
 719            if new_file.entry_id != old_file.entry_id {
 720                if let Some(entry_id) = old_file.entry_id {
 721                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
 722                }
 723                if let Some(entry_id) = new_file.entry_id {
 724                    self.local_buffer_ids_by_entry_id
 725                        .insert(entry_id, buffer_id);
 726                }
 727            }
 728
 729            if let Some((client, project_id)) = &self.downstream_client(cx) {
 730                client
 731                    .send(proto::UpdateBufferFile {
 732                        project_id: *project_id,
 733                        buffer_id: buffer_id.to_proto(),
 734                        file: Some(new_file.to_proto(cx)),
 735                    })
 736                    .ok();
 737            }
 738
 739            buffer.file_updated(Arc::new(new_file), cx);
 740            Some(events)
 741        })?;
 742        self.buffer_store
 743            .update(cx, |_buffer_store, cx| {
 744                for event in events {
 745                    cx.emit(event);
 746                }
 747            })
 748            .log_err()?;
 749
 750        None
 751    }
 752
 753    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
 754        self.buffer_store
 755            .upgrade()?
 756            .read(cx)
 757            .downstream_client
 758            .clone()
 759    }
 760
 761    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 762        let file = File::from_dyn(buffer.read(cx).file())?;
 763
 764        let remote_id = buffer.read(cx).remote_id();
 765        if let Some(entry_id) = file.entry_id {
 766            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 767                Some(_) => {
 768                    return None;
 769                }
 770                None => {
 771                    self.local_buffer_ids_by_entry_id
 772                        .insert(entry_id, remote_id);
 773                }
 774            }
 775        };
 776        self.local_buffer_ids_by_path.insert(
 777            ProjectPath {
 778                worktree_id: file.worktree_id(cx),
 779                path: file.path.clone(),
 780            },
 781            remote_id,
 782        );
 783
 784        Some(())
 785    }
 786}
 787
 788impl BufferStoreImpl for Model<LocalBufferStore> {
 789    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
 790        None
 791    }
 792
 793    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
 794        Some(self.clone())
 795    }
 796
 797    fn save_buffer(
 798        &self,
 799        buffer: Model<Buffer>,
 800        cx: &mut ModelContext<BufferStore>,
 801    ) -> Task<Result<()>> {
 802        self.update(cx, |this, cx| {
 803            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 804                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 805            };
 806            let worktree = file.worktree.clone();
 807            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 808        })
 809    }
 810
 811    fn save_buffer_as(
 812        &self,
 813        buffer: Model<Buffer>,
 814        path: ProjectPath,
 815        cx: &mut ModelContext<BufferStore>,
 816    ) -> Task<Result<()>> {
 817        self.update(cx, |this, cx| {
 818            let Some(worktree) = this
 819                .worktree_store
 820                .read(cx)
 821                .worktree_for_id(path.worktree_id, cx)
 822            else {
 823                return Task::ready(Err(anyhow!("no such worktree")));
 824            };
 825            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
 826        })
 827    }
 828
    /// Loads the file at `path` in `worktree` into a new buffer and registers it.
    ///
    /// If loading fails with an error that `is_not_found_error` accepts, an empty buffer is
    /// created instead, backed by a `File` with no entry id and no mtime. Any other load error
    /// is returned. On success the buffer is added to the `BufferStore` and recorded in this
    /// store's path index (and entry-id index, when the file has an entry id).
    fn open_buffer(
        &self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<BufferStore>,
    ) -> Task<Result<Model<Buffer>>> {
        let buffer_store = cx.weak_model();
        self.update(cx, |_, cx| {
            let load_buffer = worktree.update(cx, |worktree, cx| {
                let load_file = worktree.load_file(path.as_ref(), cx);
                // Reserve the model up front so the buffer id, derived from the reserved
                // entity id, is known before the text buffer is constructed.
                let reservation = cx.reserve_model();
                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
                cx.spawn(move |_, mut cx| async move {
                    let loaded = load_file.await?;
                    // Build the text buffer on the background executor.
                    let text_buffer = cx
                        .background_executor()
                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                        .await;
                    cx.insert_model(reservation, |_| {
                        Buffer::build(
                            text_buffer,
                            loaded.diff_base,
                            Some(loaded.file),
                            Capability::ReadWrite,
                        )
                    })
                })
            });

            cx.spawn(move |this, mut cx| async move {
                let buffer = match load_buffer.await {
                    Ok(buffer) => Ok(buffer),
                    // The file was not found: fall back to an empty buffer at that path.
                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                        Buffer::build(
                            text_buffer,
                            None,
                            Some(Arc::new(File {
                                worktree,
                                path,
                                mtime: None,
                                entry_id: None,
                                is_local: true,
                                is_deleted: false,
                                is_private: false,
                            })),
                            Capability::ReadWrite,
                        )
                    }),
                    Err(e) => Err(e),
                }?;
                this.update(&mut cx, |this, cx| {
                    // Register with the owning `BufferStore` first; a failure there aborts
                    // before the local indexes are touched.
                    buffer_store.update(cx, |buffer_store, cx| {
                        buffer_store.add_buffer(buffer.clone(), cx)
                    })??;
                    let buffer_id = buffer.read(cx).remote_id();
                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
                        this.local_buffer_ids_by_path.insert(
                            ProjectPath {
                                worktree_id: file.worktree_id(cx),
                                path: file.path.clone(),
                            },
                            buffer_id,
                        );

                        if let Some(entry_id) = file.entry_id {
                            this.local_buffer_ids_by_entry_id
                                .insert(entry_id, buffer_id);
                        }
                    }

                    anyhow::Ok(())
                })??;

                Ok(buffer)
            })
        })
    }
 908
 909    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
 910        let handle = self.clone();
 911        cx.spawn(|buffer_store, mut cx| async move {
 912            let buffer = cx.new_model(|cx| {
 913                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
 914            })?;
 915            buffer_store.update(&mut cx, |buffer_store, cx| {
 916                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 917                let buffer_id = buffer.read(cx).remote_id();
 918                handle.update(cx, |this, cx| {
 919                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 920                        this.local_buffer_ids_by_path.insert(
 921                            ProjectPath {
 922                                worktree_id: file.worktree_id(cx),
 923                                path: file.path.clone(),
 924                            },
 925                            buffer_id,
 926                        );
 927
 928                        if let Some(entry_id) = file.entry_id {
 929                            this.local_buffer_ids_by_entry_id
 930                                .insert(entry_id, buffer_id);
 931                        }
 932                    }
 933                });
 934            })?;
 935            Ok(buffer)
 936        })
 937    }
 938
 939    fn reload_buffers(
 940        &self,
 941        buffers: HashSet<Model<Buffer>>,
 942        push_to_history: bool,
 943        cx: &mut ModelContext<BufferStore>,
 944    ) -> Task<Result<ProjectTransaction>> {
 945        cx.spawn(move |_, mut cx| async move {
 946            let mut project_transaction = ProjectTransaction::default();
 947            for buffer in buffers {
 948                let transaction = buffer
 949                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
 950                    .await?;
 951                buffer.update(&mut cx, |buffer, cx| {
 952                    if let Some(transaction) = transaction {
 953                        if !push_to_history {
 954                            buffer.forget_transaction(transaction.id);
 955                        }
 956                        project_transaction.0.insert(cx.handle(), transaction);
 957                    }
 958                })?;
 959            }
 960
 961            Ok(project_transaction)
 962        })
 963    }
 964}
 965
 966impl BufferStore {
    /// Registers this store's handlers on `client`.
    ///
    /// The first four are registered as message handlers, the remaining four as request
    /// handlers.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
        client.add_model_request_handler(Self::handle_reload_buffers);
        client.add_model_request_handler(Self::handle_get_permalink_to_line);
    }
 977
    /// Creates a buffer store whose state is a `LocalBufferStore`.
    ///
    /// The local state subscribes to `worktree_store` and calls `subscribe_to_worktree` for
    /// every `WorktreeAdded` event it receives. The returned store starts with no downstream
    /// client and with empty sets of opened, shared, and loading buffers.
    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
        let this = cx.weak_model();
        Self {
            state: Box::new(cx.new_model(|cx| {
                let subscription = cx.subscribe(
                    &worktree_store,
                    |this: &mut LocalBufferStore, _, event, cx| {
                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
                            this.subscribe_to_worktree(worktree, cx);
                        }
                    },
                );

                LocalBufferStore {
                    local_buffer_ids_by_path: Default::default(),
                    local_buffer_ids_by_entry_id: Default::default(),
                    buffer_store: this,
                    worktree_store: worktree_store.clone(),
                    // Stored on the state so the subscription is held as long as the state is.
                    _subscription: subscription,
                }
            })),
            downstream_client: None,
            opened_buffers: Default::default(),
            shared_buffers: Default::default(),
            loading_buffers_by_path: Default::default(),
            worktree_store,
        }
    }
1007
1008    pub fn remote(
1009        worktree_store: Model<WorktreeStore>,
1010        upstream_client: AnyProtoClient,
1011        remote_id: u64,
1012        cx: &mut ModelContext<Self>,
1013    ) -> Self {
1014        let this = cx.weak_model();
1015        Self {
1016            state: Box::new(cx.new_model(|_| RemoteBufferStore {
1017                shared_with_me: Default::default(),
1018                loading_remote_buffers_by_id: Default::default(),
1019                remote_buffer_listeners: Default::default(),
1020                project_id: remote_id,
1021                upstream_client,
1022                worktree_store: worktree_store.clone(),
1023                buffer_store: this,
1024            })),
1025            downstream_client: None,
1026            opened_buffers: Default::default(),
1027            loading_buffers_by_path: Default::default(),
1028            shared_buffers: Default::default(),
1029            worktree_store,
1030        }
1031    }
1032
    /// Opens the buffer at `project_path`.
    ///
    /// An already-open buffer for the path is returned immediately. If the path is currently
    /// being loaded, the returned task waits on that same load instead of starting another.
    /// Otherwise a new load is started through the underlying state and tracked in
    /// `loading_buffers_by_path` until it finishes.
    ///
    /// Resolves to an error if `project_path.worktree_id` names no known worktree, or if the
    /// load itself fails.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                let load_buffer = self
                    .state
                    .open_buffer(project_path.path.clone(), worktree, cx);

                // The load runs detached; its outcome is published through the watch
                // channel so that every waiter on this path observes the same result.
                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        // Wait for the published result on the background executor.
        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }
1087
    /// Creates a new buffer by delegating to the underlying state implementation.
    pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
        self.state.create_buffer(cx)
    }
1091
    /// Saves `buffer` by delegating to the underlying state implementation.
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.state.save_buffer(buffer, cx)
    }
1099
1100    pub fn save_buffer_as(
1101        &mut self,
1102        buffer: Model<Buffer>,
1103        path: ProjectPath,
1104        cx: &mut ModelContext<Self>,
1105    ) -> Task<Result<()>> {
1106        let old_file = buffer.read(cx).file().cloned();
1107        let task = self.state.save_buffer_as(buffer.clone(), path, cx);
1108        cx.spawn(|this, mut cx| async move {
1109            task.await?;
1110            this.update(&mut cx, |_, cx| {
1111                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1112            })
1113        })
1114    }
1115
    /// Computes git blame information for `buffer`.
    ///
    /// For a local worktree, the repository containing the buffer's path is looked up and the
    /// blame runs on the background executor, using the buffer content at `version` (or the
    /// current content when `version` is `None`). Resolves to `Ok(None)` when no repository
    /// contains the path.
    ///
    /// For a remote worktree, a `BlameBuffer` request is sent through the worktree's client.
    /// NOTE(review): the remote branch sends the buffer's current version and does not use the
    /// `version` argument — confirm this is intended.
    ///
    /// Resolves to an error if the buffer has no worktree `File`.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Gather everything the blame needs here, so the background task below
                // does not have to touch the buffer or the worktree.
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => return Ok(None),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok(Some((repo, relative_path, content)))
                });

                cx.background_executor().spawn(async move {
                    let Some((repo, relative_path, content)) = blame_params? else {
                        return Ok(None);
                    };
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                        .map(Some)
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                // Shadows the `version` parameter with the buffer's current version.
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
1177
    /// Builds a permalink URL for the `selection` range of `buffer` on its git hosting
    /// provider.
    ///
    /// For a local worktree, the link is built from the URL of the `origin` remote, the HEAD
    /// SHA, and the buffer's path. Each of these steps can fail: no repository for the path,
    /// no `origin` remote, unreadable HEAD SHA, unparsable remote URL, or a path that is not
    /// valid as a string.
    ///
    /// For a remote worktree, a `GetPermalinkToLine` request is sent through the worktree's
    /// client and the returned permalink is parsed as a URL.
    ///
    /// Resolves to an error if the buffer has no worktree `File`.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Model<Buffer>,
        selection: Range<u32>,
        cx: &AppContext,
    ) -> Task<Result<url::Url>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let Some(repo) = worktree.local_git_repo(file.path()) else {
                    return Task::ready(Err(anyhow!("no repository for buffer found")));
                };

                let path = file.path().clone();

                cx.spawn(|cx| async move {
                    // Only the remote named "origin" is consulted.
                    const REMOTE_NAME: &str = "origin";
                    let origin_url = repo
                        .remote_url(REMOTE_NAME)
                        .ok_or_else(|| anyhow!("remote \"{REMOTE_NAME}\" not found"))?;

                    let sha = repo
                        .head_sha()
                        .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                    let provider_registry =
                        cx.update(GitHostingProviderRegistry::default_global)?;

                    let (provider, remote) =
                        parse_git_remote_url(provider_registry, &origin_url)
                            .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                    let path = path
                        .to_str()
                        .context("failed to convert buffer path to string")?;

                    Ok(provider.build_permalink(
                        remote,
                        BuildPermalinkParams {
                            sha: &sha,
                            path,
                            selection: Some(selection),
                        },
                    ))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::GetPermalinkToLine {
                            project_id,
                            buffer_id: buffer_id.into(),
                            selection: Some(proto::Range {
                                start: selection.start as u64,
                                end: selection.end as u64,
                            }),
                        })
                        .await?;

                    url::Url::parse(&response.permalink).context("failed to parse permalink")
                })
            }
        }
    }
1249
    /// Registers `buffer` in `opened_buffers` under its remote id.
    ///
    /// A release observer is installed on the buffer so that `BufferDropped` is emitted when
    /// it is released. If operations were queued for this id before the buffer existed, they
    /// are applied to it. On successful registration the store subscribes to the buffer's
    /// events and emits `BufferAdded`.
    ///
    /// If a live buffer is already registered under the same id, a buffer with a non-zero
    /// replica id is silently accepted (returning `Ok(())` without re-registering), while a
    /// buffer with replica id zero triggers `debug_panic!` and an error.
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());

        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations arrived before the buffer did; apply them now.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                // Reached for queued operations and for an entry whose buffer is gone.
                entry.insert(open_buffer);
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
1290
1291    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
1292        self.opened_buffers
1293            .values()
1294            .filter_map(|buffer| buffer.upgrade())
1295    }
1296
1297    pub fn loading_buffers(
1298        &self,
1299    ) -> impl Iterator<
1300        Item = (
1301            &ProjectPath,
1302            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1303        ),
1304    > {
1305        self.loading_buffers_by_path
1306            .iter()
1307            .map(|(path, rx)| (path, rx.clone()))
1308    }
1309
1310    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
1311        self.buffers().find_map(|buffer| {
1312            let file = File::from_dyn(buffer.read(cx).file())?;
1313            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
1314                Some(buffer)
1315            } else {
1316                None
1317            }
1318        })
1319    }
1320
1321    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
1322        self.opened_buffers
1323            .get(&buffer_id)
1324            .and_then(|buffer| buffer.upgrade())
1325    }
1326
1327    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
1328        self.get(buffer_id)
1329            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
1330    }
1331
1332    pub fn get_possibly_incomplete(
1333        &self,
1334        buffer_id: BufferId,
1335        cx: &AppContext,
1336    ) -> Option<Model<Buffer>> {
1337        self.get(buffer_id).or_else(|| {
1338            self.state.as_remote().and_then(|remote| {
1339                remote
1340                    .read(cx)
1341                    .loading_remote_buffers_by_id
1342                    .get(&buffer_id)
1343                    .cloned()
1344            })
1345        })
1346    }
1347
1348    pub fn buffer_version_info(
1349        &self,
1350        cx: &AppContext,
1351    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1352        let buffers = self
1353            .buffers()
1354            .map(|buffer| {
1355                let buffer = buffer.read(cx);
1356                proto::BufferVersion {
1357                    id: buffer.remote_id().into(),
1358                    version: language::proto::serialize_version(&buffer.version),
1359                }
1360            })
1361            .collect();
1362        let incomplete_buffer_ids = self
1363            .state
1364            .as_remote()
1365            .map(|remote| remote.read(cx).incomplete_buffer_ids())
1366            .unwrap_or_default();
1367        (buffers, incomplete_buffer_ids)
1368    }
1369
1370    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
1371        for open_buffer in self.opened_buffers.values_mut() {
1372            if let Some(buffer) = open_buffer.upgrade() {
1373                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1374            }
1375        }
1376
1377        for buffer in self.buffers() {
1378            buffer.update(cx, |buffer, cx| {
1379                buffer.set_capability(Capability::ReadOnly, cx)
1380            });
1381        }
1382
1383        if let Some(remote) = self.state.as_remote() {
1384            remote.update(cx, |remote, _| {
1385                // Wake up all futures currently waiting on a buffer to get opened,
1386                // to give them a chance to fail now that we've disconnected.
1387                remote.remote_buffer_listeners.clear()
1388            })
1389        }
1390    }
1391
1392    pub fn shared(
1393        &mut self,
1394        remote_id: u64,
1395        downstream_client: AnyProtoClient,
1396        _cx: &mut AppContext,
1397    ) {
1398        self.downstream_client = Some((downstream_client, remote_id));
1399    }
1400
1401    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
1402        self.downstream_client.take();
1403        self.forget_shared_buffers();
1404    }
1405
1406    pub fn discard_incomplete(&mut self) {
1407        self.opened_buffers
1408            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1409    }
1410
    /// Streams buffers that are candidates for `query` through the returned channel.
    ///
    /// Open buffers without an entry id are sent first; each one reduces `limit` by one
    /// (saturating at zero). The entry ids of the remaining open buffers are passed to the
    /// worktree store's `find_search_candidates`, and the project paths it yields are opened
    /// and sent as buffers.
    ///
    /// Paths whose buffer fails to open are logged and skipped. The background work stops as
    /// soon as the receiver is dropped.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                open_buffers.insert(entry_id);
            } else {
                // A buffer with no entry id is always a candidate and counts toward the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Paths are taken from the stream in chunks of at most this many, and each chunk's
        // buffers are opened together.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                // Start opening every path in the chunk before awaiting any of them.
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A send error means the receiver is gone; nothing left to do.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
1464
1465    fn on_buffer_event(
1466        &mut self,
1467        buffer: Model<Buffer>,
1468        event: &BufferEvent,
1469        cx: &mut ModelContext<Self>,
1470    ) {
1471        match event {
1472            BufferEvent::FileHandleChanged => {
1473                if let Some(local) = self.state.as_local() {
1474                    local.update(cx, |local, cx| {
1475                        local.buffer_changed_file(buffer, cx);
1476                    })
1477                }
1478            }
1479            BufferEvent::Reloaded => {
1480                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1481                    return;
1482                };
1483                let buffer = buffer.read(cx);
1484                downstream_client
1485                    .send(proto::BufferReloaded {
1486                        project_id: *project_id,
1487                        buffer_id: buffer.remote_id().to_proto(),
1488                        version: serialize_version(&buffer.version()),
1489                        mtime: buffer.saved_mtime().map(|t| t.into()),
1490                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1491                    })
1492                    .log_err();
1493            }
1494            _ => {}
1495        }
1496    }
1497
1498    pub async fn handle_update_buffer(
1499        this: Model<Self>,
1500        envelope: TypedEnvelope<proto::UpdateBuffer>,
1501        mut cx: AsyncAppContext,
1502    ) -> Result<proto::Ack> {
1503        let payload = envelope.payload.clone();
1504        let buffer_id = BufferId::new(payload.buffer_id)?;
1505        let ops = payload
1506            .operations
1507            .into_iter()
1508            .map(language::proto::deserialize_operation)
1509            .collect::<Result<Vec<_>, _>>()?;
1510        this.update(&mut cx, |this, cx| {
1511            match this.opened_buffers.entry(buffer_id) {
1512                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1513                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1514                    OpenBuffer::Buffer(buffer) => {
1515                        if let Some(buffer) = buffer.upgrade() {
1516                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1517                        }
1518                    }
1519                },
1520                hash_map::Entry::Vacant(e) => {
1521                    e.insert(OpenBuffer::Operations(ops));
1522                }
1523            }
1524            Ok(proto::Ack {})
1525        })?
1526    }
1527
    /// Handles a guest's `SynchronizeBuffers` request.
    ///
    /// The set of buffers shared with the guest is reset, then rebuilt from the buffers in the
    /// request that this store still has open. For each such buffer, the response receives
    /// this side's version, and the guest is sent the buffer's file (if any), diff base, and
    /// saved state. The buffer's operations, serialized relative to the guest's reported
    /// version, are sent in chunks from a detached background task.
    ///
    /// Buffers in the request that are not open here are skipped.
    ///
    /// # Errors
    ///
    /// Fails if the envelope has no `original_sender_id`, or if a buffer id in the request is
    /// rejected by `BufferId::new`.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from an empty shared set for this guest; it is refilled below.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer.clone());

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // NOTE(review): presumably yields the operations the guest has not seen,
                // given its reported version — confirm against `serialize_ops`.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                // Carries the buffer's saved version, saved mtime, and line ending.
                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Send the operations in chunks, awaiting each request before the next.
                // A failure is logged and ends this buffer's task only.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1611
1612    pub fn handle_create_buffer_for_peer(
1613        &mut self,
1614        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1615        replica_id: u16,
1616        capability: Capability,
1617        cx: &mut ModelContext<Self>,
1618    ) -> Result<()> {
1619        let Some(remote) = self.state.as_remote() else {
1620            return Err(anyhow!("buffer store is not a remote"));
1621        };
1622
1623        if let Some(buffer) = remote.update(cx, |remote, cx| {
1624            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)
1625        })? {
1626            self.add_buffer(buffer, cx)?;
1627        }
1628
1629        Ok(())
1630    }
1631
1632    pub async fn handle_update_buffer_file(
1633        this: Model<Self>,
1634        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1635        mut cx: AsyncAppContext,
1636    ) -> Result<()> {
1637        let buffer_id = envelope.payload.buffer_id;
1638        let buffer_id = BufferId::new(buffer_id)?;
1639
1640        this.update(&mut cx, |this, cx| {
1641            let payload = envelope.payload.clone();
1642            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1643                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1644                let worktree = this
1645                    .worktree_store
1646                    .read(cx)
1647                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1648                    .ok_or_else(|| anyhow!("no such worktree"))?;
1649                let file = File::from_proto(file, worktree, cx)?;
1650                let old_file = buffer.update(cx, |buffer, cx| {
1651                    let old_file = buffer.file().cloned();
1652                    let new_path = file.path.clone();
1653                    buffer.file_updated(Arc::new(file), cx);
1654                    if old_file
1655                        .as_ref()
1656                        .map_or(true, |old| *old.path() != new_path)
1657                    {
1658                        Some(old_file)
1659                    } else {
1660                        None
1661                    }
1662                });
1663                if let Some(old_file) = old_file {
1664                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1665                }
1666            }
1667            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1668                downstream_client
1669                    .send(proto::UpdateBufferFile {
1670                        project_id: *project_id,
1671                        buffer_id: buffer_id.into(),
1672                        file: envelope.payload.file,
1673                    })
1674                    .log_err();
1675            }
1676            Ok(())
1677        })?
1678    }
1679
1680    pub async fn handle_update_diff_base(
1681        this: Model<Self>,
1682        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1683        mut cx: AsyncAppContext,
1684    ) -> Result<()> {
1685        this.update(&mut cx, |this, cx| {
1686            let buffer_id = envelope.payload.buffer_id;
1687            let buffer_id = BufferId::new(buffer_id)?;
1688            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1689                buffer.update(cx, |buffer, cx| {
1690                    buffer.set_diff_base(envelope.payload.diff_base.clone(), cx)
1691                });
1692            }
1693            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1694                downstream_client
1695                    .send(proto::UpdateDiffBase {
1696                        project_id: *project_id,
1697                        buffer_id: buffer_id.into(),
1698                        diff_base: envelope.payload.diff_base,
1699                    })
1700                    .log_err();
1701            }
1702            Ok(())
1703        })?
1704    }
1705
1706    pub async fn handle_save_buffer(
1707        this: Model<Self>,
1708        envelope: TypedEnvelope<proto::SaveBuffer>,
1709        mut cx: AsyncAppContext,
1710    ) -> Result<proto::BufferSaved> {
1711        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1712        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1713            anyhow::Ok((
1714                this.get_existing(buffer_id)?,
1715                this.downstream_client
1716                    .as_ref()
1717                    .map(|(_, project_id)| *project_id)
1718                    .context("project is not shared")?,
1719            ))
1720        })??;
1721        buffer
1722            .update(&mut cx, |buffer, _| {
1723                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1724            })?
1725            .await?;
1726        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1727
1728        if let Some(new_path) = envelope.payload.new_path {
1729            let new_path = ProjectPath::from_proto(new_path);
1730            this.update(&mut cx, |this, cx| {
1731                this.save_buffer_as(buffer.clone(), new_path, cx)
1732            })?
1733            .await?;
1734        } else {
1735            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1736                .await?;
1737        }
1738
1739        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1740            project_id,
1741            buffer_id: buffer_id.into(),
1742            version: serialize_version(buffer.saved_version()),
1743            mtime: buffer.saved_mtime().map(|time| time.into()),
1744        })
1745    }
1746
1747    pub async fn handle_close_buffer(
1748        this: Model<Self>,
1749        envelope: TypedEnvelope<proto::CloseBuffer>,
1750        mut cx: AsyncAppContext,
1751    ) -> Result<()> {
1752        let peer_id = envelope.sender_id;
1753        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1754        this.update(&mut cx, |this, _| {
1755            if let Some(buffer) = this.get(buffer_id) {
1756                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1757                    if shared.remove(&buffer) {
1758                        if shared.is_empty() {
1759                            this.shared_buffers.remove(&peer_id);
1760                        }
1761                        return;
1762                    }
1763                }
1764            };
1765            debug_panic!(
1766                "peer_id {} closed buffer_id {} which was either not open or already closed",
1767                peer_id,
1768                buffer_id
1769            )
1770        })
1771    }
1772
1773    pub async fn handle_buffer_saved(
1774        this: Model<Self>,
1775        envelope: TypedEnvelope<proto::BufferSaved>,
1776        mut cx: AsyncAppContext,
1777    ) -> Result<()> {
1778        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1779        let version = deserialize_version(&envelope.payload.version);
1780        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1781        this.update(&mut cx, move |this, cx| {
1782            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1783                buffer.update(cx, |buffer, cx| {
1784                    buffer.did_save(version, mtime, cx);
1785                });
1786            }
1787
1788            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1789                downstream_client
1790                    .send(proto::BufferSaved {
1791                        project_id: *project_id,
1792                        buffer_id: buffer_id.into(),
1793                        mtime: envelope.payload.mtime,
1794                        version: envelope.payload.version,
1795                    })
1796                    .log_err();
1797            }
1798        })
1799    }
1800
1801    pub async fn handle_buffer_reloaded(
1802        this: Model<Self>,
1803        envelope: TypedEnvelope<proto::BufferReloaded>,
1804        mut cx: AsyncAppContext,
1805    ) -> Result<()> {
1806        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1807        let version = deserialize_version(&envelope.payload.version);
1808        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1809        let line_ending = deserialize_line_ending(
1810            proto::LineEnding::from_i32(envelope.payload.line_ending)
1811                .ok_or_else(|| anyhow!("missing line ending"))?,
1812        );
1813        this.update(&mut cx, |this, cx| {
1814            if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) {
1815                buffer.update(cx, |buffer, cx| {
1816                    buffer.did_reload(version, line_ending, mtime, cx);
1817                });
1818            }
1819
1820            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1821                downstream_client
1822                    .send(proto::BufferReloaded {
1823                        project_id: *project_id,
1824                        buffer_id: buffer_id.into(),
1825                        mtime: envelope.payload.mtime,
1826                        version: envelope.payload.version,
1827                        line_ending: envelope.payload.line_ending,
1828                    })
1829                    .log_err();
1830            }
1831        })
1832    }
1833
1834    pub async fn handle_blame_buffer(
1835        this: Model<Self>,
1836        envelope: TypedEnvelope<proto::BlameBuffer>,
1837        mut cx: AsyncAppContext,
1838    ) -> Result<proto::BlameBufferResponse> {
1839        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1840        let version = deserialize_version(&envelope.payload.version);
1841        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1842        buffer
1843            .update(&mut cx, |buffer, _| {
1844                buffer.wait_for_version(version.clone())
1845            })?
1846            .await?;
1847        let blame = this
1848            .update(&mut cx, |this, cx| {
1849                this.blame_buffer(&buffer, Some(version), cx)
1850            })?
1851            .await?;
1852        Ok(serialize_blame_buffer_response(blame))
1853    }
1854
1855    pub async fn handle_get_permalink_to_line(
1856        this: Model<Self>,
1857        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
1858        mut cx: AsyncAppContext,
1859    ) -> Result<proto::GetPermalinkToLineResponse> {
1860        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1861        // let version = deserialize_version(&envelope.payload.version);
1862        let selection = {
1863            let proto_selection = envelope
1864                .payload
1865                .selection
1866                .context("no selection to get permalink for defined")?;
1867            proto_selection.start as u32..proto_selection.end as u32
1868        };
1869        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1870        let permalink = this
1871            .update(&mut cx, |this, cx| {
1872                this.get_permalink_to_line(&buffer, selection, cx)
1873            })?
1874            .await?;
1875        Ok(proto::GetPermalinkToLineResponse {
1876            permalink: permalink.to_string(),
1877        })
1878    }
1879
1880    pub async fn wait_for_loading_buffer(
1881        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1882    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1883        loop {
1884            if let Some(result) = receiver.borrow().as_ref() {
1885                match result {
1886                    Ok(buffer) => return Ok(buffer.to_owned()),
1887                    Err(e) => return Err(e.to_owned()),
1888                }
1889            }
1890            receiver.next().await;
1891        }
1892    }
1893
1894    pub fn reload_buffers(
1895        &self,
1896        buffers: HashSet<Model<Buffer>>,
1897        push_to_history: bool,
1898        cx: &mut ModelContext<Self>,
1899    ) -> Task<Result<ProjectTransaction>> {
1900        if buffers.is_empty() {
1901            return Task::ready(Ok(ProjectTransaction::default()));
1902        }
1903
1904        self.state.reload_buffers(buffers, push_to_history, cx)
1905    }
1906
1907    async fn handle_reload_buffers(
1908        this: Model<Self>,
1909        envelope: TypedEnvelope<proto::ReloadBuffers>,
1910        mut cx: AsyncAppContext,
1911    ) -> Result<proto::ReloadBuffersResponse> {
1912        let sender_id = envelope.original_sender_id().unwrap_or_default();
1913        let reload = this.update(&mut cx, |this, cx| {
1914            let mut buffers = HashSet::default();
1915            for buffer_id in &envelope.payload.buffer_ids {
1916                let buffer_id = BufferId::new(*buffer_id)?;
1917                buffers.insert(this.get_existing(buffer_id)?);
1918            }
1919            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
1920        })??;
1921
1922        let project_transaction = reload.await?;
1923        let project_transaction = this.update(&mut cx, |this, cx| {
1924            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1925        })?;
1926        Ok(proto::ReloadBuffersResponse {
1927            transaction: Some(project_transaction),
1928        })
1929    }
1930
1931    pub fn create_buffer_for_peer(
1932        &mut self,
1933        buffer: &Model<Buffer>,
1934        peer_id: proto::PeerId,
1935        cx: &mut ModelContext<Self>,
1936    ) -> Task<Result<()>> {
1937        let buffer_id = buffer.read(cx).remote_id();
1938        if !self
1939            .shared_buffers
1940            .entry(peer_id)
1941            .or_default()
1942            .insert(buffer.clone())
1943        {
1944            return Task::ready(Ok(()));
1945        }
1946
1947        let Some((client, project_id)) = self.downstream_client.clone() else {
1948            return Task::ready(Ok(()));
1949        };
1950
1951        cx.spawn(|this, mut cx| async move {
1952            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
1953                return anyhow::Ok(());
1954            };
1955
1956            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
1957            let operations = operations.await;
1958            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
1959
1960            let initial_state = proto::CreateBufferForPeer {
1961                project_id,
1962                peer_id: Some(peer_id),
1963                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1964            };
1965
1966            if client.send(initial_state).log_err().is_some() {
1967                let client = client.clone();
1968                cx.background_executor()
1969                    .spawn(async move {
1970                        let mut chunks = split_operations(operations).peekable();
1971                        while let Some(chunk) = chunks.next() {
1972                            let is_last = chunks.peek().is_none();
1973                            client.send(proto::CreateBufferForPeer {
1974                                project_id,
1975                                peer_id: Some(peer_id),
1976                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1977                                    proto::BufferChunk {
1978                                        buffer_id: buffer_id.into(),
1979                                        operations: chunk,
1980                                        is_last,
1981                                    },
1982                                )),
1983                            })?;
1984                        }
1985                        anyhow::Ok(())
1986                    })
1987                    .await
1988                    .log_err();
1989            }
1990            Ok(())
1991        })
1992    }
1993
1994    pub fn forget_shared_buffers(&mut self) {
1995        self.shared_buffers.clear();
1996    }
1997
1998    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1999        self.shared_buffers.remove(peer_id);
2000    }
2001
2002    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
2003        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
2004            self.shared_buffers.insert(new_peer_id, buffers);
2005        }
2006    }
2007
2008    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
2009        &self.shared_buffers
2010    }
2011
2012    pub fn create_local_buffer(
2013        &mut self,
2014        text: &str,
2015        language: Option<Arc<Language>>,
2016        cx: &mut ModelContext<Self>,
2017    ) -> Model<Buffer> {
2018        let buffer = cx.new_model(|cx| {
2019            Buffer::local(text, cx)
2020                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
2021        });
2022
2023        self.add_buffer(buffer.clone(), cx).log_err();
2024        let buffer_id = buffer.read(cx).remote_id();
2025
2026        let local = self
2027            .state
2028            .as_local()
2029            .expect("local-only method called in a non-local context");
2030        local.update(cx, |this, cx| {
2031            if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
2032                this.local_buffer_ids_by_path.insert(
2033                    ProjectPath {
2034                        worktree_id: file.worktree_id(cx),
2035                        path: file.path.clone(),
2036                    },
2037                    buffer_id,
2038                );
2039
2040                if let Some(entry_id) = file.entry_id {
2041                    this.local_buffer_ids_by_entry_id
2042                        .insert(entry_id, buffer_id);
2043                }
2044            }
2045        });
2046        buffer
2047    }
2048
2049    pub fn deserialize_project_transaction(
2050        &mut self,
2051        message: proto::ProjectTransaction,
2052        push_to_history: bool,
2053        cx: &mut ModelContext<Self>,
2054    ) -> Task<Result<ProjectTransaction>> {
2055        if let Some(remote) = self.state.as_remote() {
2056            remote.update(cx, |remote, cx| {
2057                remote.deserialize_project_transaction(message, push_to_history, cx)
2058            })
2059        } else {
2060            debug_panic!("not a remote buffer store");
2061            Task::ready(Err(anyhow!("not a remote buffer store")))
2062        }
2063    }
2064
2065    pub fn wait_for_remote_buffer(
2066        &self,
2067        id: BufferId,
2068        cx: &mut AppContext,
2069    ) -> Task<Result<Model<Buffer>>> {
2070        if let Some(remote) = self.state.as_remote() {
2071            remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx))
2072        } else {
2073            debug_panic!("not a remote buffer store");
2074            Task::ready(Err(anyhow!("not a remote buffer store")))
2075        }
2076    }
2077
2078    pub fn serialize_project_transaction_for_peer(
2079        &mut self,
2080        project_transaction: ProjectTransaction,
2081        peer_id: proto::PeerId,
2082        cx: &mut ModelContext<Self>,
2083    ) -> proto::ProjectTransaction {
2084        let mut serialized_transaction = proto::ProjectTransaction {
2085            buffer_ids: Default::default(),
2086            transactions: Default::default(),
2087        };
2088        for (buffer, transaction) in project_transaction.0 {
2089            self.create_buffer_for_peer(&buffer, peer_id, cx)
2090                .detach_and_log_err(cx);
2091            serialized_transaction
2092                .buffer_ids
2093                .push(buffer.read(cx).remote_id().into());
2094            serialized_transaction
2095                .transactions
2096                .push(language::proto::serialize_transaction(&transaction));
2097        }
2098        serialized_transaction
2099    }
2100}
2101
2102impl OpenBuffer {
2103    fn upgrade(&self) -> Option<Model<Buffer>> {
2104        match self {
2105            OpenBuffer::Buffer(handle) => handle.upgrade(),
2106            OpenBuffer::Operations(_) => None,
2107        }
2108    }
2109}
2110
2111fn is_not_found_error(error: &anyhow::Error) -> bool {
2112    error
2113        .root_cause()
2114        .downcast_ref::<io::Error>()
2115        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
2116}
2117
2118fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
2119    let Some(blame) = blame else {
2120        return proto::BlameBufferResponse {
2121            blame_response: None,
2122        };
2123    };
2124
2125    let entries = blame
2126        .entries
2127        .into_iter()
2128        .map(|entry| proto::BlameEntry {
2129            sha: entry.sha.as_bytes().into(),
2130            start_line: entry.range.start,
2131            end_line: entry.range.end,
2132            original_line_number: entry.original_line_number,
2133            author: entry.author.clone(),
2134            author_mail: entry.author_mail.clone(),
2135            author_time: entry.author_time,
2136            author_tz: entry.author_tz.clone(),
2137            committer: entry.committer.clone(),
2138            committer_mail: entry.committer_mail.clone(),
2139            committer_time: entry.committer_time,
2140            committer_tz: entry.committer_tz.clone(),
2141            summary: entry.summary.clone(),
2142            previous: entry.previous.clone(),
2143            filename: entry.filename.clone(),
2144        })
2145        .collect::<Vec<_>>();
2146
2147    let messages = blame
2148        .messages
2149        .into_iter()
2150        .map(|(oid, message)| proto::CommitMessage {
2151            oid: oid.as_bytes().into(),
2152            message,
2153        })
2154        .collect::<Vec<_>>();
2155
2156    let permalinks = blame
2157        .permalinks
2158        .into_iter()
2159        .map(|(oid, url)| proto::CommitPermalink {
2160            oid: oid.as_bytes().into(),
2161            permalink: url.to_string(),
2162        })
2163        .collect::<Vec<_>>();
2164
2165    proto::BlameBufferResponse {
2166        blame_response: Some(proto::blame_buffer_response::BlameResponse {
2167            entries,
2168            messages,
2169            permalinks,
2170            remote_url: blame.remote_url,
2171        }),
2172    }
2173}
2174
2175fn deserialize_blame_buffer_response(
2176    response: proto::BlameBufferResponse,
2177) -> Option<git::blame::Blame> {
2178    let response = response.blame_response?;
2179    let entries = response
2180        .entries
2181        .into_iter()
2182        .filter_map(|entry| {
2183            Some(git::blame::BlameEntry {
2184                sha: git::Oid::from_bytes(&entry.sha).ok()?,
2185                range: entry.start_line..entry.end_line,
2186                original_line_number: entry.original_line_number,
2187                committer: entry.committer,
2188                committer_time: entry.committer_time,
2189                committer_tz: entry.committer_tz,
2190                committer_mail: entry.committer_mail,
2191                author: entry.author,
2192                author_mail: entry.author_mail,
2193                author_time: entry.author_time,
2194                author_tz: entry.author_tz,
2195                summary: entry.summary,
2196                previous: entry.previous,
2197                filename: entry.filename,
2198            })
2199        })
2200        .collect::<Vec<_>>();
2201
2202    let messages = response
2203        .messages
2204        .into_iter()
2205        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
2206        .collect::<HashMap<_, _>>();
2207
2208    let permalinks = response
2209        .permalinks
2210        .into_iter()
2211        .filter_map(|permalink| {
2212            Some((
2213                git::Oid::from_bytes(&permalink.oid).ok()?,
2214                Url::from_str(&permalink.permalink).ok()?,
2215            ))
2216        })
2217        .collect::<HashMap<_, _>>();
2218
2219    Some(Blame {
2220        entries,
2221        permalinks,
2222        messages,
2223        remote_url: response.remote_url,
2224    })
2225}