buffer_store.rs

   1use crate::{
   2    ProjectPath,
   3    lsp_store::OpenLspBufferHandle,
   4    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   5};
   6use anyhow::{Context as _, Result, anyhow};
   7use client::Client;
   8use collections::{HashMap, HashSet, hash_map};
   9use futures::{Future, FutureExt as _, channel::oneshot, future::Shared};
  10use gpui::{
  11    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
  12};
  13use language::{
  14    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
  15    proto::{
  16        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  17        split_operations,
  18    },
  19};
  20use rpc::{
  21    AnyProtoClient, ErrorCode, ErrorExt as _, TypedEnvelope,
  22    proto::{self},
  23};
  24
  25use std::{io, sync::Arc, time::Instant};
  26use text::{BufferId, ReplicaId};
  27use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath};
  28use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId};
  29
/// A set of open buffers.
///
/// Tracks buffers by [`BufferId`] and by [`ProjectPath`], de-duplicates
/// in-flight loads, and delegates local-vs-remote specifics to
/// [`BufferStoreState`].
pub struct BufferStore {
    /// Local- or remote-specific state and behavior.
    state: BufferStoreState,
    /// Loads currently in flight, keyed by path. `open_buffer` hands out a clone
    /// of the existing shared task when the same path is requested again, and
    /// removes the entry once the load has finished.
    #[allow(clippy::type_complexity)]
    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
    /// Used to resolve a `WorktreeId` to its worktree when opening or saving.
    worktree_store: Entity<WorktreeStore>,
    /// Every buffer this store knows about, keyed by id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Index from a buffer's project path to its id; kept up to date when a
    /// buffer is opened locally or its file is renamed.
    path_to_buffer_id: HashMap<ProjectPath, BufferId>,
    /// Client to notify about saves and file changes, paired with the project id
    /// used in those messages. `None` when there is no downstream client.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// Buffers shared per peer, keyed by peer id and then buffer id.
    /// NOTE(review): the code that populates this map is outside the visible
    /// portion of the file.
    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
    /// Ids of buffers that were created with `project_searchable == false`.
    non_searchable_buffers: HashSet<BufferId>,
}
  42
/// A buffer that has been shared with a peer (see `BufferStore::shared_buffers`).
#[derive(Hash, Eq, PartialEq, Clone)]
struct SharedBuffer {
    /// The shared buffer itself.
    buffer: Entity<Buffer>,
    /// Optional LSP handle stored alongside the buffer.
    /// NOTE(review): presumably keeps the buffer open in the LSP store while it
    /// is shared — confirm against the code that sets this field.
    lsp_handle: Option<OpenLspBufferHandle>,
}
  48
/// Selects between the local and the remote implementation of the store.
enum BufferStoreState {
    /// Buffers are loaded from and saved to worktrees directly.
    Local(LocalBufferStore),
    /// Buffers are opened, saved and reloaded through requests to an upstream client.
    Remote(RemoteBufferStore),
}
  53
/// State for a buffer store whose buffers live behind an upstream client.
struct RemoteBufferStore {
    /// Strong handles to buffers received from the upstream; populated only when
    /// the upstream client is connected via collab, to avoid races.
    shared_with_me: HashSet<Entity<Buffer>>,
    /// Client used for `SaveBuffer`, `OpenBufferByPath`, `OpenNewBuffer` and
    /// `ReloadBuffers` requests.
    upstream_client: AnyProtoClient,
    /// Project id sent with every upstream request.
    project_id: u64,
    /// Buffers whose initial state has arrived but whose final chunk has not.
    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
    /// Senders to resolve once the buffer with the given id finishes loading
    /// (or fails to).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<anyhow::Result<Entity<Buffer>>>>>,
    /// Used to resolve the worktree of an incoming buffer's file.
    worktree_store: Entity<WorktreeStore>,
}
  63
/// State for a buffer store whose buffers are backed by local worktrees.
struct LocalBufferStore {
    /// Index from a worktree entry id to the buffer opened for that entry.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    /// Used to resolve the target worktree in `save_buffer_as`.
    worktree_store: Entity<WorktreeStore>,
    /// Keeps the worktree-store event subscription alive for as long as this
    /// state exists.
    _subscription: Subscription,
}
  69
/// Entry stored in `BufferStore::opened_buffers` for a buffer id.
enum OpenBuffer {
    /// The buffer exists; only a weak handle is held here.
    Complete { buffer: WeakEntity<Buffer> },
    /// Operations stored under a buffer id without a buffer entity.
    /// NOTE(review): presumably operations that arrived before the buffer itself
    /// and are applied once it is added — confirm against the code that
    /// constructs this variant (outside the visible portion of the file).
    Operations(Vec<Operation>),
}
  74
/// Events emitted by [`BufferStore`].
pub enum BufferStoreEvent {
    /// NOTE(review): presumably emitted when a buffer is registered with the
    /// store — the emit site is outside the visible portion of the file.
    BufferAdded(Entity<Buffer>),
    /// Emitted by `open_buffer` when the requested path already has an open buffer.
    BufferOpened {
        buffer: Entity<Buffer>,
        project_path: ProjectPath,
    },
    /// NOTE(review): presumably emitted when a buffer shared with the given peer
    /// is closed — the emit site is outside the visible portion of the file.
    SharedBufferClosed(proto::PeerId, BufferId),
    /// NOTE(review): presumably emitted when the buffer with this id is released
    /// — the emit site is outside the visible portion of the file.
    BufferDropped(BufferId),
    /// Emitted when a worktree change moved a buffer's file to a different path.
    /// `old_file` is the file the buffer had before the change.
    BufferChangedFilePath {
        buffer: Entity<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
  88
/// A set of per-buffer transactions that together make up one project-wide edit,
/// keyed by the buffer each transaction applies to.
#[derive(Default, Debug, Clone)]
pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
  91
  92impl PartialEq for ProjectTransaction {
  93    fn eq(&self, other: &Self) -> bool {
  94        self.0.len() == other.0.len()
  95            && self.0.iter().all(|(buffer, transaction)| {
  96                other.0.get(buffer).is_some_and(|t| t.id == transaction.id)
  97            })
  98    }
  99}
 100
// Lets `BufferStore` emit `BufferStoreEvent`s via `cx.emit`.
impl EventEmitter<BufferStoreEvent> for BufferStore {}
 102
 103impl RemoteBufferStore {
 104    pub fn wait_for_remote_buffer(
 105        &mut self,
 106        id: BufferId,
 107        cx: &mut Context<BufferStore>,
 108    ) -> Task<Result<Entity<Buffer>>> {
 109        let (tx, rx) = oneshot::channel();
 110        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 111
 112        cx.spawn(async move |this, cx| {
 113            if let Some(buffer) = this
 114                .read_with(cx, |buffer_store, _| buffer_store.get(id))
 115                .ok()
 116                .flatten()
 117            {
 118                return Ok(buffer);
 119            }
 120
 121            cx.background_spawn(async move { rx.await? }).await
 122        })
 123    }
 124
 125    fn save_remote_buffer(
 126        &self,
 127        buffer_handle: Entity<Buffer>,
 128        new_path: Option<proto::ProjectPath>,
 129        cx: &Context<BufferStore>,
 130    ) -> Task<Result<()>> {
 131        let buffer = buffer_handle.read(cx);
 132        let buffer_id = buffer.remote_id().into();
 133        let version = buffer.version();
 134        let rpc = self.upstream_client.clone();
 135        let project_id = self.project_id;
 136        cx.spawn(async move |_, cx| {
 137            let response = rpc
 138                .request(proto::SaveBuffer {
 139                    project_id,
 140                    buffer_id,
 141                    new_path,
 142                    version: serialize_version(&version),
 143                })
 144                .await?;
 145            let version = deserialize_version(&response.version);
 146            let mtime = response.mtime.map(|mtime| mtime.into());
 147
 148            buffer_handle.update(cx, |buffer, cx| {
 149                buffer.did_save(version.clone(), mtime, cx);
 150            })?;
 151
 152            Ok(())
 153        })
 154    }
 155
 156    pub fn handle_create_buffer_for_peer(
 157        &mut self,
 158        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 159        replica_id: ReplicaId,
 160        capability: Capability,
 161        cx: &mut Context<BufferStore>,
 162    ) -> Result<Option<Entity<Buffer>>> {
 163        match envelope.payload.variant.context("missing variant")? {
 164            proto::create_buffer_for_peer::Variant::State(mut state) => {
 165                let buffer_id = BufferId::new(state.id)?;
 166
 167                let buffer_result = maybe!({
 168                    let mut buffer_file = None;
 169                    if let Some(file) = state.file.take() {
 170                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 171                        let worktree = self
 172                            .worktree_store
 173                            .read(cx)
 174                            .worktree_for_id(worktree_id, cx)
 175                            .with_context(|| {
 176                                format!("no worktree found for id {}", file.worktree_id)
 177                            })?;
 178                        buffer_file = Some(Arc::new(File::from_proto(file, worktree, cx)?)
 179                            as Arc<dyn language::File>);
 180                    }
 181                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 182                });
 183
 184                match buffer_result {
 185                    Ok(buffer) => {
 186                        let buffer = cx.new(|_| buffer);
 187                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 188                    }
 189                    Err(error) => {
 190                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 191                            for listener in listeners {
 192                                listener.send(Err(anyhow!(error.cloned()))).ok();
 193                            }
 194                        }
 195                    }
 196                }
 197            }
 198            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 199                let buffer_id = BufferId::new(chunk.buffer_id)?;
 200                let buffer = self
 201                    .loading_remote_buffers_by_id
 202                    .get(&buffer_id)
 203                    .cloned()
 204                    .with_context(|| {
 205                        format!(
 206                            "received chunk for buffer {} without initial state",
 207                            chunk.buffer_id
 208                        )
 209                    })?;
 210
 211                let result = maybe!({
 212                    let operations = chunk
 213                        .operations
 214                        .into_iter()
 215                        .map(language::proto::deserialize_operation)
 216                        .collect::<Result<Vec<_>>>()?;
 217                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 218                    anyhow::Ok(())
 219                });
 220
 221                if let Err(error) = result {
 222                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 223                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 224                        for listener in listeners {
 225                            listener.send(Err(error.cloned())).ok();
 226                        }
 227                    }
 228                } else if chunk.is_last {
 229                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 230                    if self.upstream_client.is_via_collab() {
 231                        // retain buffers sent by peers to avoid races.
 232                        self.shared_with_me.insert(buffer.clone());
 233                    }
 234
 235                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 236                        for sender in senders {
 237                            sender.send(Ok(buffer.clone())).ok();
 238                        }
 239                    }
 240                    return Ok(Some(buffer));
 241                }
 242            }
 243        }
 244        Ok(None)
 245    }
 246
 247    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 248        self.loading_remote_buffers_by_id
 249            .keys()
 250            .copied()
 251            .collect::<Vec<_>>()
 252    }
 253
 254    pub fn deserialize_project_transaction(
 255        &self,
 256        message: proto::ProjectTransaction,
 257        push_to_history: bool,
 258        cx: &mut Context<BufferStore>,
 259    ) -> Task<Result<ProjectTransaction>> {
 260        cx.spawn(async move |this, cx| {
 261            let mut project_transaction = ProjectTransaction::default();
 262            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 263            {
 264                let buffer_id = BufferId::new(buffer_id)?;
 265                let buffer = this
 266                    .update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
 267                    .await?;
 268                let transaction = language::proto::deserialize_transaction(transaction)?;
 269                project_transaction.0.insert(buffer, transaction);
 270            }
 271
 272            for (buffer, transaction) in &project_transaction.0 {
 273                buffer
 274                    .update(cx, |buffer, _| {
 275                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 276                    })?
 277                    .await?;
 278
 279                if push_to_history {
 280                    buffer.update(cx, |buffer, _| {
 281                        buffer.push_transaction(transaction.clone(), Instant::now());
 282                        buffer.finalize_last_transaction();
 283                    })?;
 284                }
 285            }
 286
 287            Ok(project_transaction)
 288        })
 289    }
 290
 291    fn open_buffer(
 292        &self,
 293        path: Arc<RelPath>,
 294        worktree: Entity<Worktree>,
 295        cx: &mut Context<BufferStore>,
 296    ) -> Task<Result<Entity<Buffer>>> {
 297        let worktree_id = worktree.read(cx).id().to_proto();
 298        let project_id = self.project_id;
 299        let client = self.upstream_client.clone();
 300        cx.spawn(async move |this, cx| {
 301            let response = client
 302                .request(proto::OpenBufferByPath {
 303                    project_id,
 304                    worktree_id,
 305                    path: path.to_proto(),
 306                })
 307                .await?;
 308            let buffer_id = BufferId::new(response.buffer_id)?;
 309
 310            let buffer = this
 311                .update(cx, {
 312                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 313                })?
 314                .await?;
 315
 316            Ok(buffer)
 317        })
 318    }
 319
 320    fn create_buffer(
 321        &self,
 322        project_searchable: bool,
 323        cx: &mut Context<BufferStore>,
 324    ) -> Task<Result<Entity<Buffer>>> {
 325        let create = self.upstream_client.request(proto::OpenNewBuffer {
 326            project_id: self.project_id,
 327        });
 328        cx.spawn(async move |this, cx| {
 329            let response = create.await?;
 330            let buffer_id = BufferId::new(response.buffer_id)?;
 331
 332            this.update(cx, |this, cx| {
 333                if !project_searchable {
 334                    this.non_searchable_buffers.insert(buffer_id);
 335                }
 336                this.wait_for_remote_buffer(buffer_id, cx)
 337            })?
 338            .await
 339        })
 340    }
 341
 342    fn reload_buffers(
 343        &self,
 344        buffers: HashSet<Entity<Buffer>>,
 345        push_to_history: bool,
 346        cx: &mut Context<BufferStore>,
 347    ) -> Task<Result<ProjectTransaction>> {
 348        let request = self.upstream_client.request(proto::ReloadBuffers {
 349            project_id: self.project_id,
 350            buffer_ids: buffers
 351                .iter()
 352                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 353                .collect(),
 354        });
 355
 356        cx.spawn(async move |this, cx| {
 357            let response = request.await?.transaction.context("missing transaction")?;
 358            this.update(cx, |this, cx| {
 359                this.deserialize_project_transaction(response, push_to_history, cx)
 360            })?
 361            .await
 362        })
 363    }
 364}
 365
 366impl LocalBufferStore {
 367    fn save_local_buffer(
 368        &self,
 369        buffer_handle: Entity<Buffer>,
 370        worktree: Entity<Worktree>,
 371        path: Arc<RelPath>,
 372        mut has_changed_file: bool,
 373        cx: &mut Context<BufferStore>,
 374    ) -> Task<Result<()>> {
 375        let buffer = buffer_handle.read(cx);
 376
 377        let text = buffer.as_rope().clone();
 378        let line_ending = buffer.line_ending();
 379        let encoding = buffer.encoding();
 380        let has_bom = buffer.has_bom();
 381        let version = buffer.version();
 382        let buffer_id = buffer.remote_id();
 383        let file = buffer.file().cloned();
 384        if file
 385            .as_ref()
 386            .is_some_and(|file| file.disk_state() == DiskState::New)
 387        {
 388            has_changed_file = true;
 389        }
 390
 391        let save = worktree.update(cx, |worktree, cx| {
 392            worktree.write_file(path, text, line_ending, encoding, has_bom, cx)
 393        });
 394
 395        cx.spawn(async move |this, cx| {
 396            let new_file = save.await?;
 397            let mtime = new_file.disk_state().mtime();
 398            this.update(cx, |this, cx| {
 399                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 400                    if has_changed_file {
 401                        downstream_client
 402                            .send(proto::UpdateBufferFile {
 403                                project_id,
 404                                buffer_id: buffer_id.to_proto(),
 405                                file: Some(language::File::to_proto(&*new_file, cx)),
 406                            })
 407                            .log_err();
 408                    }
 409                    downstream_client
 410                        .send(proto::BufferSaved {
 411                            project_id,
 412                            buffer_id: buffer_id.to_proto(),
 413                            version: serialize_version(&version),
 414                            mtime: mtime.map(|time| time.into()),
 415                        })
 416                        .log_err();
 417                }
 418            })?;
 419            buffer_handle.update(cx, |buffer, cx| {
 420                if has_changed_file {
 421                    buffer.file_updated(new_file, cx);
 422                }
 423                buffer.did_save(version.clone(), mtime, cx);
 424            })
 425        })
 426    }
 427
 428    fn subscribe_to_worktree(
 429        &mut self,
 430        worktree: &Entity<Worktree>,
 431        cx: &mut Context<BufferStore>,
 432    ) {
 433        cx.subscribe(worktree, |this, worktree, event, cx| {
 434            if worktree.read(cx).is_local()
 435                && let worktree::Event::UpdatedEntries(changes) = event
 436            {
 437                Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 438            }
 439        })
 440        .detach();
 441    }
 442
 443    fn local_worktree_entries_changed(
 444        this: &mut BufferStore,
 445        worktree_handle: &Entity<Worktree>,
 446        changes: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
 447        cx: &mut Context<BufferStore>,
 448    ) {
 449        let snapshot = worktree_handle.read(cx).snapshot();
 450        for (path, entry_id, _) in changes {
 451            Self::local_worktree_entry_changed(
 452                this,
 453                *entry_id,
 454                path,
 455                worktree_handle,
 456                &snapshot,
 457                cx,
 458            );
 459        }
 460    }
 461
 462    fn local_worktree_entry_changed(
 463        this: &mut BufferStore,
 464        entry_id: ProjectEntryId,
 465        path: &Arc<RelPath>,
 466        worktree: &Entity<worktree::Worktree>,
 467        snapshot: &worktree::Snapshot,
 468        cx: &mut Context<BufferStore>,
 469    ) -> Option<()> {
 470        let project_path = ProjectPath {
 471            worktree_id: snapshot.id(),
 472            path: path.clone(),
 473        };
 474
 475        let buffer_id = this
 476            .as_local_mut()
 477            .and_then(|local| local.local_buffer_ids_by_entry_id.get(&entry_id))
 478            .copied()
 479            .or_else(|| this.path_to_buffer_id.get(&project_path).copied())?;
 480
 481        let buffer = if let Some(buffer) = this.get(buffer_id) {
 482            Some(buffer)
 483        } else {
 484            this.opened_buffers.remove(&buffer_id);
 485            this.non_searchable_buffers.remove(&buffer_id);
 486            None
 487        };
 488
 489        let buffer = if let Some(buffer) = buffer {
 490            buffer
 491        } else {
 492            this.path_to_buffer_id.remove(&project_path);
 493            let this = this.as_local_mut()?;
 494            this.local_buffer_ids_by_entry_id.remove(&entry_id);
 495            return None;
 496        };
 497
 498        let events = buffer.update(cx, |buffer, cx| {
 499            let file = buffer.file()?;
 500            let old_file = File::from_dyn(Some(file))?;
 501            if old_file.worktree != *worktree {
 502                return None;
 503            }
 504
 505            let snapshot_entry = old_file
 506                .entry_id
 507                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 508                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
 509
 510            let new_file = if let Some(entry) = snapshot_entry {
 511                File {
 512                    disk_state: match entry.mtime {
 513                        Some(mtime) => DiskState::Present { mtime },
 514                        None => old_file.disk_state,
 515                    },
 516                    is_local: true,
 517                    entry_id: Some(entry.id),
 518                    path: entry.path.clone(),
 519                    worktree: worktree.clone(),
 520                    is_private: entry.is_private,
 521                }
 522            } else {
 523                File {
 524                    disk_state: DiskState::Deleted,
 525                    is_local: true,
 526                    entry_id: old_file.entry_id,
 527                    path: old_file.path.clone(),
 528                    worktree: worktree.clone(),
 529                    is_private: old_file.is_private,
 530                }
 531            };
 532
 533            if new_file == *old_file {
 534                return None;
 535            }
 536
 537            let mut events = Vec::new();
 538            if new_file.path != old_file.path {
 539                this.path_to_buffer_id.remove(&ProjectPath {
 540                    path: old_file.path.clone(),
 541                    worktree_id: old_file.worktree_id(cx),
 542                });
 543                this.path_to_buffer_id.insert(
 544                    ProjectPath {
 545                        worktree_id: new_file.worktree_id(cx),
 546                        path: new_file.path.clone(),
 547                    },
 548                    buffer_id,
 549                );
 550                events.push(BufferStoreEvent::BufferChangedFilePath {
 551                    buffer: cx.entity(),
 552                    old_file: buffer.file().cloned(),
 553                });
 554            }
 555            let local = this.as_local_mut()?;
 556            if new_file.entry_id != old_file.entry_id {
 557                if let Some(entry_id) = old_file.entry_id {
 558                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
 559                }
 560                if let Some(entry_id) = new_file.entry_id {
 561                    local
 562                        .local_buffer_ids_by_entry_id
 563                        .insert(entry_id, buffer_id);
 564                }
 565            }
 566
 567            if let Some((client, project_id)) = &this.downstream_client {
 568                client
 569                    .send(proto::UpdateBufferFile {
 570                        project_id: *project_id,
 571                        buffer_id: buffer_id.to_proto(),
 572                        file: Some(new_file.to_proto(cx)),
 573                    })
 574                    .ok();
 575            }
 576
 577            buffer.file_updated(Arc::new(new_file), cx);
 578            Some(events)
 579        })?;
 580
 581        for event in events {
 582            cx.emit(event);
 583        }
 584
 585        None
 586    }
 587
 588    fn save_buffer(
 589        &self,
 590        buffer: Entity<Buffer>,
 591        cx: &mut Context<BufferStore>,
 592    ) -> Task<Result<()>> {
 593        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 594            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 595        };
 596        let worktree = file.worktree.clone();
 597        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 598    }
 599
 600    fn save_buffer_as(
 601        &self,
 602        buffer: Entity<Buffer>,
 603        path: ProjectPath,
 604        cx: &mut Context<BufferStore>,
 605    ) -> Task<Result<()>> {
 606        let Some(worktree) = self
 607            .worktree_store
 608            .read(cx)
 609            .worktree_for_id(path.worktree_id, cx)
 610        else {
 611            return Task::ready(Err(anyhow!("no such worktree")));
 612        };
 613        self.save_local_buffer(buffer, worktree, path.path, true, cx)
 614    }
 615
 616    fn open_buffer(
 617        &self,
 618        path: Arc<RelPath>,
 619        worktree: Entity<Worktree>,
 620        cx: &mut Context<BufferStore>,
 621    ) -> Task<Result<Entity<Buffer>>> {
 622        let load_file = worktree.update(cx, |worktree, cx| worktree.load_file(path.as_ref(), cx));
 623        cx.spawn(async move |this, cx| {
 624            let path = path.clone();
 625            let buffer = match load_file.await {
 626                Ok(loaded) => {
 627                    let reservation = cx.reserve_entity::<Buffer>()?;
 628                    let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 629                    let text_buffer = cx
 630                        .background_spawn(async move {
 631                            text::Buffer::new(ReplicaId::LOCAL, buffer_id, loaded.text)
 632                        })
 633                        .await;
 634                    cx.insert_entity(reservation, |_| {
 635                        let mut buffer =
 636                            Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite);
 637                        buffer.set_encoding(loaded.encoding);
 638                        buffer.set_has_bom(loaded.has_bom);
 639                        buffer
 640                    })?
 641                }
 642                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
 643                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 644                    let text_buffer = text::Buffer::new(ReplicaId::LOCAL, buffer_id, "");
 645                    Buffer::build(
 646                        text_buffer,
 647                        Some(Arc::new(File {
 648                            worktree,
 649                            path,
 650                            disk_state: DiskState::New,
 651                            entry_id: None,
 652                            is_local: true,
 653                            is_private: false,
 654                        })),
 655                        Capability::ReadWrite,
 656                    )
 657                })?,
 658                Err(e) => return Err(e),
 659            };
 660            this.update(cx, |this, cx| {
 661                this.add_buffer(buffer.clone(), cx)?;
 662                let buffer_id = buffer.read(cx).remote_id();
 663                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 664                    this.path_to_buffer_id.insert(
 665                        ProjectPath {
 666                            worktree_id: file.worktree_id(cx),
 667                            path: file.path.clone(),
 668                        },
 669                        buffer_id,
 670                    );
 671                    let this = this.as_local_mut().unwrap();
 672                    if let Some(entry_id) = file.entry_id {
 673                        this.local_buffer_ids_by_entry_id
 674                            .insert(entry_id, buffer_id);
 675                    }
 676                }
 677
 678                anyhow::Ok(())
 679            })??;
 680
 681            Ok(buffer)
 682        })
 683    }
 684
 685    fn create_buffer(
 686        &self,
 687        project_searchable: bool,
 688        cx: &mut Context<BufferStore>,
 689    ) -> Task<Result<Entity<Buffer>>> {
 690        cx.spawn(async move |buffer_store, cx| {
 691            let buffer =
 692                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
 693            buffer_store.update(cx, |buffer_store, cx| {
 694                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 695                if !project_searchable {
 696                    buffer_store
 697                        .non_searchable_buffers
 698                        .insert(buffer.read(cx).remote_id());
 699                }
 700            })?;
 701            Ok(buffer)
 702        })
 703    }
 704
 705    fn reload_buffers(
 706        &self,
 707        buffers: HashSet<Entity<Buffer>>,
 708        push_to_history: bool,
 709        cx: &mut Context<BufferStore>,
 710    ) -> Task<Result<ProjectTransaction>> {
 711        cx.spawn(async move |_, cx| {
 712            let mut project_transaction = ProjectTransaction::default();
 713            for buffer in buffers {
 714                let transaction = buffer.update(cx, |buffer, cx| buffer.reload(cx))?.await?;
 715                buffer.update(cx, |buffer, cx| {
 716                    if let Some(transaction) = transaction {
 717                        if !push_to_history {
 718                            buffer.forget_transaction(transaction.id);
 719                        }
 720                        project_transaction.0.insert(cx.entity(), transaction);
 721                    }
 722                })?;
 723            }
 724
 725            Ok(project_transaction)
 726        })
 727    }
 728}
 729
 730impl BufferStore {
    /// Registers the buffer store's message handlers (buffer reloaded, buffer
    /// saved, buffer file updated) and request handlers (save buffer, reload
    /// buffers) on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_message_handler(Self::handle_buffer_reloaded);
        client.add_entity_message_handler(Self::handle_buffer_saved);
        client.add_entity_message_handler(Self::handle_update_buffer_file);
        client.add_entity_request_handler(Self::handle_save_buffer);
        client.add_entity_request_handler(Self::handle_reload_buffers);
    }
 738
 739    /// Creates a buffer store, optionally retaining its buffers.
 740    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
 741        Self {
 742            state: BufferStoreState::Local(LocalBufferStore {
 743                local_buffer_ids_by_entry_id: Default::default(),
 744                worktree_store: worktree_store.clone(),
 745                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
 746                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 747                        let this = this.as_local_mut().unwrap();
 748                        this.subscribe_to_worktree(worktree, cx);
 749                    }
 750                }),
 751            }),
 752            downstream_client: None,
 753            opened_buffers: Default::default(),
 754            path_to_buffer_id: Default::default(),
 755            shared_buffers: Default::default(),
 756            loading_buffers: Default::default(),
 757            non_searchable_buffers: Default::default(),
 758            worktree_store,
 759        }
 760    }
 761
 762    pub fn remote(
 763        worktree_store: Entity<WorktreeStore>,
 764        upstream_client: AnyProtoClient,
 765        remote_id: u64,
 766        _cx: &mut Context<Self>,
 767    ) -> Self {
 768        Self {
 769            state: BufferStoreState::Remote(RemoteBufferStore {
 770                shared_with_me: Default::default(),
 771                loading_remote_buffers_by_id: Default::default(),
 772                remote_buffer_listeners: Default::default(),
 773                project_id: remote_id,
 774                upstream_client,
 775                worktree_store: worktree_store.clone(),
 776            }),
 777            downstream_client: None,
 778            opened_buffers: Default::default(),
 779            path_to_buffer_id: Default::default(),
 780            loading_buffers: Default::default(),
 781            shared_buffers: Default::default(),
 782            non_searchable_buffers: Default::default(),
 783            worktree_store,
 784        }
 785    }
 786
 787    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
 788        match &mut self.state {
 789            BufferStoreState::Local(state) => Some(state),
 790            _ => None,
 791        }
 792    }
 793
 794    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
 795        match &mut self.state {
 796            BufferStoreState::Remote(state) => Some(state),
 797            _ => None,
 798        }
 799    }
 800
 801    fn as_remote(&self) -> Option<&RemoteBufferStore> {
 802        match &self.state {
 803            BufferStoreState::Remote(state) => Some(state),
 804            _ => None,
 805        }
 806    }
 807
 808    pub fn open_buffer(
 809        &mut self,
 810        project_path: ProjectPath,
 811        cx: &mut Context<Self>,
 812    ) -> Task<Result<Entity<Buffer>>> {
 813        if let Some(buffer) = self.get_by_path(&project_path) {
 814            cx.emit(BufferStoreEvent::BufferOpened {
 815                buffer: buffer.clone(),
 816                project_path,
 817            });
 818
 819            return Task::ready(Ok(buffer));
 820        }
 821
 822        let task = match self.loading_buffers.entry(project_path.clone()) {
 823            hash_map::Entry::Occupied(e) => e.get().clone(),
 824            hash_map::Entry::Vacant(entry) => {
 825                let path = project_path.path.clone();
 826                let Some(worktree) = self
 827                    .worktree_store
 828                    .read(cx)
 829                    .worktree_for_id(project_path.worktree_id, cx)
 830                else {
 831                    return Task::ready(Err(anyhow!("no such worktree")));
 832                };
 833                let load_buffer = match &self.state {
 834                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
 835                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
 836                };
 837
 838                entry
 839                    .insert(
 840                        // todo(lw): hot foreground spawn
 841                        cx.spawn(async move |this, cx| {
 842                            let load_result = load_buffer.await;
 843                            this.update(cx, |this, cx| {
 844                                // Record the fact that the buffer is no longer loading.
 845                                this.loading_buffers.remove(&project_path);
 846
 847                                let buffer = load_result.map_err(Arc::new)?;
 848                                cx.emit(BufferStoreEvent::BufferOpened {
 849                                    buffer: buffer.clone(),
 850                                    project_path,
 851                                });
 852
 853                                Ok(buffer)
 854                            })?
 855                        })
 856                        .shared(),
 857                    )
 858                    .clone()
 859            }
 860        };
 861
 862        cx.background_spawn(async move {
 863            task.await.map_err(|e| {
 864                if e.error_code() != ErrorCode::Internal {
 865                    anyhow!(e.error_code())
 866                } else {
 867                    anyhow!("{e}")
 868                }
 869            })
 870        })
 871    }
 872
 873    pub fn create_buffer(
 874        &mut self,
 875        project_searchable: bool,
 876        cx: &mut Context<Self>,
 877    ) -> Task<Result<Entity<Buffer>>> {
 878        match &self.state {
 879            BufferStoreState::Local(this) => this.create_buffer(project_searchable, cx),
 880            BufferStoreState::Remote(this) => this.create_buffer(project_searchable, cx),
 881        }
 882    }
 883
 884    pub fn save_buffer(
 885        &mut self,
 886        buffer: Entity<Buffer>,
 887        cx: &mut Context<Self>,
 888    ) -> Task<Result<()>> {
 889        match &mut self.state {
 890            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
 891            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer, None, cx),
 892        }
 893    }
 894
 895    pub fn save_buffer_as(
 896        &mut self,
 897        buffer: Entity<Buffer>,
 898        path: ProjectPath,
 899        cx: &mut Context<Self>,
 900    ) -> Task<Result<()>> {
 901        let old_file = buffer.read(cx).file().cloned();
 902        let task = match &self.state {
 903            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
 904            BufferStoreState::Remote(this) => {
 905                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
 906            }
 907        };
 908        cx.spawn(async move |this, cx| {
 909            task.await?;
 910            this.update(cx, |this, cx| {
 911                old_file.clone().and_then(|file| {
 912                    this.path_to_buffer_id.remove(&ProjectPath {
 913                        worktree_id: file.worktree_id(cx),
 914                        path: file.path().clone(),
 915                    })
 916                });
 917
 918                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 919            })
 920        })
 921    }
 922
 923    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
 924        let buffer = buffer_entity.read(cx);
 925        let remote_id = buffer.remote_id();
 926        let path = File::from_dyn(buffer.file()).map(|file| ProjectPath {
 927            path: file.path.clone(),
 928            worktree_id: file.worktree_id(cx),
 929        });
 930        let is_remote = buffer.replica_id().is_remote();
 931        let open_buffer = OpenBuffer::Complete {
 932            buffer: buffer_entity.downgrade(),
 933        };
 934
 935        let handle = cx.entity().downgrade();
 936        buffer_entity.update(cx, move |_, cx| {
 937            cx.on_release(move |buffer, cx| {
 938                handle
 939                    .update(cx, |_, cx| {
 940                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
 941                    })
 942                    .ok();
 943            })
 944            .detach()
 945        });
 946        let _expect_path_to_exist;
 947        match self.opened_buffers.entry(remote_id) {
 948            hash_map::Entry::Vacant(entry) => {
 949                entry.insert(open_buffer);
 950                _expect_path_to_exist = false;
 951            }
 952            hash_map::Entry::Occupied(mut entry) => {
 953                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 954                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
 955                } else if entry.get().upgrade().is_some() {
 956                    if is_remote {
 957                        return Ok(());
 958                    } else {
 959                        debug_panic!("buffer {remote_id} was already registered");
 960                        anyhow::bail!("buffer {remote_id} was already registered");
 961                    }
 962                }
 963                entry.insert(open_buffer);
 964                _expect_path_to_exist = true;
 965            }
 966        }
 967
 968        if let Some(path) = path {
 969            self.path_to_buffer_id.insert(path, remote_id);
 970        }
 971
 972        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
 973        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
 974        Ok(())
 975    }
 976
 977    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
 978        self.opened_buffers
 979            .values()
 980            .filter_map(|buffer| buffer.upgrade())
 981    }
 982
 983    pub(crate) fn is_searchable(&self, id: &BufferId) -> bool {
 984        !self.non_searchable_buffers.contains(&id)
 985    }
 986
 987    pub fn loading_buffers(
 988        &self,
 989    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
 990        self.loading_buffers.iter().map(|(path, task)| {
 991            let task = task.clone();
 992            (path, async move {
 993                task.await.map_err(|e| {
 994                    if e.error_code() != ErrorCode::Internal {
 995                        anyhow!(e.error_code())
 996                    } else {
 997                        anyhow!("{e}")
 998                    }
 999                })
1000            })
1001        })
1002    }
1003
1004    pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
1005        self.path_to_buffer_id.get(project_path)
1006    }
1007
1008    pub fn get_by_path(&self, path: &ProjectPath) -> Option<Entity<Buffer>> {
1009        self.path_to_buffer_id
1010            .get(path)
1011            .and_then(|buffer_id| self.get(*buffer_id))
1012    }
1013
1014    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1015        self.opened_buffers.get(&buffer_id)?.upgrade()
1016    }
1017
1018    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
1019        self.get(buffer_id)
1020            .with_context(|| format!("unknown buffer id {buffer_id}"))
1021    }
1022
1023    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
1024        self.get(buffer_id).or_else(|| {
1025            self.as_remote()
1026                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
1027        })
1028    }
1029
1030    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
1031        let buffers = self
1032            .buffers()
1033            .map(|buffer| {
1034                let buffer = buffer.read(cx);
1035                proto::BufferVersion {
1036                    id: buffer.remote_id().into(),
1037                    version: language::proto::serialize_version(&buffer.version),
1038                }
1039            })
1040            .collect();
1041        let incomplete_buffer_ids = self
1042            .as_remote()
1043            .map(|remote| remote.incomplete_buffer_ids())
1044            .unwrap_or_default();
1045        (buffers, incomplete_buffer_ids)
1046    }
1047
1048    pub fn disconnected_from_host(&mut self, cx: &mut App) {
1049        for open_buffer in self.opened_buffers.values_mut() {
1050            if let Some(buffer) = open_buffer.upgrade() {
1051                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
1052            }
1053        }
1054
1055        for buffer in self.buffers() {
1056            buffer.update(cx, |buffer, cx| {
1057                buffer.set_capability(Capability::ReadOnly, cx)
1058            });
1059        }
1060
1061        if let Some(remote) = self.as_remote_mut() {
1062            // Wake up all futures currently waiting on a buffer to get opened,
1063            // to give them a chance to fail now that we've disconnected.
1064            remote.remote_buffer_listeners.clear()
1065        }
1066    }
1067
1068    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1069        self.downstream_client = Some((downstream_client, remote_id));
1070    }
1071
1072    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1073        self.downstream_client.take();
1074        self.forget_shared_buffers();
1075    }
1076
1077    pub fn discard_incomplete(&mut self) {
1078        self.opened_buffers
1079            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1080    }
1081
1082    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1083        let file = File::from_dyn(buffer.read(cx).file())?;
1084
1085        let remote_id = buffer.read(cx).remote_id();
1086        if let Some(entry_id) = file.entry_id {
1087            if let Some(local) = self.as_local_mut() {
1088                match local.local_buffer_ids_by_entry_id.get(&entry_id) {
1089                    Some(_) => {
1090                        return None;
1091                    }
1092                    None => {
1093                        local
1094                            .local_buffer_ids_by_entry_id
1095                            .insert(entry_id, remote_id);
1096                    }
1097                }
1098            }
1099            self.path_to_buffer_id.insert(
1100                ProjectPath {
1101                    worktree_id: file.worktree_id(cx),
1102                    path: file.path.clone(),
1103                },
1104                remote_id,
1105            );
1106        };
1107
1108        Some(())
1109    }
1110
1111    fn on_buffer_event(
1112        &mut self,
1113        buffer: Entity<Buffer>,
1114        event: &BufferEvent,
1115        cx: &mut Context<Self>,
1116    ) {
1117        match event {
1118            BufferEvent::FileHandleChanged => {
1119                self.buffer_changed_file(buffer, cx);
1120            }
1121            BufferEvent::Reloaded => {
1122                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1123                    return;
1124                };
1125                let buffer = buffer.read(cx);
1126                downstream_client
1127                    .send(proto::BufferReloaded {
1128                        project_id: *project_id,
1129                        buffer_id: buffer.remote_id().to_proto(),
1130                        version: serialize_version(&buffer.version()),
1131                        mtime: buffer.saved_mtime().map(|t| t.into()),
1132                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1133                    })
1134                    .log_err();
1135            }
1136            BufferEvent::LanguageChanged(_) => {}
1137            _ => {}
1138        }
1139    }
1140
1141    pub async fn handle_update_buffer(
1142        this: Entity<Self>,
1143        envelope: TypedEnvelope<proto::UpdateBuffer>,
1144        mut cx: AsyncApp,
1145    ) -> Result<proto::Ack> {
1146        let payload = envelope.payload;
1147        let buffer_id = BufferId::new(payload.buffer_id)?;
1148        let ops = payload
1149            .operations
1150            .into_iter()
1151            .map(language::proto::deserialize_operation)
1152            .collect::<Result<Vec<_>, _>>()?;
1153        this.update(&mut cx, |this, cx| {
1154            match this.opened_buffers.entry(buffer_id) {
1155                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1156                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1157                    OpenBuffer::Complete { buffer, .. } => {
1158                        if let Some(buffer) = buffer.upgrade() {
1159                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1160                        }
1161                    }
1162                },
1163                hash_map::Entry::Vacant(e) => {
1164                    e.insert(OpenBuffer::Operations(ops));
1165                }
1166            }
1167            Ok(proto::Ack {})
1168        })?
1169    }
1170
1171    pub fn register_shared_lsp_handle(
1172        &mut self,
1173        peer_id: proto::PeerId,
1174        buffer_id: BufferId,
1175        handle: OpenLspBufferHandle,
1176    ) {
1177        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id)
1178            && let Some(buffer) = shared_buffers.get_mut(&buffer_id)
1179        {
1180            buffer.lsp_handle = Some(handle);
1181            return;
1182        }
1183        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1184    }
1185
1186    pub fn handle_synchronize_buffers(
1187        &mut self,
1188        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1189        cx: &mut Context<Self>,
1190        client: Arc<Client>,
1191    ) -> Result<proto::SynchronizeBuffersResponse> {
1192        let project_id = envelope.payload.project_id;
1193        let mut response = proto::SynchronizeBuffersResponse {
1194            buffers: Default::default(),
1195        };
1196        let Some(guest_id) = envelope.original_sender_id else {
1197            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1198        };
1199
1200        self.shared_buffers.entry(guest_id).or_default().clear();
1201        for buffer in envelope.payload.buffers {
1202            let buffer_id = BufferId::new(buffer.id)?;
1203            let remote_version = language::proto::deserialize_version(&buffer.version);
1204            if let Some(buffer) = self.get(buffer_id) {
1205                self.shared_buffers
1206                    .entry(guest_id)
1207                    .or_default()
1208                    .entry(buffer_id)
1209                    .or_insert_with(|| SharedBuffer {
1210                        buffer: buffer.clone(),
1211                        lsp_handle: None,
1212                    });
1213
1214                let buffer = buffer.read(cx);
1215                response.buffers.push(proto::BufferVersion {
1216                    id: buffer_id.into(),
1217                    version: language::proto::serialize_version(&buffer.version),
1218                });
1219
1220                let operations = buffer.serialize_ops(Some(remote_version), cx);
1221                let client = client.clone();
1222                if let Some(file) = buffer.file() {
1223                    client
1224                        .send(proto::UpdateBufferFile {
1225                            project_id,
1226                            buffer_id: buffer_id.into(),
1227                            file: Some(file.to_proto(cx)),
1228                        })
1229                        .log_err();
1230                }
1231
1232                // TODO(max): do something
1233                // client
1234                //     .send(proto::UpdateStagedText {
1235                //         project_id,
1236                //         buffer_id: buffer_id.into(),
1237                //         diff_base: buffer.diff_base().map(ToString::to_string),
1238                //     })
1239                //     .log_err();
1240
1241                client
1242                    .send(proto::BufferReloaded {
1243                        project_id,
1244                        buffer_id: buffer_id.into(),
1245                        version: language::proto::serialize_version(buffer.saved_version()),
1246                        mtime: buffer.saved_mtime().map(|time| time.into()),
1247                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1248                            as i32,
1249                    })
1250                    .log_err();
1251
1252                cx.background_spawn(
1253                    async move {
1254                        let operations = operations.await;
1255                        for chunk in split_operations(operations) {
1256                            client
1257                                .request(proto::UpdateBuffer {
1258                                    project_id,
1259                                    buffer_id: buffer_id.into(),
1260                                    operations: chunk,
1261                                })
1262                                .await?;
1263                        }
1264                        anyhow::Ok(())
1265                    }
1266                    .log_err(),
1267                )
1268                .detach();
1269            }
1270        }
1271        Ok(response)
1272    }
1273
1274    pub fn handle_create_buffer_for_peer(
1275        &mut self,
1276        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1277        replica_id: ReplicaId,
1278        capability: Capability,
1279        cx: &mut Context<Self>,
1280    ) -> Result<()> {
1281        let remote = self
1282            .as_remote_mut()
1283            .context("buffer store is not a remote")?;
1284
1285        if let Some(buffer) =
1286            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1287        {
1288            self.add_buffer(buffer, cx)?;
1289        }
1290
1291        Ok(())
1292    }
1293
1294    pub async fn handle_update_buffer_file(
1295        this: Entity<Self>,
1296        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1297        mut cx: AsyncApp,
1298    ) -> Result<()> {
1299        let buffer_id = envelope.payload.buffer_id;
1300        let buffer_id = BufferId::new(buffer_id)?;
1301
1302        this.update(&mut cx, |this, cx| {
1303            let payload = envelope.payload.clone();
1304            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1305                let file = payload.file.context("invalid file")?;
1306                let worktree = this
1307                    .worktree_store
1308                    .read(cx)
1309                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1310                    .context("no such worktree")?;
1311                let file = File::from_proto(file, worktree, cx)?;
1312                let old_file = buffer.update(cx, |buffer, cx| {
1313                    let old_file = buffer.file().cloned();
1314                    let new_path = file.path.clone();
1315
1316                    buffer.file_updated(Arc::new(file), cx);
1317                    if old_file.as_ref().is_none_or(|old| *old.path() != new_path) {
1318                        Some(old_file)
1319                    } else {
1320                        None
1321                    }
1322                });
1323                if let Some(old_file) = old_file {
1324                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1325                }
1326            }
1327            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1328                downstream_client
1329                    .send(proto::UpdateBufferFile {
1330                        project_id: *project_id,
1331                        buffer_id: buffer_id.into(),
1332                        file: envelope.payload.file,
1333                    })
1334                    .log_err();
1335            }
1336            Ok(())
1337        })?
1338    }
1339
1340    pub async fn handle_save_buffer(
1341        this: Entity<Self>,
1342        envelope: TypedEnvelope<proto::SaveBuffer>,
1343        mut cx: AsyncApp,
1344    ) -> Result<proto::BufferSaved> {
1345        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1346        let (buffer, project_id) = this.read_with(&cx, |this, _| {
1347            anyhow::Ok((
1348                this.get_existing(buffer_id)?,
1349                this.downstream_client
1350                    .as_ref()
1351                    .map(|(_, project_id)| *project_id)
1352                    .context("project is not shared")?,
1353            ))
1354        })??;
1355        buffer
1356            .update(&mut cx, |buffer, _| {
1357                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1358            })?
1359            .await?;
1360        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1361
1362        if let Some(new_path) = envelope.payload.new_path
1363            && let Some(new_path) = ProjectPath::from_proto(new_path)
1364        {
1365            this.update(&mut cx, |this, cx| {
1366                this.save_buffer_as(buffer.clone(), new_path, cx)
1367            })?
1368            .await?;
1369        } else {
1370            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1371                .await?;
1372        }
1373
1374        buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
1375            project_id,
1376            buffer_id: buffer_id.into(),
1377            version: serialize_version(buffer.saved_version()),
1378            mtime: buffer.saved_mtime().map(|time| time.into()),
1379        })
1380    }
1381
1382    pub async fn handle_close_buffer(
1383        this: Entity<Self>,
1384        envelope: TypedEnvelope<proto::CloseBuffer>,
1385        mut cx: AsyncApp,
1386    ) -> Result<()> {
1387        let peer_id = envelope.sender_id;
1388        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1389        this.update(&mut cx, |this, cx| {
1390            if let Some(shared) = this.shared_buffers.get_mut(&peer_id)
1391                && shared.remove(&buffer_id).is_some()
1392            {
1393                cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
1394                if shared.is_empty() {
1395                    this.shared_buffers.remove(&peer_id);
1396                }
1397                return;
1398            }
1399            debug_panic!(
1400                "peer_id {} closed buffer_id {} which was either not open or already closed",
1401                peer_id,
1402                buffer_id
1403            )
1404        })
1405    }
1406
1407    pub async fn handle_buffer_saved(
1408        this: Entity<Self>,
1409        envelope: TypedEnvelope<proto::BufferSaved>,
1410        mut cx: AsyncApp,
1411    ) -> Result<()> {
1412        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1413        let version = deserialize_version(&envelope.payload.version);
1414        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1415        this.update(&mut cx, move |this, cx| {
1416            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1417                buffer.update(cx, |buffer, cx| {
1418                    buffer.did_save(version, mtime, cx);
1419                });
1420            }
1421
1422            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1423                downstream_client
1424                    .send(proto::BufferSaved {
1425                        project_id: *project_id,
1426                        buffer_id: buffer_id.into(),
1427                        mtime: envelope.payload.mtime,
1428                        version: envelope.payload.version,
1429                    })
1430                    .log_err();
1431            }
1432        })
1433    }
1434
1435    pub async fn handle_buffer_reloaded(
1436        this: Entity<Self>,
1437        envelope: TypedEnvelope<proto::BufferReloaded>,
1438        mut cx: AsyncApp,
1439    ) -> Result<()> {
1440        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1441        let version = deserialize_version(&envelope.payload.version);
1442        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1443        let line_ending = deserialize_line_ending(
1444            proto::LineEnding::from_i32(envelope.payload.line_ending)
1445                .context("missing line ending")?,
1446        );
1447        this.update(&mut cx, |this, cx| {
1448            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1449                buffer.update(cx, |buffer, cx| {
1450                    buffer.did_reload(version, line_ending, mtime, cx);
1451                });
1452            }
1453
1454            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1455                downstream_client
1456                    .send(proto::BufferReloaded {
1457                        project_id: *project_id,
1458                        buffer_id: buffer_id.into(),
1459                        mtime: envelope.payload.mtime,
1460                        version: envelope.payload.version,
1461                        line_ending: envelope.payload.line_ending,
1462                    })
1463                    .log_err();
1464            }
1465        })
1466    }
1467
1468    pub fn reload_buffers(
1469        &self,
1470        buffers: HashSet<Entity<Buffer>>,
1471        push_to_history: bool,
1472        cx: &mut Context<Self>,
1473    ) -> Task<Result<ProjectTransaction>> {
1474        if buffers.is_empty() {
1475            return Task::ready(Ok(ProjectTransaction::default()));
1476        }
1477        match &self.state {
1478            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1479            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1480        }
1481    }
1482
1483    async fn handle_reload_buffers(
1484        this: Entity<Self>,
1485        envelope: TypedEnvelope<proto::ReloadBuffers>,
1486        mut cx: AsyncApp,
1487    ) -> Result<proto::ReloadBuffersResponse> {
1488        let sender_id = envelope.original_sender_id().unwrap_or_default();
1489        let reload = this.update(&mut cx, |this, cx| {
1490            let mut buffers = HashSet::default();
1491            for buffer_id in &envelope.payload.buffer_ids {
1492                let buffer_id = BufferId::new(*buffer_id)?;
1493                buffers.insert(this.get_existing(buffer_id)?);
1494            }
1495            anyhow::Ok(this.reload_buffers(buffers, false, cx))
1496        })??;
1497
1498        let project_transaction = reload.await?;
1499        let project_transaction = this.update(&mut cx, |this, cx| {
1500            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1501        })?;
1502        Ok(proto::ReloadBuffersResponse {
1503            transaction: Some(project_transaction),
1504        })
1505    }
1506
1507    pub fn create_buffer_for_peer(
1508        &mut self,
1509        buffer: &Entity<Buffer>,
1510        peer_id: proto::PeerId,
1511        cx: &mut Context<Self>,
1512    ) -> Task<Result<()>> {
1513        let buffer_id = buffer.read(cx).remote_id();
1514        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
1515        if shared_buffers.contains_key(&buffer_id) {
1516            return Task::ready(Ok(()));
1517        }
1518        shared_buffers.insert(
1519            buffer_id,
1520            SharedBuffer {
1521                buffer: buffer.clone(),
1522                lsp_handle: None,
1523            },
1524        );
1525
1526        let Some((client, project_id)) = self.downstream_client.clone() else {
1527            return Task::ready(Ok(()));
1528        };
1529
1530        cx.spawn(async move |this, cx| {
1531            let Some(buffer) = this.read_with(cx, |this, _| this.get(buffer_id))? else {
1532                return anyhow::Ok(());
1533            };
1534
1535            let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
1536            let operations = operations.await;
1537            let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
1538
1539            let initial_state = proto::CreateBufferForPeer {
1540                project_id,
1541                peer_id: Some(peer_id),
1542                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1543            };
1544
1545            if client.send(initial_state).log_err().is_some() {
1546                let client = client.clone();
1547                cx.background_spawn(async move {
1548                    let mut chunks = split_operations(operations).peekable();
1549                    while let Some(chunk) = chunks.next() {
1550                        let is_last = chunks.peek().is_none();
1551                        client.send(proto::CreateBufferForPeer {
1552                            project_id,
1553                            peer_id: Some(peer_id),
1554                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1555                                proto::BufferChunk {
1556                                    buffer_id: buffer_id.into(),
1557                                    operations: chunk,
1558                                    is_last,
1559                                },
1560                            )),
1561                        })?;
1562                    }
1563                    anyhow::Ok(())
1564                })
1565                .await
1566                .log_err();
1567            }
1568            Ok(())
1569        })
1570    }
1571
1572    pub fn forget_shared_buffers(&mut self) {
1573        self.shared_buffers.clear();
1574    }
1575
1576    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1577        self.shared_buffers.remove(peer_id);
1578    }
1579
1580    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1581        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1582            self.shared_buffers.insert(new_peer_id, buffers);
1583        }
1584    }
1585
1586    pub fn has_shared_buffers(&self) -> bool {
1587        !self.shared_buffers.is_empty()
1588    }
1589
1590    pub fn create_local_buffer(
1591        &mut self,
1592        text: &str,
1593        language: Option<Arc<Language>>,
1594        project_searchable: bool,
1595        cx: &mut Context<Self>,
1596    ) -> Entity<Buffer> {
1597        let buffer = cx.new(|cx| {
1598            Buffer::local(text, cx)
1599                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1600        });
1601
1602        self.add_buffer(buffer.clone(), cx).log_err();
1603        let buffer_id = buffer.read(cx).remote_id();
1604        if !project_searchable {
1605            self.non_searchable_buffers.insert(buffer_id);
1606        }
1607
1608        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1609            self.path_to_buffer_id.insert(
1610                ProjectPath {
1611                    worktree_id: file.worktree_id(cx),
1612                    path: file.path.clone(),
1613                },
1614                buffer_id,
1615            );
1616            let this = self
1617                .as_local_mut()
1618                .expect("local-only method called in a non-local context");
1619            if let Some(entry_id) = file.entry_id {
1620                this.local_buffer_ids_by_entry_id
1621                    .insert(entry_id, buffer_id);
1622            }
1623        }
1624        buffer
1625    }
1626
1627    pub fn deserialize_project_transaction(
1628        &mut self,
1629        message: proto::ProjectTransaction,
1630        push_to_history: bool,
1631        cx: &mut Context<Self>,
1632    ) -> Task<Result<ProjectTransaction>> {
1633        if let Some(this) = self.as_remote_mut() {
1634            this.deserialize_project_transaction(message, push_to_history, cx)
1635        } else {
1636            debug_panic!("not a remote buffer store");
1637            Task::ready(Err(anyhow!("not a remote buffer store")))
1638        }
1639    }
1640
1641    pub fn wait_for_remote_buffer(
1642        &mut self,
1643        id: BufferId,
1644        cx: &mut Context<BufferStore>,
1645    ) -> Task<Result<Entity<Buffer>>> {
1646        if let Some(this) = self.as_remote_mut() {
1647            this.wait_for_remote_buffer(id, cx)
1648        } else {
1649            debug_panic!("not a remote buffer store");
1650            Task::ready(Err(anyhow!("not a remote buffer store")))
1651        }
1652    }
1653
1654    pub fn serialize_project_transaction_for_peer(
1655        &mut self,
1656        project_transaction: ProjectTransaction,
1657        peer_id: proto::PeerId,
1658        cx: &mut Context<Self>,
1659    ) -> proto::ProjectTransaction {
1660        let mut serialized_transaction = proto::ProjectTransaction {
1661            buffer_ids: Default::default(),
1662            transactions: Default::default(),
1663        };
1664        for (buffer, transaction) in project_transaction.0 {
1665            self.create_buffer_for_peer(&buffer, peer_id, cx)
1666                .detach_and_log_err(cx);
1667            serialized_transaction
1668                .buffer_ids
1669                .push(buffer.read(cx).remote_id().into());
1670            serialized_transaction
1671                .transactions
1672                .push(language::proto::serialize_transaction(&transaction));
1673        }
1674        serialized_transaction
1675    }
1676}
1677
1678impl OpenBuffer {
1679    fn upgrade(&self) -> Option<Entity<Buffer>> {
1680        match self {
1681            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
1682            OpenBuffer::Operations(_) => None,
1683        }
1684    }
1685}
1686
1687fn is_not_found_error(error: &anyhow::Error) -> bool {
1688    error
1689        .root_cause()
1690        .downcast_ref::<io::Error>()
1691        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1692}