// buffer_store.rs

   1use crate::{
   2    ProjectPath,
   3    lsp_store::OpenLspBufferHandle,
   4    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   5};
   6use anyhow::{Context as _, Result, anyhow};
   7use client::Client;
   8use collections::{HashMap, HashSet, hash_map};
   9use futures::{Future, FutureExt as _, channel::oneshot, future::Shared};
  10use gpui::{
  11    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, TaskExt,
  12    WeakEntity,
  13};
  14use language::{
  15    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
  16    proto::{
  17        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  18        split_operations,
  19    },
  20};
  21use rpc::{
  22    AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope,
  23    proto::{self, PeerId},
  24};
  25
  26use settings::Settings;
  27use std::{io, sync::Arc, time::Instant};
  28use text::{BufferId, ReplicaId};
  29use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath};
  30use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
  31
/// A set of open buffers.
pub struct BufferStore {
    // Local- or remote-specific state, depending on how the project was opened.
    state: BufferStoreState,
    // In-flight buffer loads keyed by path. `Shared` lets concurrent opens of the
    // same path await a single underlying load task.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    worktree_store: Entity<WorktreeStore>,
    // Every buffer this store knows about, keyed by its remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    // Reverse index from a project path to the id of the buffer opened at that path.
    path_to_buffer_id: HashMap<ProjectPath, BufferId>,
    // Client (plus project id) used to forward buffer updates downstream when shared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Buffers that have been shared with each remote peer.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
    // Buffers excluded from project-wide searches.
    non_searchable_buffers: HashSet<BufferId>,
    project_search: RemoteProjectSearchState,
}
  45
/// Bookkeeping for project-wide searches that involve a remote peer.
#[derive(Default)]
struct RemoteProjectSearchState {
    // List of ongoing project search chunks from our remote host. Used by the side issuing a search RPC request.
    chunks: HashMap<u64, smol::channel::Sender<BufferId>>,
    // Monotonically-increasing handle to hand out to remote host in order to identify the project search result chunk.
    next_id: u64,
    // Used by the side running the actual search for match candidates to potentially cancel the search prematurely.
    searches_in_progress: HashMap<(PeerId, u64), Task<Result<()>>>,
}
  55
/// A buffer that has been shared with a remote peer, together with an
/// optional handle to its LSP registration.
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    buffer: Entity<Buffer>,
    lsp_handle: Option<OpenLspBufferHandle>,
}
  61
/// Backing implementation of a [`BufferStore`]: either buffers loaded from the
/// local filesystem, or replicas of buffers hosted by a remote peer.
enum BufferStoreState {
    Local(LocalBufferStore),
    Remote(RemoteBufferStore),
}
  66
/// State for a buffer store whose buffers are replicated from a remote host.
struct RemoteBufferStore {
    // Buffers sent to us by collab peers, retained here to avoid releasing
    // them prematurely (see `handle_create_buffer_for_peer`).
    shared_with_me: HashSet<Entity<Buffer>>,
    upstream_client: AnyProtoClient,
    project_id: u64,
    // Buffers whose initial state has arrived but that still await operation chunks.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    // Channels to notify when a given remote buffer finishes loading (or fails).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<anyhow::Result<Entity<Buffer>>>>>,
    worktree_store: Entity<WorktreeStore>,
}
  76
/// State for a buffer store backed by the local filesystem.
struct LocalBufferStore {
    // Maps worktree entry ids to the buffers opened for those entries.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    worktree_store: Entity<WorktreeStore>,
    // Keeps the worktree-store subscription alive for the life of this store.
    _subscription: Subscription,
}
  82
/// An entry in `BufferStore::opened_buffers`.
enum OpenBuffer {
    // A fully-loaded buffer, held weakly so the store doesn't keep it alive.
    Complete { buffer: WeakEntity<Buffer> },
    // Operations held for a buffer that isn't complete yet.
    // NOTE(review): inferred from the variant shape — confirm against the code
    // that drains these operations (outside this view).
    Operations(Vec<Operation>),
}
  87
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    /// A buffer was added to the store.
    BufferAdded(Entity<Buffer>),
    /// A remote peer closed a buffer that had been shared with it.
    SharedBufferClosed(proto::PeerId, BufferId),
    /// A buffer was released.
    BufferDropped(BufferId),
    /// A buffer's underlying file changed paths.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        /// The file the buffer pointed at before the change.
        old_file: Option<Arc<dyn language::File>>,
    },
}
  97
/// A set of transactions, one per buffer, produced by an operation that
/// edited several buffers at once.
#[derive(Default, Debug, Clone)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
 100
 101impl PartialEq for ProjectTransaction {
 102    fn eq(&self, other: &Self) -> bool {
 103        self.0.len() == other.0.len()
 104            && self.0.iter().all(|(buffer, transaction)| {
 105                other.0.get(buffer).is_some_and(|t| t.id == transaction.id)
 106            })
 107    }
 108}
 109
// Allow `BufferStore` entities to emit `BufferStoreEvent`s through gpui.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
 111
 112impl RemoteBufferStore {
 113    pub fn wait_for_remote_buffer(
 114        &mut self,
 115        id: BufferId,
 116        cx: &mut Context<BufferStore>,
 117    ) -> Task<Result<Entity<Buffer>>> {
 118        let (tx, rx) = oneshot::channel();
 119        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 120
 121        cx.spawn(async move |this, cx| {
 122            if let Some(buffer) = this
 123                .read_with(cx, |buffer_store, _| buffer_store.get(id))
 124                .ok()
 125                .flatten()
 126            {
 127                return Ok(buffer);
 128            }
 129
 130            cx.background_spawn(async move { rx.await? }).await
 131        })
 132    }
 133
 134    fn save_remote_buffer(
 135        &self,
 136        buffer_handle: Entity<Buffer>,
 137        new_path: Option<proto::ProjectPath>,
 138        cx: &Context<BufferStore>,
 139    ) -> Task<Result<()>> {
 140        let buffer = buffer_handle.read(cx);
 141        let buffer_id = buffer.remote_id().into();
 142        let version = buffer.version();
 143        let rpc = self.upstream_client.clone();
 144        let project_id = self.project_id;
 145        cx.spawn(async move |_, cx| {
 146            let response = rpc
 147                .request(proto::SaveBuffer {
 148                    project_id,
 149                    buffer_id,
 150                    new_path,
 151                    version: serialize_version(&version),
 152                })
 153                .await?;
 154            let version = deserialize_version(&response.version);
 155            let mtime = response.mtime.map(|mtime| mtime.into());
 156
 157            buffer_handle.update(cx, |buffer, cx| {
 158                buffer.did_save(version.clone(), mtime, cx);
 159            });
 160
 161            Ok(())
 162        })
 163    }
 164
 165    pub fn handle_create_buffer_for_peer(
 166        &mut self,
 167        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 168        replica_id: ReplicaId,
 169        capability: Capability,
 170        cx: &mut Context<BufferStore>,
 171    ) -> Result<Option<Entity<Buffer>>> {
 172        match envelope.payload.variant.context("missing variant")? {
 173            proto::create_buffer_for_peer::Variant::State(mut state) => {
 174                let buffer_id = BufferId::new(state.id)?;
 175
 176                let buffer_result = maybe!({
 177                    let mut buffer_file = None;
 178                    if let Some(file) = state.file.take() {
 179                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 180                        let worktree = self
 181                            .worktree_store
 182                            .read(cx)
 183                            .worktree_for_id(worktree_id, cx)
 184                            .with_context(|| {
 185                                format!("no worktree found for id {}", file.worktree_id)
 186                            })?;
 187                        buffer_file = Some(Arc::new(File::from_proto(file, worktree, cx)?)
 188                            as Arc<dyn language::File>);
 189                    }
 190                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 191                });
 192
 193                match buffer_result {
 194                    Ok(buffer) => {
 195                        let buffer = cx.new(|_| buffer);
 196                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 197                    }
 198                    Err(error) => {
 199                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 200                            for listener in listeners {
 201                                listener.send(Err(anyhow!(error.cloned()))).ok();
 202                            }
 203                        }
 204                    }
 205                }
 206            }
 207            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 208                let buffer_id = BufferId::new(chunk.buffer_id)?;
 209                let buffer = self
 210                    .loading_remote_buffers_by_id
 211                    .get(&buffer_id)
 212                    .cloned()
 213                    .with_context(|| {
 214                        format!(
 215                            "received chunk for buffer {} without initial state",
 216                            chunk.buffer_id
 217                        )
 218                    })?;
 219
 220                let result = maybe!({
 221                    let operations = chunk
 222                        .operations
 223                        .into_iter()
 224                        .map(language::proto::deserialize_operation)
 225                        .collect::<Result<Vec<_>>>()?;
 226                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 227                    anyhow::Ok(())
 228                });
 229
 230                if let Err(error) = result {
 231                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 232                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 233                        for listener in listeners {
 234                            listener.send(Err(error.cloned())).ok();
 235                        }
 236                    }
 237                } else if chunk.is_last {
 238                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 239                    if self.upstream_client.is_via_collab() {
 240                        // retain buffers sent by peers to avoid races.
 241                        self.shared_with_me.insert(buffer.clone());
 242                    }
 243
 244                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 245                        for sender in senders {
 246                            sender.send(Ok(buffer.clone())).ok();
 247                        }
 248                    }
 249                    return Ok(Some(buffer));
 250                }
 251            }
 252        }
 253        Ok(None)
 254    }
 255
 256    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 257        self.loading_remote_buffers_by_id
 258            .keys()
 259            .copied()
 260            .collect::<Vec<_>>()
 261    }
 262
 263    pub fn deserialize_project_transaction(
 264        &self,
 265        message: proto::ProjectTransaction,
 266        push_to_history: bool,
 267        cx: &mut Context<BufferStore>,
 268    ) -> Task<Result<ProjectTransaction>> {
 269        cx.spawn(async move |this, cx| {
 270            let mut project_transaction = ProjectTransaction::default();
 271            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 272            {
 273                let buffer_id = BufferId::new(buffer_id)?;
 274                let buffer = this
 275                    .update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
 276                    .await?;
 277                let transaction = language::proto::deserialize_transaction(transaction)?;
 278                project_transaction.0.insert(buffer, transaction);
 279            }
 280
 281            for (buffer, transaction) in &project_transaction.0 {
 282                buffer
 283                    .update(cx, |buffer, _| {
 284                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 285                    })
 286                    .await?;
 287
 288                if push_to_history {
 289                    buffer.update(cx, |buffer, _| {
 290                        buffer.push_transaction(transaction.clone(), Instant::now());
 291                        buffer.finalize_last_transaction();
 292                    });
 293                }
 294            }
 295
 296            Ok(project_transaction)
 297        })
 298    }
 299
 300    fn open_buffer(
 301        &self,
 302        path: Arc<RelPath>,
 303        worktree: Entity<Worktree>,
 304        cx: &mut Context<BufferStore>,
 305    ) -> Task<Result<Entity<Buffer>>> {
 306        let worktree_id = worktree.read(cx).id().to_proto();
 307        let project_id = self.project_id;
 308        let client = self.upstream_client.clone();
 309        cx.spawn(async move |this, cx| {
 310            let response = client
 311                .request(proto::OpenBufferByPath {
 312                    project_id,
 313                    worktree_id,
 314                    path: path.to_proto(),
 315                })
 316                .await?;
 317            let buffer_id = BufferId::new(response.buffer_id)?;
 318
 319            let buffer = this
 320                .update(cx, {
 321                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 322                })?
 323                .await?;
 324
 325            Ok(buffer)
 326        })
 327    }
 328
 329    fn create_buffer(
 330        &self,
 331        language: Option<Arc<Language>>,
 332        project_searchable: bool,
 333        cx: &mut Context<BufferStore>,
 334    ) -> Task<Result<Entity<Buffer>>> {
 335        let create = self.upstream_client.request(proto::OpenNewBuffer {
 336            project_id: self.project_id,
 337        });
 338        cx.spawn(async move |this, cx| {
 339            let response = create.await?;
 340            let buffer_id = BufferId::new(response.buffer_id)?;
 341
 342            let buffer = this
 343                .update(cx, |this, cx| {
 344                    if !project_searchable {
 345                        this.non_searchable_buffers.insert(buffer_id);
 346                    }
 347                    this.wait_for_remote_buffer(buffer_id, cx)
 348                })?
 349                .await?;
 350            if let Some(language) = language {
 351                buffer.update(cx, |buffer, cx| {
 352                    buffer.set_language(Some(language), cx);
 353                });
 354            }
 355            Ok(buffer)
 356        })
 357    }
 358
 359    fn reload_buffers(
 360        &self,
 361        buffers: HashSet<Entity<Buffer>>,
 362        push_to_history: bool,
 363        cx: &mut Context<BufferStore>,
 364    ) -> Task<Result<ProjectTransaction>> {
 365        let request = self.upstream_client.request(proto::ReloadBuffers {
 366            project_id: self.project_id,
 367            buffer_ids: buffers
 368                .iter()
 369                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 370                .collect(),
 371        });
 372
 373        cx.spawn(async move |this, cx| {
 374            let response = request.await?.transaction.context("missing transaction")?;
 375            this.update(cx, |this, cx| {
 376                this.deserialize_project_transaction(response, push_to_history, cx)
 377            })?
 378            .await
 379        })
 380    }
 381}
 382
 383impl LocalBufferStore {
 384    fn save_local_buffer(
 385        &self,
 386        buffer_handle: Entity<Buffer>,
 387        worktree: Entity<Worktree>,
 388        path: Arc<RelPath>,
 389        mut has_changed_file: bool,
 390        cx: &mut Context<BufferStore>,
 391    ) -> Task<Result<()>> {
 392        let buffer = buffer_handle.read(cx);
 393
 394        let text = buffer.as_rope().clone();
 395        let line_ending = buffer.line_ending();
 396        let encoding = buffer.encoding();
 397        let has_bom = buffer.has_bom();
 398        let version = buffer.version();
 399        let buffer_id = buffer.remote_id();
 400        let file = buffer.file().cloned();
 401        if file
 402            .as_ref()
 403            .is_some_and(|file| file.disk_state() == DiskState::New)
 404        {
 405            has_changed_file = true;
 406        }
 407
 408        let save = worktree.update(cx, |worktree, cx| {
 409            worktree.write_file(path, text, line_ending, encoding, has_bom, cx)
 410        });
 411
 412        cx.spawn(async move |this, cx| {
 413            let new_file = save.await?;
 414            let mtime = new_file.disk_state().mtime();
 415            this.update(cx, |this, cx| {
 416                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 417                    if has_changed_file {
 418                        downstream_client
 419                            .send(proto::UpdateBufferFile {
 420                                project_id,
 421                                buffer_id: buffer_id.to_proto(),
 422                                file: Some(language::File::to_proto(&*new_file, cx)),
 423                            })
 424                            .log_err();
 425                    }
 426                    downstream_client
 427                        .send(proto::BufferSaved {
 428                            project_id,
 429                            buffer_id: buffer_id.to_proto(),
 430                            version: serialize_version(&version),
 431                            mtime: mtime.map(|time| time.into()),
 432                        })
 433                        .log_err();
 434                }
 435            })?;
 436            buffer_handle.update(cx, |buffer, cx| {
 437                if has_changed_file {
 438                    buffer.file_updated(new_file, cx);
 439                }
 440                buffer.did_save(version.clone(), mtime, cx);
 441            });
 442            Ok(())
 443        })
 444    }
 445
 446    fn subscribe_to_worktree(
 447        &mut self,
 448        worktree: &Entity<Worktree>,
 449        cx: &mut Context<BufferStore>,
 450    ) {
 451        cx.subscribe(worktree, |this, worktree, event, cx| {
 452            if worktree.read(cx).is_local()
 453                && let worktree::Event::UpdatedEntries(changes) = event
 454            {
 455                Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 456            }
 457        })
 458        .detach();
 459    }
 460
 461    fn local_worktree_entries_changed(
 462        this: &mut BufferStore,
 463        worktree_handle: &Entity<Worktree>,
 464        changes: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
 465        cx: &mut Context<BufferStore>,
 466    ) {
 467        let snapshot = worktree_handle.read(cx).snapshot();
 468        for (path, entry_id, _) in changes {
 469            Self::local_worktree_entry_changed(
 470                this,
 471                *entry_id,
 472                path,
 473                worktree_handle,
 474                &snapshot,
 475                cx,
 476            );
 477        }
 478    }
 479
 480    fn local_worktree_entry_changed(
 481        this: &mut BufferStore,
 482        entry_id: ProjectEntryId,
 483        path: &Arc<RelPath>,
 484        worktree: &Entity<worktree::Worktree>,
 485        snapshot: &worktree::Snapshot,
 486        cx: &mut Context<BufferStore>,
 487    ) -> Option<()> {
 488        let project_path = ProjectPath {
 489            worktree_id: snapshot.id(),
 490            path: path.clone(),
 491        };
 492
 493        let buffer_id = this
 494            .as_local_mut()
 495            .and_then(|local| local.local_buffer_ids_by_entry_id.get(&entry_id))
 496            .copied()
 497            .or_else(|| this.path_to_buffer_id.get(&project_path).copied())?;
 498
 499        let buffer = if let Some(buffer) = this.get(buffer_id) {
 500            Some(buffer)
 501        } else {
 502            this.opened_buffers.remove(&buffer_id);
 503            this.non_searchable_buffers.remove(&buffer_id);
 504            None
 505        };
 506
 507        let buffer = if let Some(buffer) = buffer {
 508            buffer
 509        } else {
 510            this.path_to_buffer_id.remove(&project_path);
 511            let this = this.as_local_mut()?;
 512            this.local_buffer_ids_by_entry_id.remove(&entry_id);
 513            return None;
 514        };
 515
 516        let events = buffer.update(cx, |buffer, cx| {
 517            let file = buffer.file()?;
 518            let old_file = File::from_dyn(Some(file))?;
 519            if old_file.worktree != *worktree {
 520                return None;
 521            }
 522
 523            let snapshot_entry = old_file
 524                .entry_id
 525                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 526                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
 527
 528            let new_file = if let Some(entry) = snapshot_entry {
 529                File {
 530                    disk_state: match entry.mtime {
 531                        Some(mtime) => DiskState::Present {
 532                            mtime,
 533                            size: entry.size,
 534                        },
 535                        None => old_file.disk_state,
 536                    },
 537                    is_local: true,
 538                    entry_id: Some(entry.id),
 539                    path: entry.path.clone(),
 540                    worktree: worktree.clone(),
 541                    is_private: entry.is_private,
 542                }
 543            } else {
 544                File {
 545                    disk_state: DiskState::Deleted,
 546                    is_local: true,
 547                    entry_id: old_file.entry_id,
 548                    path: old_file.path.clone(),
 549                    worktree: worktree.clone(),
 550                    is_private: old_file.is_private,
 551                }
 552            };
 553
 554            if new_file == *old_file {
 555                return None;
 556            }
 557
 558            let mut events = Vec::new();
 559            if new_file.path != old_file.path {
 560                this.path_to_buffer_id.remove(&ProjectPath {
 561                    path: old_file.path.clone(),
 562                    worktree_id: old_file.worktree_id(cx),
 563                });
 564                this.path_to_buffer_id.insert(
 565                    ProjectPath {
 566                        worktree_id: new_file.worktree_id(cx),
 567                        path: new_file.path.clone(),
 568                    },
 569                    buffer_id,
 570                );
 571                events.push(BufferStoreEvent::BufferChangedFilePath {
 572                    buffer: cx.entity(),
 573                    old_file: buffer.file().cloned(),
 574                });
 575            }
 576            let local = this.as_local_mut()?;
 577            if new_file.entry_id != old_file.entry_id {
 578                if let Some(entry_id) = old_file.entry_id {
 579                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
 580                }
 581                if let Some(entry_id) = new_file.entry_id {
 582                    local
 583                        .local_buffer_ids_by_entry_id
 584                        .insert(entry_id, buffer_id);
 585                }
 586            }
 587
 588            if let Some((client, project_id)) = &this.downstream_client {
 589                client
 590                    .send(proto::UpdateBufferFile {
 591                        project_id: *project_id,
 592                        buffer_id: buffer_id.to_proto(),
 593                        file: Some(new_file.to_proto(cx)),
 594                    })
 595                    .ok();
 596            }
 597
 598            buffer.file_updated(Arc::new(new_file), cx);
 599            Some(events)
 600        })?;
 601
 602        for event in events {
 603            cx.emit(event);
 604        }
 605
 606        None
 607    }
 608
 609    fn save_buffer(
 610        &self,
 611        buffer: Entity<Buffer>,
 612        cx: &mut Context<BufferStore>,
 613    ) -> Task<Result<()>> {
 614        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 615            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 616        };
 617        let worktree = file.worktree.clone();
 618        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 619    }
 620
 621    fn save_buffer_as(
 622        &self,
 623        buffer: Entity<Buffer>,
 624        path: ProjectPath,
 625        cx: &mut Context<BufferStore>,
 626    ) -> Task<Result<()>> {
 627        let Some(worktree) = self
 628            .worktree_store
 629            .read(cx)
 630            .worktree_for_id(path.worktree_id, cx)
 631        else {
 632            return Task::ready(Err(anyhow!("no such worktree")));
 633        };
 634        self.save_local_buffer(buffer, worktree, path.path, true, cx)
 635    }
 636
 637    #[ztracing::instrument(skip_all)]
 638    fn open_buffer(
 639        &self,
 640        path: Arc<RelPath>,
 641        worktree: Entity<Worktree>,
 642        cx: &mut Context<BufferStore>,
 643    ) -> Task<Result<Entity<Buffer>>> {
 644        let load_file = worktree.update(cx, |worktree, cx| worktree.load_file(path.as_ref(), cx));
 645        cx.spawn(async move |this, cx| {
 646            let path = path.clone();
 647            let buffer = match load_file.await {
 648                Ok(loaded) => {
 649                    let reservation = cx.reserve_entity::<Buffer>();
 650                    let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 651                    let text_buffer = cx
 652                        .background_spawn(async move {
 653                            text::Buffer::new(ReplicaId::LOCAL, buffer_id, loaded.text)
 654                        })
 655                        .await;
 656                    cx.insert_entity(reservation, |_| {
 657                        let mut buffer =
 658                            Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite);
 659                        buffer.set_encoding(loaded.encoding);
 660                        buffer.set_has_bom(loaded.has_bom);
 661                        buffer
 662                    })
 663                }
 664                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
 665                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 666                    let text_buffer = text::Buffer::new(ReplicaId::LOCAL, buffer_id, "");
 667                    Buffer::build(
 668                        text_buffer,
 669                        Some(Arc::new(File {
 670                            worktree,
 671                            path,
 672                            disk_state: DiskState::New,
 673                            entry_id: None,
 674                            is_local: true,
 675                            is_private: false,
 676                        })),
 677                        Capability::ReadWrite,
 678                    )
 679                }),
 680                Err(e) => return Err(e),
 681            };
 682            this.update(cx, |this, cx| {
 683                this.add_buffer(buffer.clone(), cx)?;
 684                let buffer_id = buffer.read(cx).remote_id();
 685                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 686                    let project_path = ProjectPath {
 687                        worktree_id: file.worktree_id(cx),
 688                        path: file.path.clone(),
 689                    };
 690                    let entry_id = file.entry_id;
 691
 692                    // Check if the file should be read-only based on settings
 693                    let settings = WorktreeSettings::get(Some((&project_path).into()), cx);
 694                    let is_read_only = if project_path.path.is_empty() {
 695                        settings.is_std_path_read_only(&file.full_path(cx))
 696                    } else {
 697                        settings.is_path_read_only(&project_path.path)
 698                    };
 699                    if is_read_only {
 700                        buffer.update(cx, |buffer, cx| {
 701                            buffer.set_capability(Capability::Read, cx);
 702                        });
 703                    }
 704
 705                    this.path_to_buffer_id.insert(project_path, buffer_id);
 706                    let this = this.as_local_mut().unwrap();
 707                    if let Some(entry_id) = entry_id {
 708                        this.local_buffer_ids_by_entry_id
 709                            .insert(entry_id, buffer_id);
 710                    }
 711                }
 712
 713                anyhow::Ok(())
 714            })??;
 715
 716            Ok(buffer)
 717        })
 718    }
 719
 720    fn create_buffer(
 721        &self,
 722        language: Option<Arc<Language>>,
 723        project_searchable: bool,
 724        cx: &mut Context<BufferStore>,
 725    ) -> Task<Result<Entity<Buffer>>> {
 726        cx.spawn(async move |buffer_store, cx| {
 727            let buffer = cx.new(|cx| {
 728                Buffer::local("", cx)
 729                    .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 730            });
 731            buffer_store.update(cx, |buffer_store, cx| {
 732                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 733                if !project_searchable {
 734                    buffer_store
 735                        .non_searchable_buffers
 736                        .insert(buffer.read(cx).remote_id());
 737                }
 738            })?;
 739            Ok(buffer)
 740        })
 741    }
 742
 743    fn reload_buffers(
 744        &self,
 745        buffers: HashSet<Entity<Buffer>>,
 746        push_to_history: bool,
 747        cx: &mut Context<BufferStore>,
 748    ) -> Task<Result<ProjectTransaction>> {
 749        cx.spawn(async move |_, cx| {
 750            let mut project_transaction = ProjectTransaction::default();
 751            for buffer in buffers {
 752                let transaction = buffer.update(cx, |buffer, cx| buffer.reload(cx)).await?;
 753                buffer.update(cx, |buffer, cx| {
 754                    if let Some(transaction) = transaction {
 755                        if !push_to_history {
 756                            buffer.forget_transaction(transaction.id);
 757                        }
 758                        project_transaction.0.insert(cx.entity(), transaction);
 759                    }
 760                });
 761            }
 762
 763            Ok(project_transaction)
 764        })
 765    }
 766}
 767
 768impl BufferStore {
    /// Registers this type's RPC handlers on the given client so that
    /// buffer-related messages and requests are routed to the owning
    /// `BufferStore` entity.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
    }
 776
 777    /// Creates a buffer store, optionally retaining its buffers.
 778    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
 779        Self {
 780            state: BufferStoreState::Local(LocalBufferStore {
 781                local_buffer_ids_by_entry_id: Default::default(),
 782                worktree_store: worktree_store.clone(),
 783                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
 784                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 785                        let this = this.as_local_mut().unwrap();
 786                        this.subscribe_to_worktree(worktree, cx);
 787                    }
 788                }),
 789            }),
 790            downstream_client: None,
 791            opened_buffers: Default::default(),
 792            path_to_buffer_id: Default::default(),
 793            shared_buffers: Default::default(),
 794            loading_buffers: Default::default(),
 795            non_searchable_buffers: Default::default(),
 796            worktree_store,
 797            project_search: Default::default(),
 798        }
 799    }
 800
 801    pub fn remote(
 802        worktree_store: Entity<WorktreeStore>,
 803        upstream_client: AnyProtoClient,
 804        remote_id: u64,
 805        _cx: &mut Context<Self>,
 806    ) -> Self {
 807        Self {
 808            state: BufferStoreState::Remote(RemoteBufferStore {
 809                shared_with_me: Default::default(),
 810                loading_remote_buffers_by_id: Default::default(),
 811                remote_buffer_listeners: Default::default(),
 812                project_id: remote_id,
 813                upstream_client,
 814                worktree_store: worktree_store.clone(),
 815            }),
 816            downstream_client: None,
 817            opened_buffers: Default::default(),
 818            path_to_buffer_id: Default::default(),
 819            loading_buffers: Default::default(),
 820            shared_buffers: Default::default(),
 821            non_searchable_buffers: Default::default(),
 822            worktree_store,
 823            project_search: Default::default(),
 824        }
 825    }
 826
 827    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
 828        match &mut self.state {
 829            BufferStoreState::Local(state) => Some(state),
 830            _ => None,
 831        }
 832    }
 833
 834    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
 835        match &mut self.state {
 836            BufferStoreState::Remote(state) => Some(state),
 837            _ => None,
 838        }
 839    }
 840
 841    fn as_remote(&self) -> Option<&RemoteBufferStore> {
 842        match &self.state {
 843            BufferStoreState::Remote(state) => Some(state),
 844            _ => None,
 845        }
 846    }
 847
    /// Opens the buffer at `project_path`, returning the existing buffer if it
    /// is already open, and deduplicating concurrent loads of the same path.
    ///
    /// Errors carrying a non-`Internal` [`ErrorCode`] are surfaced as that
    /// code so callers can react to specific failure kinds.
    #[ztracing::instrument(skip_all)]
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        // Fast path: the buffer is already open.
        if let Some(buffer) = self.get_by_path(&project_path) {
            return Task::ready(Ok(buffer));
        }

        let task = match self.loading_buffers.entry(project_path.clone()) {
            // A load for this path is already in flight; share its result.
            hash_map::Entry::Occupied(e) => e.get().clone(),
            hash_map::Entry::Vacant(entry) => {
                let path = project_path.path.clone();
                let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(project_path.worktree_id, cx)
                else {
                    return Task::ready(Err(anyhow!("no such worktree")));
                };
                let load_buffer = match &self.state {
                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
                };

                entry
                    .insert(
                        cx.spawn(async move |this, cx| {
                            let load_result = load_buffer.await;
                            this.update(cx, |this, _cx| {
                                // Record the fact that the buffer is no longer loading.
                                this.loading_buffers.remove(&project_path);

                                // Arc-wrap the error so the Shared task's result is cloneable.
                                let buffer = load_result.map_err(Arc::new)?;
                                Ok(buffer)
                            })?
                        })
                        .shared(),
                    )
                    .clone()
            }
        };

        cx.background_spawn(async move {
            task.await.map_err(|e| {
                if e.error_code() != ErrorCode::Internal {
                    anyhow!(e.error_code())
                } else {
                    anyhow!("{e}")
                }
            })
        })
    }
 902
 903    pub fn create_buffer(
 904        &mut self,
 905        language: Option<Arc<Language>>,
 906        project_searchable: bool,
 907        cx: &mut Context<Self>,
 908    ) -> Task<Result<Entity<Buffer>>> {
 909        match &self.state {
 910            BufferStoreState::Local(this) => this.create_buffer(language, project_searchable, cx),
 911            BufferStoreState::Remote(this) => this.create_buffer(language, project_searchable, cx),
 912        }
 913    }
 914
 915    pub fn save_buffer(
 916        &mut self,
 917        buffer: Entity<Buffer>,
 918        cx: &mut Context<Self>,
 919    ) -> Task<Result<()>> {
 920        match &mut self.state {
 921            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
 922            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer, None, cx),
 923        }
 924    }
 925
    /// Saves the buffer's contents at `path`, re-pointing the buffer's file at
    /// the new location.
    ///
    /// Once the save completes, the stale path-to-buffer-id mapping for the
    /// buffer's previous file (if any) is removed and
    /// [`BufferStoreEvent::BufferChangedFilePath`] is emitted.
    pub fn save_buffer_as(
        &mut self,
        buffer: Entity<Buffer>,
        path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Capture the pre-save file so the old path mapping can be cleaned up
        // and reported in the event below.
        let old_file = buffer.read(cx).file().cloned();
        let task = match &self.state {
            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
            BufferStoreState::Remote(this) => {
                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
            }
        };
        cx.spawn(async move |this, cx| {
            task.await?;
            this.update(cx, |this, cx| {
                // The buffer now lives at `path`; drop the mapping for its
                // previous location.
                old_file.clone().and_then(|file| {
                    this.path_to_buffer_id.remove(&ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path().clone(),
                    })
                });

                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
 953
    /// Registers a buffer with this store, indexing it by remote id and (when
    /// it has a file) by project path, and wiring up event forwarding.
    ///
    /// If operations for this buffer id were already queued (they arrived
    /// before the buffer itself), they are applied now. Re-registering a live
    /// buffer is tolerated for remote replicas but is an error locally.
    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
        let buffer = buffer_entity.read(cx);
        let remote_id = buffer.remote_id();
        let path = File::from_dyn(buffer.file()).map(|file| ProjectPath {
            path: file.path.clone(),
            worktree_id: file.worktree_id(cx),
        });
        let is_remote = buffer.replica_id().is_remote();
        // The store holds only a weak handle, so dropping all strong handles
        // elsewhere releases the buffer.
        let open_buffer = OpenBuffer::Complete {
            buffer: buffer_entity.downgrade(),
        };

        // Emit `BufferDropped` when the buffer entity is released.
        let handle = cx.entity().downgrade();
        buffer_entity.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });
        let _expect_path_to_exist;
        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
                _expect_path_to_exist = false;
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Operations arrived before the buffer; flush them now.
                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
                } else if entry.get().upgrade().is_some() {
                    if is_remote {
                        // A remote replica may legitimately be announced twice.
                        return Ok(());
                    } else {
                        debug_panic!("buffer {remote_id} was already registered");
                        anyhow::bail!("buffer {remote_id} was already registered");
                    }
                }
                entry.insert(open_buffer);
                _expect_path_to_exist = true;
            }
        }

        if let Some(path) = path {
            self.path_to_buffer_id.insert(path, remote_id);
        }

        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
        Ok(())
    }
1007
1008    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1009        self.opened_buffers
1010            .values()
1011            .filter_map(|buffer| buffer.upgrade())
1012    }
1013
1014    pub(crate) fn is_searchable(&self, id: &BufferId) -> bool {
1015        !self.non_searchable_buffers.contains(&id)
1016    }
1017
1018    pub fn loading_buffers(
1019        &self,
1020    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1021        self.loading_buffers.iter().map(|(path, task)| {
1022            let task = task.clone();
1023            (path, async move {
1024                task.await.map_err(|e| {
1025                    if e.error_code() != ErrorCode::Internal {
1026                        anyhow!(e.error_code())
1027                    } else {
1028                        anyhow!("{e}")
1029                    }
1030                })
1031            })
1032        })
1033    }
1034
    /// Looks up the id of the open buffer associated with `project_path`, if any.
    pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
        self.path_to_buffer_id.get(project_path)
    }
1038
1039    pub fn get_by_path(&self, path: &ProjectPath) -> Option<Entity<Buffer>> {
1040        self.path_to_buffer_id
1041            .get(path)
1042            .and_then(|buffer_id| self.get(*buffer_id))
1043    }
1044
    /// Returns the open buffer with the given id, if it exists and its weak
    /// handle can still be upgraded.
    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
        self.opened_buffers.get(&buffer_id)?.upgrade()
    }
1048
1049    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1050        self.get(buffer_id)
1051            .with_context(|| format!("unknown buffer id {buffer_id}"))
1052    }
1053
1054    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1055        self.get(buffer_id).or_else(|| {
1056            self.as_remote()
1057                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1058        })
1059    }
1060
1061    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1062        let buffers = self
1063            .buffers()
1064            .map(|buffer| {
1065                let buffer = buffer.read(cx);
1066                proto::BufferVersion {
1067                    id: buffer.remote_id().into(),
1068                    version: language::proto::serialize_version(&buffer.version),
1069                }
1070            })
1071            .collect();
1072        let incomplete_buffer_ids = self
1073            .as_remote()
1074            .map(|remote| remote.incomplete_buffer_ids())
1075            .unwrap_or_default();
1076        (buffers, incomplete_buffer_ids)
1077    }
1078
1079    pub fn disconnected_from_host(&mut self, cx: &mut App) {
1080        for open_buffer in self.opened_buffers.values_mut() {
1081            if let Some(buffer) = open_buffer.upgrade() {
1082                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1083            }
1084        }
1085
1086        for buffer in self.buffers() {
1087            buffer.update(cx, |buffer, cx| {
1088                buffer.set_capability(Capability::ReadOnly, cx)
1089            });
1090        }
1091
1092        if let Some(remote) = self.as_remote_mut() {
1093            // Wake up all futures currently waiting on a buffer to get opened,
1094            // to give them a chance to fail now that we've disconnected.
1095            remote.remote_buffer_listeners.clear()
1096        }
1097    }
1098
    /// Marks this store as shared, recording the downstream client and the
    /// project id under which buffer updates should be forwarded to it.
    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
        self.downstream_client = Some((downstream_client, remote_id));
    }
1102
    /// Stops sharing this store: drops the downstream client and forgets
    /// which buffers were shared with which peers.
    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        self.downstream_client.take();
        self.forget_shared_buffers();
    }
1107
    /// Drops entries that only hold queued operations and never received the
    /// corresponding buffer state.
    pub fn discard_incomplete(&mut self) {
        self.opened_buffers
            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
    }
1112
    /// Updates this store's indices after a buffer's file handle changed,
    /// recording the buffer under its entry id (for local stores) and its
    /// current project path.
    ///
    /// Returns `None` either when the buffer has no worktree file or when the
    /// entry id was already mapped (in which case nothing is updated).
    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
        let file = File::from_dyn(buffer.read(cx).file())?;

        let remote_id = buffer.read(cx).remote_id();
        if let Some(entry_id) = file.entry_id {
            if let Some(local) = self.as_local_mut() {
                match local.local_buffer_ids_by_entry_id.get(&entry_id) {
                    Some(_) => {
                        // Entry id already tracked; leave existing mappings alone.
                        return None;
                    }
                    None => {
                        local
                            .local_buffer_ids_by_entry_id
                            .insert(entry_id, remote_id);
                    }
                }
            }
            self.path_to_buffer_id.insert(
                ProjectPath {
                    worktree_id: file.worktree_id(cx),
                    path: file.path.clone(),
                },
                remote_id,
            );
        };

        Some(())
    }
1141
    /// Responds to events emitted by an individual buffer: keeps the store's
    /// indices in sync on file changes, and forwards reloads downstream when
    /// the project is shared.
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::FileHandleChanged => {
                self.buffer_changed_file(buffer, cx);
            }
            BufferEvent::Reloaded => {
                // Only relevant when we have a downstream peer to notify.
                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
                    return;
                };
                let buffer = buffer.read(cx);
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer.remote_id().to_proto(),
                        version: serialize_version(&buffer.version()),
                        mtime: buffer.saved_mtime().map(|t| t.into()),
                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
                    })
                    .log_err();
            }
            // Language changes are intentionally not forwarded here.
            BufferEvent::LanguageChanged(_) => {}
            _ => {}
        }
    }
1171
    /// Applies buffer operations received over RPC.
    ///
    /// If the buffer is not registered yet (its state may still be in
    /// flight), the operations are queued under an `OpenBuffer::Operations`
    /// entry and applied when the buffer arrives (see `add_buffer`).
    pub async fn handle_update_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let payload = envelope.payload;
        let buffer_id = BufferId::new(payload.buffer_id)?;
        let ops = payload
            .operations
            .into_iter()
            .map(language::proto::deserialize_operation)
            .collect::<Result<Vec<_>, _>>()?;
        this.update(&mut cx, |this, cx| {
            match this.opened_buffers.entry(buffer_id) {
                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                    // Buffer state hasn't arrived yet; keep queueing.
                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                    OpenBuffer::Complete { buffer, .. } => {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                        }
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    // First message for this buffer id: start an operation queue.
                    e.insert(OpenBuffer::Operations(ops));
                }
            }
            Ok(proto::Ack {})
        })
    }
1201
1202    pub fn register_shared_lsp_handle(
1203        &mut self,
1204        peer_id: proto::PeerId,
1205        buffer_id: BufferId,
1206        handle: OpenLspBufferHandle,
1207    ) {
1208        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id)
1209            && let Some(buffer) = shared_buffers.get_mut(&buffer_id)
1210        {
1211            buffer.lsp_handle = Some(handle);
1212            return;
1213        }
1214        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1215    }
1216
    /// Handles a guest's request to resynchronize its open buffers after a
    /// reconnection.
    ///
    /// For each buffer the guest reports, this re-shares the buffer with the
    /// guest, replies with our current version, resends the buffer's file and
    /// saved state, and streams any operations the guest is missing (relative
    /// to the version it reported) in the background.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut Context<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: the guest only retains the buffers it
        // reports below.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .entry(buffer_id)
                    .or_insert_with(|| SharedBuffer {
                        buffer: buffer.clone(),
                        lsp_handle: None,
                    });

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                // TODO(max): do something
                // client
                //     .send(proto::UpdateStagedText {
                //         project_id,
                //         buffer_id: buffer_id.into(),
                //         diff_base: buffer.diff_base().map(ToString::to_string),
                //     })
                //     .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in chunks off the main thread.
                cx.background_spawn(
                    async move {
                        let operations = operations.await;
                        for chunk in split_operations(operations) {
                            client
                                .request(proto::UpdateBuffer {
                                    project_id,
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                })
                                .await?;
                        }
                        anyhow::Ok(())
                    }
                    .log_err(),
                )
                .detach();
            }
        }
        Ok(response)
    }
1304
1305    pub fn handle_create_buffer_for_peer(
1306        &mut self,
1307        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1308        replica_id: ReplicaId,
1309        capability: Capability,
1310        cx: &mut Context<Self>,
1311    ) -> Result<()> {
1312        let remote = self
1313            .as_remote_mut()
1314            .context("buffer store is not a remote")?;
1315
1316        if let Some(buffer) =
1317            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1318        {
1319            self.add_buffer(buffer, cx)?;
1320        }
1321
1322        Ok(())
1323    }
1324
    /// Handles notification that a buffer's file changed on the host:
    /// updates the local buffer's file handle, emits
    /// [`BufferStoreEvent::BufferChangedFilePath`] when the path actually
    /// changed, and re-forwards the message downstream if we are shared.
    pub async fn handle_update_buffer_file(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateBufferFile>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = envelope.payload.buffer_id;
        let buffer_id = BufferId::new(buffer_id)?;

        this.update(&mut cx, |this, cx| {
            let payload = envelope.payload.clone();
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                let file = payload.file.context("invalid file")?;
                let worktree = this
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
                    .context("no such worktree")?;
                let file = File::from_proto(file, worktree, cx)?;
                let old_file = buffer.update(cx, |buffer, cx| {
                    let old_file = buffer.file().cloned();
                    let new_path = file.path.clone();

                    buffer.file_updated(Arc::new(file), cx);
                    // Only report a path change if the path really differs
                    // (or the buffer previously had no file at all).
                    if old_file.as_ref().is_none_or(|old| *old.path() != new_path) {
                        Some(old_file)
                    } else {
                        None
                    }
                });
                if let Some(old_file) = old_file {
                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
                }
            }
            // Relay to our own downstream peers, if any.
            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::UpdateBufferFile {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        file: envelope.payload.file,
                    })
                    .log_err();
            }
            Ok(())
        })
    }
1370
    /// Handles a guest's request to save a buffer, optionally at a new path.
    ///
    /// Waits until the buffer has caught up to the version the guest saved,
    /// performs the save (or save-as), and replies with the resulting saved
    /// version and mtime.
    pub async fn handle_save_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.read_with(&cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.downstream_client
                    .as_ref()
                    .map(|(_, project_id)| *project_id)
                    .context("project is not shared")?,
            ))
        })?;
        // Don't save until we've applied everything the guest had applied
        // when it issued the request.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })
            .await?;
        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());

        if let Some(new_path) = envelope.payload.new_path
            && let Some(new_path) = ProjectPath::from_proto(new_path)
        {
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))
                .await?;
        }

        Ok(buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        }))
    }
1412
    /// Handles notification that a peer closed a buffer we had shared with
    /// it, releasing our shared-buffer bookkeeping for that peer/buffer pair.
    pub async fn handle_close_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let peer_id = envelope.sender_id;
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        this.update(&mut cx, |this, cx| {
            if let Some(shared) = this.shared_buffers.get_mut(&peer_id)
                && shared.remove(&buffer_id).is_some()
            {
                cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
                // Drop the peer's entry entirely once its last shared buffer
                // is gone.
                if shared.is_empty() {
                    this.shared_buffers.remove(&peer_id);
                }
                return;
            }
            debug_panic!(
                "peer_id {} closed buffer_id {} which was either not open or already closed",
                peer_id,
                buffer_id
            )
        });
        Ok(())
    }
1438
    /// Handles notification from upstream that a buffer was saved: records
    /// the saved version/mtime on the local buffer and relays the message to
    /// our own downstream peers, if any.
    pub async fn handle_buffer_saved(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferSaved>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        this.update(&mut cx, move |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_save(version, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferSaved {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                    })
                    .log_err();
            }
        });
        Ok(())
    }
1467
    /// Handles notification from upstream that a buffer was reloaded from
    /// disk: applies the new version, line ending, and mtime locally, and
    /// relays the message to our own downstream peers, if any.
    pub async fn handle_buffer_reloaded(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::BufferReloaded>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let version = deserialize_version(&envelope.payload.version);
        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
        let line_ending = deserialize_line_ending(
            proto::LineEnding::from_i32(envelope.payload.line_ending)
                .context("missing line ending")?,
        );
        this.update(&mut cx, |this, cx| {
            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
                buffer.update(cx, |buffer, cx| {
                    buffer.did_reload(version, line_ending, mtime, cx);
                });
            }

            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
                downstream_client
                    .send(proto::BufferReloaded {
                        project_id: *project_id,
                        buffer_id: buffer_id.into(),
                        mtime: envelope.payload.mtime,
                        version: envelope.payload.version,
                        line_ending: envelope.payload.line_ending,
                    })
                    .log_err();
            }
        });
        Ok(())
    }
1501
1502    pub fn reload_buffers(
1503        &self,
1504        buffers: HashSet<Entity<Buffer>>,
1505        push_to_history: bool,
1506        cx: &mut Context<Self>,
1507    ) -> Task<Result<ProjectTransaction>> {
1508        if buffers.is_empty() {
1509            return Task::ready(Ok(ProjectTransaction::default()));
1510        }
1511        match &self.state {
1512            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1513            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1514        }
1515    }
1516
    /// Handles a guest's request to reload a set of buffers, replying with
    /// the resulting project transaction serialized for that guest.
    async fn handle_reload_buffers(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ReloadBuffers>,
        mut cx: AsyncApp,
    ) -> Result<proto::ReloadBuffersResponse> {
        let sender_id = envelope.original_sender_id().unwrap_or_default();
        let reload = this.update(&mut cx, |this, cx| {
            let mut buffers = HashSet::default();
            for buffer_id in &envelope.payload.buffer_ids {
                let buffer_id = BufferId::new(*buffer_id)?;
                buffers.insert(this.get_existing(buffer_id)?);
            }
            // `push_to_history: false` — the requesting guest owns the
            // undo-history decision on its side.
            anyhow::Ok(this.reload_buffers(buffers, false, cx))
        })?;

        let project_transaction = reload.await?;
        let project_transaction = this.update(&mut cx, |this, cx| {
            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
        });
        Ok(proto::ReloadBuffersResponse {
            transaction: Some(project_transaction),
        })
    }
1540
    /// Shares a buffer with the given peer, sending the buffer's full state
    /// followed by its operation history in chunks.
    ///
    /// No-op (returning immediately) if the buffer is already shared with
    /// that peer or if this store has no downstream client.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Entity<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
        if shared_buffers.contains_key(&buffer_id) {
            // Already shared with this peer; nothing to send.
            return Task::ready(Ok(()));
        }
        shared_buffers.insert(
            buffer_id,
            SharedBuffer {
                buffer: buffer.clone(),
                lsp_handle: None,
            },
        );

        let Some((client, project_id)) = self.downstream_client.clone() else {
            return Task::ready(Ok(()));
        };

        cx.spawn(async move |this, cx| {
            // The buffer may have been dropped before this task ran.
            let Some(buffer) = this.read_with(cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx));
            let operations = operations.await;
            let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx));

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream operation chunks if the initial state was sent
            // successfully.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_spawn(async move {
                    let mut chunks = split_operations(operations).peekable();
                    while let Some(chunk) = chunks.next() {
                        // Peek so the final chunk can be flagged for the peer.
                        let is_last = chunks.peek().is_none();
                        client.send(proto::CreateBufferForPeer {
                            project_id,
                            peer_id: Some(peer_id),
                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                proto::BufferChunk {
                                    buffer_id: buffer_id.into(),
                                    operations: chunk,
                                    is_last,
                                },
                            )),
                        })?;
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }
            Ok(())
        })
    }
1605
1606    pub fn forget_shared_buffers(&mut self) {
1607        self.shared_buffers.clear();
1608    }
1609
1610    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1611        self.shared_buffers.remove(peer_id);
1612    }
1613
1614    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1615        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1616            self.shared_buffers.insert(new_peer_id, buffers);
1617        }
1618    }
1619
1620    pub fn has_shared_buffers(&self) -> bool {
1621        !self.shared_buffers.is_empty()
1622    }
1623
1624    pub fn create_local_buffer(
1625        &mut self,
1626        text: &str,
1627        language: Option<Arc<Language>>,
1628        project_searchable: bool,
1629        cx: &mut Context<Self>,
1630    ) -> Entity<Buffer> {
1631        let buffer = cx.new(|cx| {
1632            Buffer::local(text, cx)
1633                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1634        });
1635
1636        self.add_buffer(buffer.clone(), cx).log_err();
1637        let buffer_id = buffer.read(cx).remote_id();
1638        if !project_searchable {
1639            self.non_searchable_buffers.insert(buffer_id);
1640        }
1641
1642        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1643            self.path_to_buffer_id.insert(
1644                ProjectPath {
1645                    worktree_id: file.worktree_id(cx),
1646                    path: file.path.clone(),
1647                },
1648                buffer_id,
1649            );
1650            let this = self
1651                .as_local_mut()
1652                .expect("local-only method called in a non-local context");
1653            if let Some(entry_id) = file.entry_id {
1654                this.local_buffer_ids_by_entry_id
1655                    .insert(entry_id, buffer_id);
1656            }
1657        }
1658        buffer
1659    }
1660
1661    pub fn deserialize_project_transaction(
1662        &mut self,
1663        message: proto::ProjectTransaction,
1664        push_to_history: bool,
1665        cx: &mut Context<Self>,
1666    ) -> Task<Result<ProjectTransaction>> {
1667        if let Some(this) = self.as_remote_mut() {
1668            this.deserialize_project_transaction(message, push_to_history, cx)
1669        } else {
1670            debug_panic!("not a remote buffer store");
1671            Task::ready(Err(anyhow!("not a remote buffer store")))
1672        }
1673    }
1674
1675    pub fn wait_for_remote_buffer(
1676        &mut self,
1677        id: BufferId,
1678        cx: &mut Context<BufferStore>,
1679    ) -> Task<Result<Entity<Buffer>>> {
1680        if let Some(this) = self.as_remote_mut() {
1681            this.wait_for_remote_buffer(id, cx)
1682        } else {
1683            debug_panic!("not a remote buffer store");
1684            Task::ready(Err(anyhow!("not a remote buffer store")))
1685        }
1686    }
1687
1688    pub fn serialize_project_transaction_for_peer(
1689        &mut self,
1690        project_transaction: ProjectTransaction,
1691        peer_id: proto::PeerId,
1692        cx: &mut Context<Self>,
1693    ) -> proto::ProjectTransaction {
1694        let mut serialized_transaction = proto::ProjectTransaction {
1695            buffer_ids: Default::default(),
1696            transactions: Default::default(),
1697        };
1698        for (buffer, transaction) in project_transaction.0 {
1699            self.create_buffer_for_peer(&buffer, peer_id, cx)
1700                .detach_and_log_err(cx);
1701            serialized_transaction
1702                .buffer_ids
1703                .push(buffer.read(cx).remote_id().into());
1704            serialized_transaction
1705                .transactions
1706                .push(language::proto::serialize_transaction(&transaction));
1707        }
1708        serialized_transaction
1709    }
1710
1711    pub(crate) fn register_project_search_result_handle(
1712        &mut self,
1713    ) -> (u64, smol::channel::Receiver<BufferId>) {
1714        let (tx, rx) = smol::channel::unbounded();
1715        let handle = util::post_inc(&mut self.project_search.next_id);
1716        let _old_entry = self.project_search.chunks.insert(handle, tx);
1717        debug_assert!(_old_entry.is_none());
1718        (handle, rx)
1719    }
1720
1721    pub fn register_ongoing_project_search(
1722        &mut self,
1723        id: (PeerId, u64),
1724        search: Task<anyhow::Result<()>>,
1725    ) {
1726        let _old = self.project_search.searches_in_progress.insert(id, search);
1727        debug_assert!(_old.is_none());
1728    }
1729
1730    pub async fn handle_find_search_candidates_cancel(
1731        this: Entity<Self>,
1732        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
1733        mut cx: AsyncApp,
1734    ) -> Result<()> {
1735        let id = (
1736            envelope.original_sender_id.unwrap_or(envelope.sender_id),
1737            envelope.payload.handle,
1738        );
1739        let _ = this.update(&mut cx, |this, _| {
1740            this.project_search.searches_in_progress.remove(&id)
1741        });
1742        Ok(())
1743    }
1744
1745    pub(crate) async fn handle_find_search_candidates_chunk(
1746        this: Entity<Self>,
1747        envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
1748        mut cx: AsyncApp,
1749    ) -> Result<proto::Ack> {
1750        use proto::find_search_candidates_chunk::Variant;
1751        let handle = envelope.payload.handle;
1752
1753        let buffer_ids = match envelope
1754            .payload
1755            .variant
1756            .context("Expected non-null variant")?
1757        {
1758            Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches
1759                .buffer_ids
1760                .into_iter()
1761                .filter_map(|buffer_id| BufferId::new(buffer_id).ok())
1762                .collect::<Vec<_>>(),
1763            Variant::Done(_) => {
1764                this.update(&mut cx, |this, _| {
1765                    this.project_search.chunks.remove(&handle)
1766                });
1767                return Ok(proto::Ack {});
1768            }
1769        };
1770        let Some(sender) = this.read_with(&mut cx, |this, _| {
1771            this.project_search.chunks.get(&handle).cloned()
1772        }) else {
1773            return Ok(proto::Ack {});
1774        };
1775
1776        for buffer_id in buffer_ids {
1777            let Ok(_) = sender.send(buffer_id).await else {
1778                this.update(&mut cx, |this, _| {
1779                    this.project_search.chunks.remove(&handle)
1780                });
1781                return Ok(proto::Ack {});
1782            };
1783        }
1784        Ok(proto::Ack {})
1785    }
1786}
1787
1788impl OpenBuffer {
1789    fn upgrade(&self) -> Option<Entity<Buffer>> {
1790        match self {
1791            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
1792            OpenBuffer::Operations(_) => None,
1793        }
1794    }
1795}
1796
1797fn is_not_found_error(error: &anyhow::Error) -> bool {
1798    error
1799        .root_cause()
1800        .downcast_ref::<io::Error>()
1801        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1802}