buffer_store.rs

   1use crate::{
   2    ProjectPath,
   3    lsp_store::OpenLspBufferHandle,
   4    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   5};
   6use anyhow::{Context as _, Result, anyhow};
   7use client::Client;
   8use collections::{HashMap, HashSet, hash_map};
   9use futures::{Future, FutureExt as _, channel::oneshot, future::Shared};
  10use gpui::{
  11    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
  12};
  13use language::{
  14    Buffer, BufferEvent, Capability, DiskState, File as _, Language, LineEnding, Operation,
  15    language_settings::{AllLanguageSettings, LineEndingSetting},
  16    proto::{
  17        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  18        split_operations,
  19    },
  20};
  21use rpc::{
  22    AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope,
  23    proto::{self, PeerId},
  24};
  25
  26use settings::Settings;
  27use std::{io, sync::Arc, time::Instant};
  28use text::{BufferId, ReplicaId};
  29use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath};
  30use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
  31
  32/// A set of open buffers.
  33pub struct BufferStore {
  34    state: BufferStoreState,
  35    #[allow(clippy::type_complexity)]
  36    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
  37    worktree_store: Entity<WorktreeStore>,
  38    opened_buffers: HashMap<BufferId, OpenBuffer>,
  39    path_to_buffer_id: HashMap<ProjectPath, BufferId>,
  40    downstream_client: Option<(AnyProtoClient, u64)>,
  41    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
  42    non_searchable_buffers: HashSet<BufferId>,
  43    project_search: RemoteProjectSearchState,
  44}
  45
  46#[derive(Default)]
  47struct RemoteProjectSearchState {
  48    // List of ongoing project search chunks from our remote host. Used by the side issuing a search RPC request.
  49    chunks: HashMap<u64, smol::channel::Sender<BufferId>>,
  50    // Monotonously-increasing handle to hand out to remote host in order to identify the project search result chunk.
  51    next_id: u64,
  52    // Used by the side running the actual search for match candidates to potentially cancel the search prematurely.
  53    searches_in_progress: HashMap<(PeerId, u64), Task<Result<()>>>,
  54}
  55
  56#[derive(Hash, Eq, PartialEq, Clone)]
  57struct SharedBuffer {
  58    buffer: Entity<Buffer>,
  59    lsp_handle: Option<OpenLspBufferHandle>,
  60}
  61
  62enum BufferStoreState {
  63    Local(LocalBufferStore),
  64    Remote(RemoteBufferStore),
  65}
  66
  67struct RemoteBufferStore {
  68    shared_with_me: HashSet<Entity<Buffer>>,
  69    upstream_client: AnyProtoClient,
  70    project_id: u64,
  71    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
  72    remote_buffer_listeners:
  73        HashMap<BufferId, Vec<oneshot::Sender<anyhow::Result<Entity<Buffer>>>>>,
  74    worktree_store: Entity<WorktreeStore>,
  75}
  76
  77struct LocalBufferStore {
  78    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  79    worktree_store: Entity<WorktreeStore>,
  80    _subscription: Subscription,
  81}
  82
  83enum OpenBuffer {
  84    Complete { buffer: WeakEntity<Buffer> },
  85    Operations(Vec<Operation>),
  86}
  87
  88pub enum BufferStoreEvent {
  89    BufferAdded(Entity<Buffer>),
  90    SharedBufferClosed(proto::PeerId, BufferId),
  91    BufferDropped(BufferId),
  92    BufferChangedFilePath {
  93        buffer: Entity<Buffer>,
  94        old_file: Option<Arc<dyn language::File>>,
  95    },
  96}
  97
  98#[derive(Default, Debug, Clone)]
  99pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
 100
 101impl PartialEq for ProjectTransaction {
 102    fn eq(&self, other: &Self) -> bool {
 103        self.0.len() == other.0.len()
 104            && self.0.iter().all(|(buffer, transaction)| {
 105                other.0.get(buffer).is_some_and(|t| t.id == transaction.id)
 106            })
 107    }
 108}
 109
 110impl EventEmitter<BufferStoreEvent> for BufferStore {}
 111
 112impl RemoteBufferStore {
 113    pub fn wait_for_remote_buffer(
 114        &mut self,
 115        id: BufferId,
 116        cx: &mut Context<BufferStore>,
 117    ) -> Task<Result<Entity<Buffer>>> {
 118        let (tx, rx) = oneshot::channel();
 119        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 120
 121        cx.spawn(async move |this, cx| {
 122            if let Some(buffer) = this
 123                .read_with(cx, |buffer_store, _| buffer_store.get(id))
 124                .ok()
 125                .flatten()
 126            {
 127                return Ok(buffer);
 128            }
 129
 130            cx.background_spawn(async move { rx.await? }).await
 131        })
 132    }
 133
 134    fn save_remote_buffer(
 135        &self,
 136        buffer_handle: Entity<Buffer>,
 137        new_path: Option<proto::ProjectPath>,
 138        cx: &Context<BufferStore>,
 139    ) -> Task<Result<()>> {
 140        let buffer = buffer_handle.read(cx);
 141        let buffer_id = buffer.remote_id().into();
 142        let version = buffer.version();
 143        let rpc = self.upstream_client.clone();
 144        let project_id = self.project_id;
 145        cx.spawn(async move |_, cx| {
 146            let response = rpc
 147                .request(proto::SaveBuffer {
 148                    project_id,
 149                    buffer_id,
 150                    new_path,
 151                    version: serialize_version(&version),
 152                })
 153                .await?;
 154            let version = deserialize_version(&response.version);
 155            let mtime = response.mtime.map(|mtime| mtime.into());
 156
 157            buffer_handle.update(cx, |buffer, cx| {
 158                buffer.did_save(version.clone(), mtime, cx);
 159            });
 160
 161            Ok(())
 162        })
 163    }
 164
 165    pub fn handle_create_buffer_for_peer(
 166        &mut self,
 167        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 168        replica_id: ReplicaId,
 169        capability: Capability,
 170        cx: &mut Context<BufferStore>,
 171    ) -> Result<Option<Entity<Buffer>>> {
 172        match envelope.payload.variant.context("missing variant")? {
 173            proto::create_buffer_for_peer::Variant::State(mut state) => {
 174                let buffer_id = BufferId::new(state.id)?;
 175
 176                let buffer_result = maybe!({
 177                    let mut buffer_file = None;
 178                    if let Some(file) = state.file.take() {
 179                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 180                        let worktree = self
 181                            .worktree_store
 182                            .read(cx)
 183                            .worktree_for_id(worktree_id, cx)
 184                            .with_context(|| {
 185                                format!("no worktree found for id {}", file.worktree_id)
 186                            })?;
 187                        buffer_file = Some(Arc::new(File::from_proto(file, worktree, cx)?)
 188                            as Arc<dyn language::File>);
 189                    }
 190                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 191                });
 192
 193                match buffer_result {
 194                    Ok(buffer) => {
 195                        let buffer = cx.new(|_| buffer);
 196                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 197                    }
 198                    Err(error) => {
 199                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 200                            for listener in listeners {
 201                                listener.send(Err(anyhow!(error.cloned()))).ok();
 202                            }
 203                        }
 204                    }
 205                }
 206            }
 207            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 208                let buffer_id = BufferId::new(chunk.buffer_id)?;
 209                let buffer = self
 210                    .loading_remote_buffers_by_id
 211                    .get(&buffer_id)
 212                    .cloned()
 213                    .with_context(|| {
 214                        format!(
 215                            "received chunk for buffer {} without initial state",
 216                            chunk.buffer_id
 217                        )
 218                    })?;
 219
 220                let result = maybe!({
 221                    let operations = chunk
 222                        .operations
 223                        .into_iter()
 224                        .map(language::proto::deserialize_operation)
 225                        .collect::<Result<Vec<_>>>()?;
 226                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 227                    anyhow::Ok(())
 228                });
 229
 230                if let Err(error) = result {
 231                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 232                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 233                        for listener in listeners {
 234                            listener.send(Err(error.cloned())).ok();
 235                        }
 236                    }
 237                } else if chunk.is_last {
 238                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 239                    if self.upstream_client.is_via_collab() {
 240                        // retain buffers sent by peers to avoid races.
 241                        self.shared_with_me.insert(buffer.clone());
 242                    }
 243
 244                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 245                        for sender in senders {
 246                            sender.send(Ok(buffer.clone())).ok();
 247                        }
 248                    }
 249                    return Ok(Some(buffer));
 250                }
 251            }
 252        }
 253        Ok(None)
 254    }
 255
 256    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 257        self.loading_remote_buffers_by_id
 258            .keys()
 259            .copied()
 260            .collect::<Vec<_>>()
 261    }
 262
 263    pub fn deserialize_project_transaction(
 264        &self,
 265        message: proto::ProjectTransaction,
 266        push_to_history: bool,
 267        cx: &mut Context<BufferStore>,
 268    ) -> Task<Result<ProjectTransaction>> {
 269        cx.spawn(async move |this, cx| {
 270            let mut project_transaction = ProjectTransaction::default();
 271            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 272            {
 273                let buffer_id = BufferId::new(buffer_id)?;
 274                let buffer = this
 275                    .update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
 276                    .await?;
 277                let transaction = language::proto::deserialize_transaction(transaction)?;
 278                project_transaction.0.insert(buffer, transaction);
 279            }
 280
 281            for (buffer, transaction) in &project_transaction.0 {
 282                buffer
 283                    .update(cx, |buffer, _| {
 284                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 285                    })
 286                    .await?;
 287
 288                if push_to_history {
 289                    buffer.update(cx, |buffer, _| {
 290                        buffer.push_transaction(transaction.clone(), Instant::now());
 291                        buffer.finalize_last_transaction();
 292                    });
 293                }
 294            }
 295
 296            Ok(project_transaction)
 297        })
 298    }
 299
 300    fn open_buffer(
 301        &self,
 302        path: Arc<RelPath>,
 303        worktree: Entity<Worktree>,
 304        cx: &mut Context<BufferStore>,
 305    ) -> Task<Result<Entity<Buffer>>> {
 306        let worktree_id = worktree.read(cx).id().to_proto();
 307        let project_id = self.project_id;
 308        let client = self.upstream_client.clone();
 309        cx.spawn(async move |this, cx| {
 310            let response = client
 311                .request(proto::OpenBufferByPath {
 312                    project_id,
 313                    worktree_id,
 314                    path: path.to_proto(),
 315                })
 316                .await?;
 317            let buffer_id = BufferId::new(response.buffer_id)?;
 318
 319            let buffer = this
 320                .update(cx, {
 321                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 322                })?
 323                .await?;
 324
 325            Ok(buffer)
 326        })
 327    }
 328
 329    fn create_buffer(
 330        &self,
 331        language: Option<Arc<Language>>,
 332        project_searchable: bool,
 333        cx: &mut Context<BufferStore>,
 334    ) -> Task<Result<Entity<Buffer>>> {
 335        let create = self.upstream_client.request(proto::OpenNewBuffer {
 336            project_id: self.project_id,
 337        });
 338        cx.spawn(async move |this, cx| {
 339            let response = create.await?;
 340            let buffer_id = BufferId::new(response.buffer_id)?;
 341
 342            let buffer = this
 343                .update(cx, |this, cx| {
 344                    if !project_searchable {
 345                        this.non_searchable_buffers.insert(buffer_id);
 346                    }
 347                    this.wait_for_remote_buffer(buffer_id, cx)
 348                })?
 349                .await?;
 350            if let Some(language) = language {
 351                buffer.update(cx, |buffer, cx| {
 352                    buffer.set_language(Some(language), cx);
 353                });
 354            }
 355            Ok(buffer)
 356        })
 357    }
 358
 359    fn reload_buffers(
 360        &self,
 361        buffers: HashSet<Entity<Buffer>>,
 362        push_to_history: bool,
 363        cx: &mut Context<BufferStore>,
 364    ) -> Task<Result<ProjectTransaction>> {
 365        let request = self.upstream_client.request(proto::ReloadBuffers {
 366            project_id: self.project_id,
 367            buffer_ids: buffers
 368                .iter()
 369                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 370                .collect(),
 371        });
 372
 373        cx.spawn(async move |this, cx| {
 374            let response = request.await?.transaction.context("missing transaction")?;
 375            this.update(cx, |this, cx| {
 376                this.deserialize_project_transaction(response, push_to_history, cx)
 377            })?
 378            .await
 379        })
 380    }
 381}
 382
 383impl LocalBufferStore {
 384    fn save_local_buffer(
 385        &self,
 386        buffer_handle: Entity<Buffer>,
 387        worktree: Entity<Worktree>,
 388        path: Arc<RelPath>,
 389        mut has_changed_file: bool,
 390        cx: &mut Context<BufferStore>,
 391    ) -> Task<Result<()>> {
 392        let buffer = buffer_handle.read(cx);
 393
 394        let text = buffer.as_rope().clone();
 395        let line_ending = buffer.line_ending();
 396        let encoding = buffer.encoding();
 397        let has_bom = buffer.has_bom();
 398        let version = buffer.version();
 399        let buffer_id = buffer.remote_id();
 400        let file = buffer.file().cloned();
 401        if file
 402            .as_ref()
 403            .is_some_and(|file| file.disk_state() == DiskState::New)
 404        {
 405            has_changed_file = true;
 406        }
 407
 408        let save = worktree.update(cx, |worktree, cx| {
 409            worktree.write_file(path, text, line_ending, encoding, has_bom, cx)
 410        });
 411
 412        cx.spawn(async move |this, cx| {
 413            let new_file = save.await?;
 414            let mtime = new_file.disk_state().mtime();
 415            this.update(cx, |this, cx| {
 416                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 417                    if has_changed_file {
 418                        downstream_client
 419                            .send(proto::UpdateBufferFile {
 420                                project_id,
 421                                buffer_id: buffer_id.to_proto(),
 422                                file: Some(language::File::to_proto(&*new_file, cx)),
 423                            })
 424                            .log_err();
 425                    }
 426                    downstream_client
 427                        .send(proto::BufferSaved {
 428                            project_id,
 429                            buffer_id: buffer_id.to_proto(),
 430                            version: serialize_version(&version),
 431                            mtime: mtime.map(|time| time.into()),
 432                        })
 433                        .log_err();
 434                }
 435            })?;
 436            buffer_handle.update(cx, |buffer, cx| {
 437                if has_changed_file {
 438                    buffer.file_updated(new_file, cx);
 439                }
 440                buffer.did_save(version.clone(), mtime, cx);
 441            });
 442            Ok(())
 443        })
 444    }
 445
 446    fn subscribe_to_worktree(
 447        &mut self,
 448        worktree: &Entity<Worktree>,
 449        cx: &mut Context<BufferStore>,
 450    ) {
 451        cx.subscribe(worktree, |this, worktree, event, cx| {
 452            if worktree.read(cx).is_local()
 453                && let worktree::Event::UpdatedEntries(changes) = event
 454            {
 455                Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 456            }
 457        })
 458        .detach();
 459    }
 460
 461    fn local_worktree_entries_changed(
 462        this: &mut BufferStore,
 463        worktree_handle: &Entity<Worktree>,
 464        changes: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
 465        cx: &mut Context<BufferStore>,
 466    ) {
 467        let snapshot = worktree_handle.read(cx).snapshot();
 468        for (path, entry_id, _) in changes {
 469            Self::local_worktree_entry_changed(
 470                this,
 471                *entry_id,
 472                path,
 473                worktree_handle,
 474                &snapshot,
 475                cx,
 476            );
 477        }
 478    }
 479
 480    fn local_worktree_entry_changed(
 481        this: &mut BufferStore,
 482        entry_id: ProjectEntryId,
 483        path: &Arc<RelPath>,
 484        worktree: &Entity<worktree::Worktree>,
 485        snapshot: &worktree::Snapshot,
 486        cx: &mut Context<BufferStore>,
 487    ) -> Option<()> {
 488        let project_path = ProjectPath {
 489            worktree_id: snapshot.id(),
 490            path: path.clone(),
 491        };
 492
 493        let buffer_id = this
 494            .as_local_mut()
 495            .and_then(|local| local.local_buffer_ids_by_entry_id.get(&entry_id))
 496            .copied()
 497            .or_else(|| this.path_to_buffer_id.get(&project_path).copied())?;
 498
 499        let buffer = if let Some(buffer) = this.get(buffer_id) {
 500            Some(buffer)
 501        } else {
 502            this.opened_buffers.remove(&buffer_id);
 503            this.non_searchable_buffers.remove(&buffer_id);
 504            None
 505        };
 506
 507        let buffer = if let Some(buffer) = buffer {
 508            buffer
 509        } else {
 510            this.path_to_buffer_id.remove(&project_path);
 511            let this = this.as_local_mut()?;
 512            this.local_buffer_ids_by_entry_id.remove(&entry_id);
 513            return None;
 514        };
 515
 516        let events = buffer.update(cx, |buffer, cx| {
 517            let file = buffer.file()?;
 518            let old_file = File::from_dyn(Some(file))?;
 519            if old_file.worktree != *worktree {
 520                return None;
 521            }
 522
 523            let snapshot_entry = old_file
 524                .entry_id
 525                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 526                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
 527
 528            let new_file = if let Some(entry) = snapshot_entry {
 529                File {
 530                    disk_state: match entry.mtime {
 531                        Some(mtime) => DiskState::Present {
 532                            mtime,
 533                            size: entry.size,
 534                        },
 535                        None => old_file.disk_state,
 536                    },
 537                    is_local: true,
 538                    entry_id: Some(entry.id),
 539                    path: entry.path.clone(),
 540                    worktree: worktree.clone(),
 541                    is_private: entry.is_private,
 542                }
 543            } else {
 544                File {
 545                    disk_state: DiskState::Deleted,
 546                    is_local: true,
 547                    entry_id: old_file.entry_id,
 548                    path: old_file.path.clone(),
 549                    worktree: worktree.clone(),
 550                    is_private: old_file.is_private,
 551                }
 552            };
 553
 554            if new_file == *old_file {
 555                return None;
 556            }
 557
 558            let mut events = Vec::new();
 559            if new_file.path != old_file.path {
 560                this.path_to_buffer_id.remove(&ProjectPath {
 561                    path: old_file.path.clone(),
 562                    worktree_id: old_file.worktree_id(cx),
 563                });
 564                this.path_to_buffer_id.insert(
 565                    ProjectPath {
 566                        worktree_id: new_file.worktree_id(cx),
 567                        path: new_file.path.clone(),
 568                    },
 569                    buffer_id,
 570                );
 571                events.push(BufferStoreEvent::BufferChangedFilePath {
 572                    buffer: cx.entity(),
 573                    old_file: buffer.file().cloned(),
 574                });
 575            }
 576            let local = this.as_local_mut()?;
 577            if new_file.entry_id != old_file.entry_id {
 578                if let Some(entry_id) = old_file.entry_id {
 579                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
 580                }
 581                if let Some(entry_id) = new_file.entry_id {
 582                    local
 583                        .local_buffer_ids_by_entry_id
 584                        .insert(entry_id, buffer_id);
 585                }
 586            }
 587
 588            if let Some((client, project_id)) = &this.downstream_client {
 589                client
 590                    .send(proto::UpdateBufferFile {
 591                        project_id: *project_id,
 592                        buffer_id: buffer_id.to_proto(),
 593                        file: Some(new_file.to_proto(cx)),
 594                    })
 595                    .ok();
 596            }
 597
 598            buffer.file_updated(Arc::new(new_file), cx);
 599            Some(events)
 600        })?;
 601
 602        for event in events {
 603            cx.emit(event);
 604        }
 605
 606        None
 607    }
 608
 609    fn save_buffer(
 610        &self,
 611        buffer: Entity<Buffer>,
 612        cx: &mut Context<BufferStore>,
 613    ) -> Task<Result<()>> {
 614        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 615            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 616        };
 617        let worktree = file.worktree.clone();
 618        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 619    }
 620
 621    fn save_buffer_as(
 622        &self,
 623        buffer: Entity<Buffer>,
 624        path: ProjectPath,
 625        cx: &mut Context<BufferStore>,
 626    ) -> Task<Result<()>> {
 627        let Some(worktree) = self
 628            .worktree_store
 629            .read(cx)
 630            .worktree_for_id(path.worktree_id, cx)
 631        else {
 632            return Task::ready(Err(anyhow!("no such worktree")));
 633        };
 634        self.save_local_buffer(buffer, worktree, path.path, true, cx)
 635    }
 636
 637    #[ztracing::instrument(skip_all)]
 638    fn open_buffer(
 639        &self,
 640        path: Arc<RelPath>,
 641        worktree: Entity<Worktree>,
 642        cx: &mut Context<BufferStore>,
 643    ) -> Task<Result<Entity<Buffer>>> {
 644        let load_file = worktree.update(cx, |worktree, cx| worktree.load_file(path.as_ref(), cx));
 645        cx.spawn(async move |this, cx| {
 646            let path = path.clone();
 647            let buffer = match load_file.await {
 648                Ok(loaded) => {
 649                    let reservation = cx.reserve_entity::<Buffer>();
 650                    let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 651                    let text_buffer = cx
 652                        .background_spawn(async move {
 653                            text::Buffer::new(ReplicaId::LOCAL, buffer_id, loaded.text)
 654                        })
 655                        .await;
 656                    cx.insert_entity(reservation, |_| {
 657                        let mut buffer =
 658                            Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite);
 659                        buffer.set_encoding(loaded.encoding);
 660                        buffer.set_has_bom(loaded.has_bom);
 661                        buffer
 662                    })
 663                }
 664                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
 665                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 666                    let text_buffer = text::Buffer::new(ReplicaId::LOCAL, buffer_id, "");
 667                    let mut buffer = Buffer::build(
 668                        text_buffer,
 669                        Some(Arc::new(File {
 670                            worktree,
 671                            path,
 672                            disk_state: DiskState::New,
 673                            entry_id: None,
 674                            is_local: true,
 675                            is_private: false,
 676                        })),
 677                        Capability::ReadWrite,
 678                    );
 679                    apply_initial_line_ending(&mut buffer, cx);
 680                    buffer
 681                }),
 682                Err(e) => return Err(e),
 683            };
 684            this.update(cx, |this, cx| {
 685                this.add_buffer(buffer.clone(), cx)?;
 686                let buffer_id = buffer.read(cx).remote_id();
 687                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 688                    let project_path = ProjectPath {
 689                        worktree_id: file.worktree_id(cx),
 690                        path: file.path.clone(),
 691                    };
 692                    let entry_id = file.entry_id;
 693
 694                    // Check if the file should be read-only based on settings
 695                    let settings = WorktreeSettings::get(Some((&project_path).into()), cx);
 696                    let is_read_only = if project_path.path.is_empty() {
 697                        settings.is_std_path_read_only(&file.full_path(cx))
 698                    } else {
 699                        settings.is_path_read_only(&project_path.path)
 700                    };
 701                    if is_read_only {
 702                        buffer.update(cx, |buffer, cx| {
 703                            buffer.set_capability(Capability::Read, cx);
 704                        });
 705                    }
 706
 707                    this.path_to_buffer_id.insert(project_path, buffer_id);
 708                    let this = this.as_local_mut().unwrap();
 709                    if let Some(entry_id) = entry_id {
 710                        this.local_buffer_ids_by_entry_id
 711                            .insert(entry_id, buffer_id);
 712                    }
 713                }
 714
 715                anyhow::Ok(())
 716            })??;
 717
 718            Ok(buffer)
 719        })
 720    }
 721
 722    fn create_buffer(
 723        &self,
 724        language: Option<Arc<Language>>,
 725        project_searchable: bool,
 726        cx: &mut Context<BufferStore>,
 727    ) -> Task<Result<Entity<Buffer>>> {
 728        cx.spawn(async move |buffer_store, cx| {
 729            let buffer = cx.new(|cx| {
 730                let mut buffer = Buffer::local("", cx)
 731                    .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx);
 732                apply_initial_line_ending(&mut buffer, cx);
 733                buffer
 734            });
 735            buffer_store.update(cx, |buffer_store, cx| {
 736                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 737                if !project_searchable {
 738                    buffer_store
 739                        .non_searchable_buffers
 740                        .insert(buffer.read(cx).remote_id());
 741                }
 742            })?;
 743            Ok(buffer)
 744        })
 745    }
 746
 747    fn reload_buffers(
 748        &self,
 749        buffers: HashSet<Entity<Buffer>>,
 750        push_to_history: bool,
 751        cx: &mut Context<BufferStore>,
 752    ) -> Task<Result<ProjectTransaction>> {
 753        cx.spawn(async move |_, cx| {
 754            let mut project_transaction = ProjectTransaction::default();
 755            for buffer in buffers {
 756                let transaction = buffer.update(cx, |buffer, cx| buffer.reload(cx)).await?;
 757                buffer.update(cx, |buffer, cx| {
 758                    if let Some(transaction) = transaction {
 759                        if !push_to_history {
 760                            buffer.forget_transaction(transaction.id);
 761                        }
 762                        project_transaction.0.insert(cx.entity(), transaction);
 763                    }
 764                });
 765            }
 766
 767            Ok(project_transaction)
 768        })
 769    }
 770}
 771
 772impl BufferStore {
 773    pub fn init(client: &AnyProtoClient) {
 774        client.add_entity_message_handler(Self::handle_buffer_reloaded);
 775        client.add_entity_message_handler(Self::handle_buffer_saved);
 776        client.add_entity_message_handler(Self::handle_update_buffer_file);
 777        client.add_entity_request_handler(Self::handle_save_buffer);
 778        client.add_entity_request_handler(Self::handle_reload_buffers);
 779    }
 780
 781    /// Creates a buffer store, optionally retaining its buffers.
 782    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
 783        Self {
 784            state: BufferStoreState::Local(LocalBufferStore {
 785                local_buffer_ids_by_entry_id: Default::default(),
 786                worktree_store: worktree_store.clone(),
 787                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
 788                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 789                        let this = this.as_local_mut().unwrap();
 790                        this.subscribe_to_worktree(worktree, cx);
 791                    }
 792                }),
 793            }),
 794            downstream_client: None,
 795            opened_buffers: Default::default(),
 796            path_to_buffer_id: Default::default(),
 797            shared_buffers: Default::default(),
 798            loading_buffers: Default::default(),
 799            non_searchable_buffers: Default::default(),
 800            worktree_store,
 801            project_search: Default::default(),
 802        }
 803    }
 804
 805    pub fn remote(
 806        worktree_store: Entity<WorktreeStore>,
 807        upstream_client: AnyProtoClient,
 808        remote_id: u64,
 809        _cx: &mut Context<Self>,
 810    ) -> Self {
 811        Self {
 812            state: BufferStoreState::Remote(RemoteBufferStore {
 813                shared_with_me: Default::default(),
 814                loading_remote_buffers_by_id: Default::default(),
 815                remote_buffer_listeners: Default::default(),
 816                project_id: remote_id,
 817                upstream_client,
 818                worktree_store: worktree_store.clone(),
 819            }),
 820            downstream_client: None,
 821            opened_buffers: Default::default(),
 822            path_to_buffer_id: Default::default(),
 823            loading_buffers: Default::default(),
 824            shared_buffers: Default::default(),
 825            non_searchable_buffers: Default::default(),
 826            worktree_store,
 827            project_search: Default::default(),
 828        }
 829    }
 830
 831    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
 832        match &mut self.state {
 833            BufferStoreState::Local(state) => Some(state),
 834            _ => None,
 835        }
 836    }
 837
 838    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
 839        match &mut self.state {
 840            BufferStoreState::Remote(state) => Some(state),
 841            _ => None,
 842        }
 843    }
 844
 845    fn as_remote(&self) -> Option<&RemoteBufferStore> {
 846        match &self.state {
 847            BufferStoreState::Remote(state) => Some(state),
 848            _ => None,
 849        }
 850    }
 851
 852    #[ztracing::instrument(skip_all)]
 853    pub fn open_buffer(
 854        &mut self,
 855        project_path: ProjectPath,
 856        cx: &mut Context<Self>,
 857    ) -> Task<Result<Entity<Buffer>>> {
 858        if let Some(buffer) = self.get_by_path(&project_path) {
 859            return Task::ready(Ok(buffer));
 860        }
 861
 862        let task = match self.loading_buffers.entry(project_path.clone()) {
 863            hash_map::Entry::Occupied(e) => e.get().clone(),
 864            hash_map::Entry::Vacant(entry) => {
 865                let path = project_path.path.clone();
 866                let Some(worktree) = self
 867                    .worktree_store
 868                    .read(cx)
 869                    .worktree_for_id(project_path.worktree_id, cx)
 870                else {
 871                    return Task::ready(Err(anyhow!("no such worktree")));
 872                };
 873                let load_buffer = match &self.state {
 874                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
 875                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
 876                };
 877
 878                entry
 879                    .insert(
 880                        cx.spawn(async move |this, cx| {
 881                            let load_result = load_buffer.await;
 882                            this.update(cx, |this, _cx| {
 883                                // Record the fact that the buffer is no longer loading.
 884                                this.loading_buffers.remove(&project_path);
 885
 886                                let buffer = load_result.map_err(Arc::new)?;
 887                                Ok(buffer)
 888                            })?
 889                        })
 890                        .shared(),
 891                    )
 892                    .clone()
 893            }
 894        };
 895
 896        cx.background_spawn(async move {
 897            task.await.map_err(|e| {
 898                if e.error_code() != ErrorCode::Internal {
 899                    anyhow!(e.error_code())
 900                } else {
 901                    anyhow!("{e}")
 902                }
 903            })
 904        })
 905    }
 906
 907    pub fn create_buffer(
 908        &mut self,
 909        language: Option<Arc<Language>>,
 910        project_searchable: bool,
 911        cx: &mut Context<Self>,
 912    ) -> Task<Result<Entity<Buffer>>> {
 913        match &self.state {
 914            BufferStoreState::Local(this) => this.create_buffer(language, project_searchable, cx),
 915            BufferStoreState::Remote(this) => this.create_buffer(language, project_searchable, cx),
 916        }
 917    }
 918
 919    pub fn save_buffer(
 920        &mut self,
 921        buffer: Entity<Buffer>,
 922        cx: &mut Context<Self>,
 923    ) -> Task<Result<()>> {
 924        match &mut self.state {
 925            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
 926            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer, None, cx),
 927        }
 928    }
 929
 930    pub fn save_buffer_as(
 931        &mut self,
 932        buffer: Entity<Buffer>,
 933        path: ProjectPath,
 934        cx: &mut Context<Self>,
 935    ) -> Task<Result<()>> {
 936        let old_file = buffer.read(cx).file().cloned();
 937        let task = match &self.state {
 938            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
 939            BufferStoreState::Remote(this) => {
 940                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
 941            }
 942        };
 943        cx.spawn(async move |this, cx| {
 944            task.await?;
 945            this.update(cx, |this, cx| {
 946                old_file.clone().and_then(|file| {
 947                    this.path_to_buffer_id.remove(&ProjectPath {
 948                        worktree_id: file.worktree_id(cx),
 949                        path: file.path().clone(),
 950                    })
 951                });
 952
 953                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 954            })
 955        })
 956    }
 957
 958    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
 959        let buffer = buffer_entity.read(cx);
 960        let remote_id = buffer.remote_id();
 961        let path = File::from_dyn(buffer.file()).map(|file| ProjectPath {
 962            path: file.path.clone(),
 963            worktree_id: file.worktree_id(cx),
 964        });
 965        let is_remote = buffer.replica_id().is_remote();
 966        let open_buffer = OpenBuffer::Complete {
 967            buffer: buffer_entity.downgrade(),
 968        };
 969
 970        let handle = cx.entity().downgrade();
 971        buffer_entity.update(cx, move |_, cx| {
 972            cx.on_release(move |buffer, cx| {
 973                handle
 974                    .update(cx, |_, cx| {
 975                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
 976                    })
 977                    .ok();
 978            })
 979            .detach()
 980        });
 981        let _expect_path_to_exist;
 982        match self.opened_buffers.entry(remote_id) {
 983            hash_map::Entry::Vacant(entry) => {
 984                entry.insert(open_buffer);
 985                _expect_path_to_exist = false;
 986            }
 987            hash_map::Entry::Occupied(mut entry) => {
 988                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 989                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
 990                } else if entry.get().upgrade().is_some() {
 991                    if is_remote {
 992                        return Ok(());
 993                    } else {
 994                        debug_panic!("buffer {remote_id} was already registered");
 995                        anyhow::bail!("buffer {remote_id} was already registered");
 996                    }
 997                }
 998                entry.insert(open_buffer);
 999                _expect_path_to_exist = true;
1000            }
1001        }
1002
1003        if let Some(path) = path {
1004            self.path_to_buffer_id.insert(path, remote_id);
1005        }
1006
1007        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
1008        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
1009        Ok(())
1010    }
1011
1012    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
1013        self.opened_buffers
1014            .values()
1015            .filter_map(|buffer| buffer.upgrade())
1016    }
1017
1018    pub(crate) fn is_searchable(&self, id: &BufferId) -> bool {
1019        !self.non_searchable_buffers.contains(&id)
1020    }
1021
1022    pub fn loading_buffers(
1023        &self,
1024    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
1025        self.loading_buffers.iter().map(|(path, task)| {
1026            let task = task.clone();
1027            (path, async move {
1028                task.await.map_err(|e| {
1029                    if e.error_code() != ErrorCode::Internal {
1030                        anyhow!(e.error_code())
1031                    } else {
1032                        anyhow!("{e}")
1033                    }
1034                })
1035            })
1036        })
1037    }
1038
1039    pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
1040        self.path_to_buffer_id.get(project_path)
1041    }
1042
1043    pub fn get_by_path(&self, path: &ProjectPath) -> Option<Entity<Buffer>> {
1044        self.path_to_buffer_id
1045            .get(path)
1046            .and_then(|buffer_id| self.get(*buffer_id))
1047    }
1048
1049    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1050        self.opened_buffers.get(&buffer_id)?.upgrade()
1051    }
1052
1053    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1054        self.get(buffer_id)
1055            .with_context(|| format!("unknown buffer id {buffer_id}"))
1056    }
1057
1058    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1059        self.get(buffer_id).or_else(|| {
1060            self.as_remote()
1061                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1062        })
1063    }
1064
1065    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1066        let buffers = self
1067            .buffers()
1068            .map(|buffer| {
1069                let buffer = buffer.read(cx);
1070                proto::BufferVersion {
1071                    id: buffer.remote_id().into(),
1072                    version: language::proto::serialize_version(&buffer.version),
1073                }
1074            })
1075            .collect();
1076        let incomplete_buffer_ids = self
1077            .as_remote()
1078            .map(|remote| remote.incomplete_buffer_ids())
1079            .unwrap_or_default();
1080        (buffers, incomplete_buffer_ids)
1081    }
1082
1083    pub fn disconnected_from_host(&mut self, cx: &mut App) {
1084        for open_buffer in self.opened_buffers.values_mut() {
1085            if let Some(buffer) = open_buffer.upgrade() {
1086                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1087            }
1088        }
1089
1090        for buffer in self.buffers() {
1091            buffer.update(cx, |buffer, cx| {
1092                buffer.set_capability(Capability::ReadOnly, cx)
1093            });
1094        }
1095
1096        if let Some(remote) = self.as_remote_mut() {
1097            // Wake up all futures currently waiting on a buffer to get opened,
1098            // to give them a chance to fail now that we've disconnected.
1099            remote.remote_buffer_listeners.clear()
1100        }
1101    }
1102
1103    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1104        self.downstream_client = Some((downstream_client, remote_id));
1105    }
1106
1107    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1108        self.downstream_client.take();
1109        self.forget_shared_buffers();
1110    }
1111
1112    pub fn discard_incomplete(&mut self) {
1113        self.opened_buffers
1114            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1115    }
1116
1117    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1118        let file = File::from_dyn(buffer.read(cx).file())?;
1119
1120        let remote_id = buffer.read(cx).remote_id();
1121        if let Some(entry_id) = file.entry_id {
1122            if let Some(local) = self.as_local_mut() {
1123                match local.local_buffer_ids_by_entry_id.get(&entry_id) {
1124                    Some(_) => {
1125                        return None;
1126                    }
1127                    None => {
1128                        local
1129                            .local_buffer_ids_by_entry_id
1130                            .insert(entry_id, remote_id);
1131                    }
1132                }
1133            }
1134            self.path_to_buffer_id.insert(
1135                ProjectPath {
1136                    worktree_id: file.worktree_id(cx),
1137                    path: file.path.clone(),
1138                },
1139                remote_id,
1140            );
1141        };
1142
1143        Some(())
1144    }
1145
1146    fn on_buffer_event(
1147        &mut self,
1148        buffer: Entity<Buffer>,
1149        event: &BufferEvent,
1150        cx: &mut Context<Self>,
1151    ) {
1152        match event {
1153            BufferEvent::FileHandleChanged => {
1154                self.buffer_changed_file(buffer, cx);
1155            }
1156            BufferEvent::Reloaded => {
1157                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1158                    return;
1159                };
1160                let buffer = buffer.read(cx);
1161                downstream_client
1162                    .send(proto::BufferReloaded {
1163                        project_id: *project_id,
1164                        buffer_id: buffer.remote_id().to_proto(),
1165                        version: serialize_version(&buffer.version()),
1166                        mtime: buffer.saved_mtime().map(|t| t.into()),
1167                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1168                    })
1169                    .log_err();
1170            }
1171            BufferEvent::LanguageChanged(_) => {}
1172            _ => {}
1173        }
1174    }
1175
1176    pub async fn handle_update_buffer(
1177        this: Entity<Self>,
1178        envelope: TypedEnvelope<proto::UpdateBuffer>,
1179        mut cx: AsyncApp,
1180    ) -> Result<proto::Ack> {
1181        let payload = envelope.payload;
1182        let buffer_id = BufferId::new(payload.buffer_id)?;
1183        let ops = payload
1184            .operations
1185            .into_iter()
1186            .map(language::proto::deserialize_operation)
1187            .collect::<Result<Vec<_>, _>>()?;
1188        this.update(&mut cx, |this, cx| {
1189            match this.opened_buffers.entry(buffer_id) {
1190                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1191                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1192                    OpenBuffer::Complete { buffer, .. } => {
1193                        if let Some(buffer) = buffer.upgrade() {
1194                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1195                        }
1196                    }
1197                },
1198                hash_map::Entry::Vacant(e) => {
1199                    e.insert(OpenBuffer::Operations(ops));
1200                }
1201            }
1202            Ok(proto::Ack {})
1203        })
1204    }
1205
1206    pub fn register_shared_lsp_handle(
1207        &mut self,
1208        peer_id: proto::PeerId,
1209        buffer_id: BufferId,
1210        handle: OpenLspBufferHandle,
1211    ) {
1212        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id)
1213            && let Some(buffer) = shared_buffers.get_mut(&buffer_id)
1214        {
1215            buffer.lsp_handle = Some(handle);
1216            return;
1217        }
1218        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1219    }
1220
1221    pub fn handle_synchronize_buffers(
1222        &mut self,
1223        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1224        cx: &mut Context<Self>,
1225        client: Arc<Client>,
1226    ) -> Result<proto::SynchronizeBuffersResponse> {
1227        let project_id = envelope.payload.project_id;
1228        let mut response = proto::SynchronizeBuffersResponse {
1229            buffers: Default::default(),
1230        };
1231        let Some(guest_id) = envelope.original_sender_id else {
1232            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1233        };
1234
1235        self.shared_buffers.entry(guest_id).or_default().clear();
1236        for buffer in envelope.payload.buffers {
1237            let buffer_id = BufferId::new(buffer.id)?;
1238            let remote_version = language::proto::deserialize_version(&buffer.version);
1239            if let Some(buffer) = self.get(buffer_id) {
1240                self.shared_buffers
1241                    .entry(guest_id)
1242                    .or_default()
1243                    .entry(buffer_id)
1244                    .or_insert_with(|| SharedBuffer {
1245                        buffer: buffer.clone(),
1246                        lsp_handle: None,
1247                    });
1248
1249                let buffer = buffer.read(cx);
1250                response.buffers.push(proto::BufferVersion {
1251                    id: buffer_id.into(),
1252                    version: language::proto::serialize_version(&buffer.version),
1253                });
1254
1255                let operations = buffer.serialize_ops(Some(remote_version), cx);
1256                let client = client.clone();
1257                if let Some(file) = buffer.file() {
1258                    client
1259                        .send(proto::UpdateBufferFile {
1260                            project_id,
1261                            buffer_id: buffer_id.into(),
1262                            file: Some(file.to_proto(cx)),
1263                        })
1264                        .log_err();
1265                }
1266
1267                // TODO(max): do something
1268                // client
1269                //     .send(proto::UpdateStagedText {
1270                //         project_id,
1271                //         buffer_id: buffer_id.into(),
1272                //         diff_base: buffer.diff_base().map(ToString::to_string),
1273                //     })
1274                //     .log_err();
1275
1276                client
1277                    .send(proto::BufferReloaded {
1278                        project_id,
1279                        buffer_id: buffer_id.into(),
1280                        version: language::proto::serialize_version(buffer.saved_version()),
1281                        mtime: buffer.saved_mtime().map(|time| time.into()),
1282                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1283                            as i32,
1284                    })
1285                    .log_err();
1286
1287                cx.background_spawn(
1288                    async move {
1289                        let operations = operations.await;
1290                        for chunk in split_operations(operations) {
1291                            client
1292                                .request(proto::UpdateBuffer {
1293                                    project_id,
1294                                    buffer_id: buffer_id.into(),
1295                                    operations: chunk,
1296                                })
1297                                .await?;
1298                        }
1299                        anyhow::Ok(())
1300                    }
1301                    .log_err(),
1302                )
1303                .detach();
1304            }
1305        }
1306        Ok(response)
1307    }
1308
1309    pub fn handle_create_buffer_for_peer(
1310        &mut self,
1311        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1312        replica_id: ReplicaId,
1313        capability: Capability,
1314        cx: &mut Context<Self>,
1315    ) -> Result<()> {
1316        let remote = self
1317            .as_remote_mut()
1318            .context("buffer store is not a remote")?;
1319
1320        if let Some(buffer) =
1321            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1322        {
1323            self.add_buffer(buffer, cx)?;
1324        }
1325
1326        Ok(())
1327    }
1328
1329    pub async fn handle_update_buffer_file(
1330        this: Entity<Self>,
1331        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1332        mut cx: AsyncApp,
1333    ) -> Result<()> {
1334        let buffer_id = envelope.payload.buffer_id;
1335        let buffer_id = BufferId::new(buffer_id)?;
1336
1337        this.update(&mut cx, |this, cx| {
1338            let payload = envelope.payload.clone();
1339            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1340                let file = payload.file.context("invalid file")?;
1341                let worktree = this
1342                    .worktree_store
1343                    .read(cx)
1344                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1345                    .context("no such worktree")?;
1346                let file = File::from_proto(file, worktree, cx)?;
1347                let old_file = buffer.update(cx, |buffer, cx| {
1348                    let old_file = buffer.file().cloned();
1349                    let new_path = file.path.clone();
1350
1351                    buffer.file_updated(Arc::new(file), cx);
1352                    if old_file.as_ref().is_none_or(|old| *old.path() != new_path) {
1353                        Some(old_file)
1354                    } else {
1355                        None
1356                    }
1357                });
1358                if let Some(old_file) = old_file {
1359                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1360                }
1361            }
1362            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1363                downstream_client
1364                    .send(proto::UpdateBufferFile {
1365                        project_id: *project_id,
1366                        buffer_id: buffer_id.into(),
1367                        file: envelope.payload.file,
1368                    })
1369                    .log_err();
1370            }
1371            Ok(())
1372        })
1373    }
1374
1375    pub async fn handle_save_buffer(
1376        this: Entity<Self>,
1377        envelope: TypedEnvelope<proto::SaveBuffer>,
1378        mut cx: AsyncApp,
1379    ) -> Result<proto::BufferSaved> {
1380        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1381        let (buffer, project_id) = this.read_with(&cx, |this, _| {
1382            anyhow::Ok((
1383                this.get_existing(buffer_id)?,
1384                this.downstream_client
1385                    .as_ref()
1386                    .map(|(_, project_id)| *project_id)
1387                    .context("project is not shared")?,
1388            ))
1389        })?;
1390        buffer
1391            .update(&mut cx, |buffer, _| {
1392                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1393            })
1394            .await?;
1395        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
1396
1397        if let Some(new_path) = envelope.payload.new_path
1398            && let Some(new_path) = ProjectPath::from_proto(new_path)
1399        {
1400            this.update(&mut cx, |this, cx| {
1401                this.save_buffer_as(buffer.clone(), new_path, cx)
1402            })
1403            .await?;
1404        } else {
1405            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))
1406                .await?;
1407        }
1408
1409        Ok(buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
1410            project_id,
1411            buffer_id: buffer_id.into(),
1412            version: serialize_version(buffer.saved_version()),
1413            mtime: buffer.saved_mtime().map(|time| time.into()),
1414        }))
1415    }
1416
1417    pub async fn handle_close_buffer(
1418        this: Entity<Self>,
1419        envelope: TypedEnvelope<proto::CloseBuffer>,
1420        mut cx: AsyncApp,
1421    ) -> Result<()> {
1422        let peer_id = envelope.sender_id;
1423        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1424        this.update(&mut cx, |this, cx| {
1425            if let Some(shared) = this.shared_buffers.get_mut(&peer_id)
1426                && shared.remove(&buffer_id).is_some()
1427            {
1428                cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
1429                if shared.is_empty() {
1430                    this.shared_buffers.remove(&peer_id);
1431                }
1432                return;
1433            }
1434            debug_panic!(
1435                "peer_id {} closed buffer_id {} which was either not open or already closed",
1436                peer_id,
1437                buffer_id
1438            )
1439        });
1440        Ok(())
1441    }
1442
1443    pub async fn handle_buffer_saved(
1444        this: Entity<Self>,
1445        envelope: TypedEnvelope<proto::BufferSaved>,
1446        mut cx: AsyncApp,
1447    ) -> Result<()> {
1448        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1449        let version = deserialize_version(&envelope.payload.version);
1450        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1451        this.update(&mut cx, move |this, cx| {
1452            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1453                buffer.update(cx, |buffer, cx| {
1454                    buffer.did_save(version, mtime, cx);
1455                });
1456            }
1457
1458            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1459                downstream_client
1460                    .send(proto::BufferSaved {
1461                        project_id: *project_id,
1462                        buffer_id: buffer_id.into(),
1463                        mtime: envelope.payload.mtime,
1464                        version: envelope.payload.version,
1465                    })
1466                    .log_err();
1467            }
1468        });
1469        Ok(())
1470    }
1471
1472    pub async fn handle_buffer_reloaded(
1473        this: Entity<Self>,
1474        envelope: TypedEnvelope<proto::BufferReloaded>,
1475        mut cx: AsyncApp,
1476    ) -> Result<()> {
1477        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1478        let version = deserialize_version(&envelope.payload.version);
1479        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1480        let line_ending = deserialize_line_ending(
1481            proto::LineEnding::from_i32(envelope.payload.line_ending)
1482                .context("missing line ending")?,
1483        );
1484        this.update(&mut cx, |this, cx| {
1485            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1486                buffer.update(cx, |buffer, cx| {
1487                    buffer.did_reload(version, line_ending, mtime, cx);
1488                });
1489            }
1490
1491            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1492                downstream_client
1493                    .send(proto::BufferReloaded {
1494                        project_id: *project_id,
1495                        buffer_id: buffer_id.into(),
1496                        mtime: envelope.payload.mtime,
1497                        version: envelope.payload.version,
1498                        line_ending: envelope.payload.line_ending,
1499                    })
1500                    .log_err();
1501            }
1502        });
1503        Ok(())
1504    }
1505
1506    pub fn reload_buffers(
1507        &self,
1508        buffers: HashSet<Entity<Buffer>>,
1509        push_to_history: bool,
1510        cx: &mut Context<Self>,
1511    ) -> Task<Result<ProjectTransaction>> {
1512        if buffers.is_empty() {
1513            return Task::ready(Ok(ProjectTransaction::default()));
1514        }
1515        match &self.state {
1516            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1517            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1518        }
1519    }
1520
1521    async fn handle_reload_buffers(
1522        this: Entity<Self>,
1523        envelope: TypedEnvelope<proto::ReloadBuffers>,
1524        mut cx: AsyncApp,
1525    ) -> Result<proto::ReloadBuffersResponse> {
1526        let sender_id = envelope.original_sender_id().unwrap_or_default();
1527        let reload = this.update(&mut cx, |this, cx| {
1528            let mut buffers = HashSet::default();
1529            for buffer_id in &envelope.payload.buffer_ids {
1530                let buffer_id = BufferId::new(*buffer_id)?;
1531                buffers.insert(this.get_existing(buffer_id)?);
1532            }
1533            anyhow::Ok(this.reload_buffers(buffers, false, cx))
1534        })?;
1535
1536        let project_transaction = reload.await?;
1537        let project_transaction = this.update(&mut cx, |this, cx| {
1538            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1539        });
1540        Ok(proto::ReloadBuffersResponse {
1541            transaction: Some(project_transaction),
1542        })
1543    }
1544
1545    pub fn create_buffer_for_peer(
1546        &mut self,
1547        buffer: &Entity<Buffer>,
1548        peer_id: proto::PeerId,
1549        cx: &mut Context<Self>,
1550    ) -> Task<Result<()>> {
1551        let buffer_id = buffer.read(cx).remote_id();
1552        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
1553        if shared_buffers.contains_key(&buffer_id) {
1554            return Task::ready(Ok(()));
1555        }
1556        shared_buffers.insert(
1557            buffer_id,
1558            SharedBuffer {
1559                buffer: buffer.clone(),
1560                lsp_handle: None,
1561            },
1562        );
1563
1564        let Some((client, project_id)) = self.downstream_client.clone() else {
1565            return Task::ready(Ok(()));
1566        };
1567
1568        cx.spawn(async move |this, cx| {
1569            let Some(buffer) = this.read_with(cx, |this, _| this.get(buffer_id))? else {
1570                return anyhow::Ok(());
1571            };
1572
1573            let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx));
1574            let operations = operations.await;
1575            let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx));
1576
1577            let initial_state = proto::CreateBufferForPeer {
1578                project_id,
1579                peer_id: Some(peer_id),
1580                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1581            };
1582
1583            if client.send(initial_state).log_err().is_some() {
1584                let client = client.clone();
1585                cx.background_spawn(async move {
1586                    let mut chunks = split_operations(operations).peekable();
1587                    while let Some(chunk) = chunks.next() {
1588                        let is_last = chunks.peek().is_none();
1589                        client.send(proto::CreateBufferForPeer {
1590                            project_id,
1591                            peer_id: Some(peer_id),
1592                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1593                                proto::BufferChunk {
1594                                    buffer_id: buffer_id.into(),
1595                                    operations: chunk,
1596                                    is_last,
1597                                },
1598                            )),
1599                        })?;
1600                    }
1601                    anyhow::Ok(())
1602                })
1603                .await
1604                .log_err();
1605            }
1606            Ok(())
1607        })
1608    }
1609
1610    pub fn forget_shared_buffers(&mut self) {
1611        self.shared_buffers.clear();
1612    }
1613
1614    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1615        self.shared_buffers.remove(peer_id);
1616    }
1617
1618    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1619        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1620            self.shared_buffers.insert(new_peer_id, buffers);
1621        }
1622    }
1623
1624    pub fn has_shared_buffers(&self) -> bool {
1625        !self.shared_buffers.is_empty()
1626    }
1627
1628    pub fn create_local_buffer(
1629        &mut self,
1630        text: &str,
1631        language: Option<Arc<Language>>,
1632        project_searchable: bool,
1633        cx: &mut Context<Self>,
1634    ) -> Entity<Buffer> {
1635        let buffer = cx.new(|cx| {
1636            let mut buffer = Buffer::local(text, cx)
1637                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx);
1638            apply_initial_line_ending(&mut buffer, cx);
1639            buffer
1640        });
1641
1642        self.add_buffer(buffer.clone(), cx).log_err();
1643        let buffer_id = buffer.read(cx).remote_id();
1644        if !project_searchable {
1645            self.non_searchable_buffers.insert(buffer_id);
1646        }
1647
1648        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1649            self.path_to_buffer_id.insert(
1650                ProjectPath {
1651                    worktree_id: file.worktree_id(cx),
1652                    path: file.path.clone(),
1653                },
1654                buffer_id,
1655            );
1656            let this = self
1657                .as_local_mut()
1658                .expect("local-only method called in a non-local context");
1659            if let Some(entry_id) = file.entry_id {
1660                this.local_buffer_ids_by_entry_id
1661                    .insert(entry_id, buffer_id);
1662            }
1663        }
1664        buffer
1665    }
1666
1667    pub fn deserialize_project_transaction(
1668        &mut self,
1669        message: proto::ProjectTransaction,
1670        push_to_history: bool,
1671        cx: &mut Context<Self>,
1672    ) -> Task<Result<ProjectTransaction>> {
1673        if let Some(this) = self.as_remote_mut() {
1674            this.deserialize_project_transaction(message, push_to_history, cx)
1675        } else {
1676            debug_panic!("not a remote buffer store");
1677            Task::ready(Err(anyhow!("not a remote buffer store")))
1678        }
1679    }
1680
1681    pub fn wait_for_remote_buffer(
1682        &mut self,
1683        id: BufferId,
1684        cx: &mut Context<BufferStore>,
1685    ) -> Task<Result<Entity<Buffer>>> {
1686        if let Some(this) = self.as_remote_mut() {
1687            this.wait_for_remote_buffer(id, cx)
1688        } else {
1689            debug_panic!("not a remote buffer store");
1690            Task::ready(Err(anyhow!("not a remote buffer store")))
1691        }
1692    }
1693
1694    pub fn serialize_project_transaction_for_peer(
1695        &mut self,
1696        project_transaction: ProjectTransaction,
1697        peer_id: proto::PeerId,
1698        cx: &mut Context<Self>,
1699    ) -> proto::ProjectTransaction {
1700        let mut serialized_transaction = proto::ProjectTransaction {
1701            buffer_ids: Default::default(),
1702            transactions: Default::default(),
1703        };
1704        for (buffer, transaction) in project_transaction.0 {
1705            self.create_buffer_for_peer(&buffer, peer_id, cx)
1706                .detach_and_log_err(cx);
1707            serialized_transaction
1708                .buffer_ids
1709                .push(buffer.read(cx).remote_id().into());
1710            serialized_transaction
1711                .transactions
1712                .push(language::proto::serialize_transaction(&transaction));
1713        }
1714        serialized_transaction
1715    }
1716
1717    pub(crate) fn register_project_search_result_handle(
1718        &mut self,
1719    ) -> (u64, smol::channel::Receiver<BufferId>) {
1720        let (tx, rx) = smol::channel::unbounded();
1721        let handle = util::post_inc(&mut self.project_search.next_id);
1722        let _old_entry = self.project_search.chunks.insert(handle, tx);
1723        debug_assert!(_old_entry.is_none());
1724        (handle, rx)
1725    }
1726
1727    pub fn register_ongoing_project_search(
1728        &mut self,
1729        id: (PeerId, u64),
1730        search: Task<anyhow::Result<()>>,
1731    ) {
1732        let _old = self.project_search.searches_in_progress.insert(id, search);
1733        debug_assert!(_old.is_none());
1734    }
1735
1736    pub async fn handle_find_search_candidates_cancel(
1737        this: Entity<Self>,
1738        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
1739        mut cx: AsyncApp,
1740    ) -> Result<()> {
1741        let id = (
1742            envelope.original_sender_id.unwrap_or(envelope.sender_id),
1743            envelope.payload.handle,
1744        );
1745        let _ = this.update(&mut cx, |this, _| {
1746            this.project_search.searches_in_progress.remove(&id)
1747        });
1748        Ok(())
1749    }
1750
1751    pub(crate) async fn handle_find_search_candidates_chunk(
1752        this: Entity<Self>,
1753        envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
1754        mut cx: AsyncApp,
1755    ) -> Result<proto::Ack> {
1756        use proto::find_search_candidates_chunk::Variant;
1757        let handle = envelope.payload.handle;
1758
1759        let buffer_ids = match envelope
1760            .payload
1761            .variant
1762            .context("Expected non-null variant")?
1763        {
1764            Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches
1765                .buffer_ids
1766                .into_iter()
1767                .filter_map(|buffer_id| BufferId::new(buffer_id).ok())
1768                .collect::<Vec<_>>(),
1769            Variant::Done(_) => {
1770                this.update(&mut cx, |this, _| {
1771                    this.project_search.chunks.remove(&handle)
1772                });
1773                return Ok(proto::Ack {});
1774            }
1775        };
1776        let Some(sender) = this.read_with(&mut cx, |this, _| {
1777            this.project_search.chunks.get(&handle).cloned()
1778        }) else {
1779            return Ok(proto::Ack {});
1780        };
1781
1782        for buffer_id in buffer_ids {
1783            let Ok(_) = sender.send(buffer_id).await else {
1784                this.update(&mut cx, |this, _| {
1785                    this.project_search.chunks.remove(&handle)
1786                });
1787                return Ok(proto::Ack {});
1788            };
1789        }
1790        Ok(proto::Ack {})
1791    }
1792}
1793
1794impl OpenBuffer {
1795    fn upgrade(&self) -> Option<Entity<Buffer>> {
1796        match self {
1797            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
1798            OpenBuffer::Operations(_) => None,
1799        }
1800    }
1801}
1802
1803fn is_not_found_error(error: &anyhow::Error) -> bool {
1804    error
1805        .root_cause()
1806        .downcast_ref::<io::Error>()
1807        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1808}
1809
1810fn apply_initial_line_ending(buffer: &mut Buffer, cx: &mut Context<Buffer>) {
1811    // Only applies for empty rope or a single line with no trailing newline.
1812    if buffer.max_point().row > 0 {
1813        return;
1814    }
1815    let location = buffer.file().map(|file| settings::SettingsLocation {
1816        worktree_id: file.worktree_id(cx),
1817        path: file.path().as_ref(),
1818    });
1819    let language = buffer.language().map(|l| l.name());
1820    let settings = AllLanguageSettings::get(location, cx).language(location, language.as_ref(), cx);
1821    let desired = match settings.line_ending {
1822        LineEndingSetting::Detect => return,
1823        LineEndingSetting::PreferLf | LineEndingSetting::EnforceLf => LineEnding::Unix,
1824        LineEndingSetting::PreferCrlf | LineEndingSetting::EnforceCrlf => LineEnding::Windows,
1825    };
1826    if buffer.line_ending() != desired {
1827        buffer.set_line_ending(desired, cx);
1828    }
1829}