buffer_store.rs

   1use crate::{
   2    ProjectItem as _, ProjectPath,
   3    lsp_store::OpenLspBufferHandle,
   4    search::SearchQuery,
   5    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   6};
   7use anyhow::{Context as _, Result, anyhow};
   8use client::Client;
   9use collections::{HashMap, HashSet, hash_map};
  10use fs::Fs;
  11use futures::{Future, FutureExt as _, StreamExt, channel::oneshot, future::Shared};
  12use gpui::{
  13    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity,
  14};
  15use language::{
  16    Buffer, BufferEvent, Capability, DiskState, File as _, Language, Operation,
  17    proto::{
  18        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  19        split_operations,
  20    },
  21};
  22use rpc::{
  23    AnyProtoClient, ErrorExt as _, TypedEnvelope,
  24    proto::{self, ToProto},
  25};
  26use smol::channel::Receiver;
  27use std::{io, path::Path, pin::pin, sync::Arc, time::Instant};
  28use text::BufferId;
  29use util::{ResultExt as _, TryFutureExt, debug_panic, maybe};
  30use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId};
  31
  32/// A set of open buffers.
  33pub struct BufferStore {
  34    state: BufferStoreState,
  35    #[allow(clippy::type_complexity)]
  36    loading_buffers: HashMap<ProjectPath, Shared<Task<Result<Entity<Buffer>, Arc<anyhow::Error>>>>>,
  37    worktree_store: Entity<WorktreeStore>,
  38    opened_buffers: HashMap<BufferId, OpenBuffer>,
  39    path_to_buffer_id: HashMap<ProjectPath, BufferId>,
  40    downstream_client: Option<(AnyProtoClient, u64)>,
  41    shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
  42    non_searchable_buffers: HashSet<BufferId>,
  43}
  44
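/// A buffer that has been shared with a remote peer, along with an optional LSP
/// handle held open on that peer's behalf.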
  45#[derive(Hash, Eq, PartialEq, Clone)]
  46struct SharedBuffer {
  47    buffer: Entity<Buffer>,
  48    lsp_handle: Option<OpenLspBufferHandle>,
  49}
  50
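/// Whether the store manages local, file-backed buffers or mirrors buffers owned by
/// an upstream client.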
  51enum BufferStoreState {
  52    Local(LocalBufferStore),
  53    Remote(RemoteBufferStore),
  54}
  55
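/// Buffer-store state for a project that mirrors buffers owned by an upstream
/// client, assembled incrementally from `CreateBufferForPeer` messages.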
  56struct RemoteBufferStore {
  57    shared_with_me: HashSet<Entity<Buffer>>,
  58    upstream_client: AnyProtoClient,
  59    project_id: u64,
  60    loading_remote_buffers_by_id: HashMap<BufferId, Entity<Buffer>>,
  61    remote_buffer_listeners:
  62        HashMap<BufferId, Vec<oneshot::Sender<anyhow::Result<Entity<Buffer>>>>>,
  63    worktree_store: Entity<WorktreeStore>,
  64}
  65
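/// Buffer-store state for a project whose buffers are backed by files in local
/// worktrees.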
  66struct LocalBufferStore {
  67    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  68    worktree_store: Entity<WorktreeStore>,
  69    _subscription: Subscription,
  70}
  71
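/// An opened buffer, or the operations received for a buffer whose initial state
/// has not arrived yet.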
  72enum OpenBuffer {
  73    Complete { buffer: WeakEntity<Buffer> },
  74    Operations(Vec<Operation>),
  75}
  76
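/// Events emitted by the [`BufferStore`] as buffers are added, opened, dropped,
/// renamed, or closed by remote peers.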
  77pub enum BufferStoreEvent {
  78    BufferAdded(Entity<Buffer>),
  79    BufferOpened {
  80        buffer: Entity<Buffer>,
  81        project_path: ProjectPath,
  82    },
  83    SharedBufferClosed(proto::PeerId, BufferId),
  84    BufferDropped(BufferId),
  85    BufferChangedFilePath {
  86        buffer: Entity<Buffer>,
  87        old_file: Option<Arc<dyn language::File>>,
  88    },
  89}
  90
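/// A transaction spanning multiple buffers, keyed by the buffer each
/// sub-transaction applies to.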
  91#[derive(Default, Debug)]
  92pub struct ProjectTransaction(pub HashMap<Entity<Buffer>, language::Transaction>);
  93
  94impl EventEmitter<BufferStoreEvent> for BufferStore {}
  95
  96impl RemoteBufferStore {
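    /// Returns a task that resolves with the buffer once it has been received from
    /// the upstream client, or immediately if the buffer is already open.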
  97    pub fn wait_for_remote_buffer(
  98        &mut self,
  99        id: BufferId,
 100        cx: &mut Context<BufferStore>,
 101    ) -> Task<Result<Entity<Buffer>>> {
 102        let (tx, rx) = oneshot::channel();
 103        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 104
 105        cx.spawn(async move |this, cx| {
 106            if let Some(buffer) = this
 107                .read_with(cx, |buffer_store, _| buffer_store.get(id))
 108                .ok()
 109                .flatten()
 110            {
 111                return Ok(buffer);
 112            }
 113
 114            cx.background_spawn(async move { rx.await? }).await
 115        })
 116    }
 117
 118    fn save_remote_buffer(
 119        &self,
 120        buffer_handle: Entity<Buffer>,
 121        new_path: Option<proto::ProjectPath>,
 122        cx: &Context<BufferStore>,
 123    ) -> Task<Result<()>> {
 124        let buffer = buffer_handle.read(cx);
 125        let buffer_id = buffer.remote_id().into();
 126        let version = buffer.version();
 127        let rpc = self.upstream_client.clone();
 128        let project_id = self.project_id;
 129        cx.spawn(async move |_, cx| {
 130            let response = rpc
 131                .request(proto::SaveBuffer {
 132                    project_id,
 133                    buffer_id,
 134                    new_path,
 135                    version: serialize_version(&version),
 136                })
 137                .await?;
 138            let version = deserialize_version(&response.version);
 139            let mtime = response.mtime.map(|mtime| mtime.into());
 140
 141            buffer_handle.update(cx, |buffer, cx| {
 142                buffer.did_save(version.clone(), mtime, cx);
 143            })?;
 144
 145            Ok(())
 146        })
 147    }
 148
 149    pub fn handle_create_buffer_for_peer(
 150        &mut self,
 151        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 152        replica_id: u16,
 153        capability: Capability,
 154        cx: &mut Context<BufferStore>,
 155    ) -> Result<Option<Entity<Buffer>>> {
 156        match envelope.payload.variant.context("missing variant")? {
 157            proto::create_buffer_for_peer::Variant::State(mut state) => {
 158                let buffer_id = BufferId::new(state.id)?;
 159
 160                let buffer_result = maybe!({
 161                    let mut buffer_file = None;
 162                    if let Some(file) = state.file.take() {
 163                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 164                        let worktree = self
 165                            .worktree_store
 166                            .read(cx)
 167                            .worktree_for_id(worktree_id, cx)
 168                            .with_context(|| {
 169                                format!("no worktree found for id {}", file.worktree_id)
 170                            })?;
 171                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 172                            as Arc<dyn language::File>);
 173                    }
 174                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 175                });
 176
 177                match buffer_result {
 178                    Ok(buffer) => {
 179                        let buffer = cx.new(|_| buffer);
 180                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 181                    }
 182                    Err(error) => {
 183                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 184                            for listener in listeners {
 185                                listener.send(Err(anyhow!(error.cloned()))).ok();
 186                            }
 187                        }
 188                    }
 189                }
 190            }
 191            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 192                let buffer_id = BufferId::new(chunk.buffer_id)?;
 193                let buffer = self
 194                    .loading_remote_buffers_by_id
 195                    .get(&buffer_id)
 196                    .cloned()
 197                    .with_context(|| {
 198                        format!(
 199                            "received chunk for buffer {} without initial state",
 200                            chunk.buffer_id
 201                        )
 202                    })?;
 203
 204                let result = maybe!({
 205                    let operations = chunk
 206                        .operations
 207                        .into_iter()
 208                        .map(language::proto::deserialize_operation)
 209                        .collect::<Result<Vec<_>>>()?;
 210                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
 211                    anyhow::Ok(())
 212                });
 213
 214                if let Err(error) = result {
 215                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 216                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 217                        for listener in listeners {
 218                            listener.send(Err(error.cloned())).ok();
 219                        }
 220                    }
 221                } else if chunk.is_last {
 222                    self.loading_remote_buffers_by_id.remove(&buffer_id);
 223                    if self.upstream_client.is_via_collab() {
 224                        // retain buffers sent by peers to avoid races.
 225                        self.shared_with_me.insert(buffer.clone());
 226                    }
 227
 228                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
 229                        for sender in senders {
 230                            sender.send(Ok(buffer.clone())).ok();
 231                        }
 232                    }
 233                    return Ok(Some(buffer));
 234                }
 235            }
 236        }
 237        return Ok(None);
 238    }
 239
 240    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
 241        self.loading_remote_buffers_by_id
 242            .keys()
 243            .copied()
 244            .collect::<Vec<_>>()
 245    }
 246
 247    pub fn deserialize_project_transaction(
 248        &self,
 249        message: proto::ProjectTransaction,
 250        push_to_history: bool,
 251        cx: &mut Context<BufferStore>,
 252    ) -> Task<Result<ProjectTransaction>> {
 253        cx.spawn(async move |this, cx| {
 254            let mut project_transaction = ProjectTransaction::default();
 255            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
 256            {
 257                let buffer_id = BufferId::new(buffer_id)?;
 258                let buffer = this
 259                    .update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
 260                    .await?;
 261                let transaction = language::proto::deserialize_transaction(transaction)?;
 262                project_transaction.0.insert(buffer, transaction);
 263            }
 264
 265            for (buffer, transaction) in &project_transaction.0 {
 266                buffer
 267                    .update(cx, |buffer, _| {
 268                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
 269                    })?
 270                    .await?;
 271
 272                if push_to_history {
 273                    buffer.update(cx, |buffer, _| {
 274                        buffer.push_transaction(transaction.clone(), Instant::now());
 275                        buffer.finalize_last_transaction();
 276                    })?;
 277                }
 278            }
 279
 280            Ok(project_transaction)
 281        })
 282    }
 283
 284    fn open_buffer(
 285        &self,
 286        path: Arc<Path>,
 287        worktree: Entity<Worktree>,
 288        cx: &mut Context<BufferStore>,
 289    ) -> Task<Result<Entity<Buffer>>> {
 290        let worktree_id = worktree.read(cx).id().to_proto();
 291        let project_id = self.project_id;
 292        let client = self.upstream_client.clone();
 293        cx.spawn(async move |this, cx| {
 294            let response = client
 295                .request(proto::OpenBufferByPath {
 296                    project_id,
 297                    worktree_id,
 298                    path: path.to_proto(),
 299                })
 300                .await?;
 301            let buffer_id = BufferId::new(response.buffer_id)?;
 302
 303            let buffer = this
 304                .update(cx, {
 305                    |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
 306                })?
 307                .await?;
 308
 309            Ok(buffer)
 310        })
 311    }
 312
 313    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
 314        let create = self.upstream_client.request(proto::OpenNewBuffer {
 315            project_id: self.project_id,
 316        });
 317        cx.spawn(async move |this, cx| {
 318            let response = create.await?;
 319            let buffer_id = BufferId::new(response.buffer_id)?;
 320
 321            this.update(cx, |this, cx| this.wait_for_remote_buffer(buffer_id, cx))?
 322                .await
 323        })
 324    }
 325
 326    fn reload_buffers(
 327        &self,
 328        buffers: HashSet<Entity<Buffer>>,
 329        push_to_history: bool,
 330        cx: &mut Context<BufferStore>,
 331    ) -> Task<Result<ProjectTransaction>> {
 332        let request = self.upstream_client.request(proto::ReloadBuffers {
 333            project_id: self.project_id,
 334            buffer_ids: buffers
 335                .iter()
 336                .map(|buffer| buffer.read(cx).remote_id().to_proto())
 337                .collect(),
 338        });
 339
 340        cx.spawn(async move |this, cx| {
 341            let response = request.await?.transaction.context("missing transaction")?;
 342            this.update(cx, |this, cx| {
 343                this.deserialize_project_transaction(response, push_to_history, cx)
 344            })?
 345            .await
 346        })
 347    }
 348}
 349
 350impl LocalBufferStore {
 351    fn save_local_buffer(
 352        &self,
 353        buffer_handle: Entity<Buffer>,
 354        worktree: Entity<Worktree>,
 355        path: Arc<Path>,
 356        mut has_changed_file: bool,
 357        cx: &mut Context<BufferStore>,
 358    ) -> Task<Result<()>> {
 359        let buffer = buffer_handle.read(cx);
 360
 361        let text = buffer.as_rope().clone();
 362        let line_ending = buffer.line_ending();
 363        let version = buffer.version();
 364        let buffer_id = buffer.remote_id();
 365        let file = buffer.file().cloned();
 366        if file
 367            .as_ref()
 368            .is_some_and(|file| file.disk_state() == DiskState::New)
 369        {
 370            has_changed_file = true;
 371        }
 372
 373        let save = worktree.update(cx, |worktree, cx| {
 374            worktree.write_file(path.as_ref(), text, line_ending, cx)
 375        });
 376
 377        cx.spawn(async move |this, cx| {
 378            let new_file = save.await?;
 379            let mtime = new_file.disk_state().mtime();
 380            this.update(cx, |this, cx| {
 381                if let Some((downstream_client, project_id)) = this.downstream_client.clone() {
 382                    if has_changed_file {
 383                        downstream_client
 384                            .send(proto::UpdateBufferFile {
 385                                project_id,
 386                                buffer_id: buffer_id.to_proto(),
 387                                file: Some(language::File::to_proto(&*new_file, cx)),
 388                            })
 389                            .log_err();
 390                    }
 391                    downstream_client
 392                        .send(proto::BufferSaved {
 393                            project_id,
 394                            buffer_id: buffer_id.to_proto(),
 395                            version: serialize_version(&version),
 396                            mtime: mtime.map(|time| time.into()),
 397                        })
 398                        .log_err();
 399                }
 400            })?;
 401            buffer_handle.update(cx, |buffer, cx| {
 402                if has_changed_file {
 403                    buffer.file_updated(new_file, cx);
 404                }
 405                buffer.did_save(version.clone(), mtime, cx);
 406            })
 407        })
 408    }
 409
 410    fn subscribe_to_worktree(
 411        &mut self,
 412        worktree: &Entity<Worktree>,
 413        cx: &mut Context<BufferStore>,
 414    ) {
 415        cx.subscribe(worktree, |this, worktree, event, cx| {
 416            if worktree.read(cx).is_local() {
 417                match event {
 418                    worktree::Event::UpdatedEntries(changes) => {
 419                        Self::local_worktree_entries_changed(this, &worktree, changes, cx);
 420                    }
 421                    _ => {}
 422                }
 423            }
 424        })
 425        .detach();
 426    }
 427
 428    fn local_worktree_entries_changed(
 429        this: &mut BufferStore,
 430        worktree_handle: &Entity<Worktree>,
 431        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 432        cx: &mut Context<BufferStore>,
 433    ) {
 434        let snapshot = worktree_handle.read(cx).snapshot();
 435        for (path, entry_id, _) in changes {
 436            Self::local_worktree_entry_changed(
 437                this,
 438                *entry_id,
 439                path,
 440                worktree_handle,
 441                &snapshot,
 442                cx,
 443            );
 444        }
 445    }
 446
 447    fn local_worktree_entry_changed(
 448        this: &mut BufferStore,
 449        entry_id: ProjectEntryId,
 450        path: &Arc<Path>,
 451        worktree: &Entity<worktree::Worktree>,
 452        snapshot: &worktree::Snapshot,
 453        cx: &mut Context<BufferStore>,
 454    ) -> Option<()> {
 455        let project_path = ProjectPath {
 456            worktree_id: snapshot.id(),
 457            path: path.clone(),
 458        };
 459
 460        let buffer_id = this
 461            .as_local_mut()
 462            .and_then(|local| local.local_buffer_ids_by_entry_id.get(&entry_id))
 463            .copied()
 464            .or_else(|| this.path_to_buffer_id.get(&project_path).copied())?;
 465
 466        let buffer = if let Some(buffer) = this.get(buffer_id) {
 467            Some(buffer)
 468        } else {
 469            this.opened_buffers.remove(&buffer_id);
 470            None
 471        };
 472
 473        let buffer = if let Some(buffer) = buffer {
 474            buffer
 475        } else {
 476            this.path_to_buffer_id.remove(&project_path);
 477            let this = this.as_local_mut()?;
 478            this.local_buffer_ids_by_entry_id.remove(&entry_id);
 479            return None;
 480        };
 481
 482        let events = buffer.update(cx, |buffer, cx| {
 483            let file = buffer.file()?;
 484            let old_file = File::from_dyn(Some(file))?;
 485            if old_file.worktree != *worktree {
 486                return None;
 487            }
 488
 489            let snapshot_entry = old_file
 490                .entry_id
 491                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 492                .or_else(|| snapshot.entry_for_path(old_file.path.as_ref()));
 493
 494            let new_file = if let Some(entry) = snapshot_entry {
 495                File {
 496                    disk_state: match entry.mtime {
 497                        Some(mtime) => DiskState::Present { mtime },
 498                        None => old_file.disk_state,
 499                    },
 500                    is_local: true,
 501                    entry_id: Some(entry.id),
 502                    path: entry.path.clone(),
 503                    worktree: worktree.clone(),
 504                    is_private: entry.is_private,
 505                }
 506            } else {
 507                File {
 508                    disk_state: DiskState::Deleted,
 509                    is_local: true,
 510                    entry_id: old_file.entry_id,
 511                    path: old_file.path.clone(),
 512                    worktree: worktree.clone(),
 513                    is_private: old_file.is_private,
 514                }
 515            };
 516
 517            if new_file == *old_file {
 518                return None;
 519            }
 520
 521            let mut events = Vec::new();
 522            if new_file.path != old_file.path {
 523                this.path_to_buffer_id.remove(&ProjectPath {
 524                    path: old_file.path.clone(),
 525                    worktree_id: old_file.worktree_id(cx),
 526                });
 527                this.path_to_buffer_id.insert(
 528                    ProjectPath {
 529                        worktree_id: new_file.worktree_id(cx),
 530                        path: new_file.path.clone(),
 531                    },
 532                    buffer_id,
 533                );
 534                events.push(BufferStoreEvent::BufferChangedFilePath {
 535                    buffer: cx.entity(),
 536                    old_file: buffer.file().cloned(),
 537                });
 538            }
 539            let local = this.as_local_mut()?;
 540            if new_file.entry_id != old_file.entry_id {
 541                if let Some(entry_id) = old_file.entry_id {
 542                    local.local_buffer_ids_by_entry_id.remove(&entry_id);
 543                }
 544                if let Some(entry_id) = new_file.entry_id {
 545                    local
 546                        .local_buffer_ids_by_entry_id
 547                        .insert(entry_id, buffer_id);
 548                }
 549            }
 550
 551            if let Some((client, project_id)) = &this.downstream_client {
 552                client
 553                    .send(proto::UpdateBufferFile {
 554                        project_id: *project_id,
 555                        buffer_id: buffer_id.to_proto(),
 556                        file: Some(new_file.to_proto(cx)),
 557                    })
 558                    .ok();
 559            }
 560
 561            buffer.file_updated(Arc::new(new_file), cx);
 562            Some(events)
 563        })?;
 564
 565        for event in events {
 566            cx.emit(event);
 567        }
 568
 569        None
 570    }
 571
 572    fn save_buffer(
 573        &self,
 574        buffer: Entity<Buffer>,
 575        cx: &mut Context<BufferStore>,
 576    ) -> Task<Result<()>> {
 577        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 578            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 579        };
 580        let worktree = file.worktree.clone();
 581        self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
 582    }
 583
 584    fn save_buffer_as(
 585        &self,
 586        buffer: Entity<Buffer>,
 587        path: ProjectPath,
 588        cx: &mut Context<BufferStore>,
 589    ) -> Task<Result<()>> {
 590        let Some(worktree) = self
 591            .worktree_store
 592            .read(cx)
 593            .worktree_for_id(path.worktree_id, cx)
 594        else {
 595            return Task::ready(Err(anyhow!("no such worktree")));
 596        };
 597        self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
 598    }
 599
 600    fn open_buffer(
 601        &self,
 602        path: Arc<Path>,
 603        worktree: Entity<Worktree>,
 604        cx: &mut Context<BufferStore>,
 605    ) -> Task<Result<Entity<Buffer>>> {
 606        let load_buffer = worktree.update(cx, |worktree, cx| {
 607            let load_file = worktree.load_file(path.as_ref(), cx);
 608            let reservation = cx.reserve_entity();
 609            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 610            cx.spawn(async move |_, cx| {
 611                let loaded = load_file.await?;
 612                let text_buffer = cx
 613                    .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 614                    .await;
 615                cx.insert_entity(reservation, |_| {
 616                    Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
 617                })
 618            })
 619        });
 620
 621        cx.spawn(async move |this, cx| {
 622            let buffer = match load_buffer.await {
 623                Ok(buffer) => Ok(buffer),
 624                Err(error) if is_not_found_error(&error) => cx.new(|cx| {
 625                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 626                    let text_buffer = text::Buffer::new(0, buffer_id, "");
 627                    Buffer::build(
 628                        text_buffer,
 629                        Some(Arc::new(File {
 630                            worktree,
 631                            path,
 632                            disk_state: DiskState::New,
 633                            entry_id: None,
 634                            is_local: true,
 635                            is_private: false,
 636                        })),
 637                        Capability::ReadWrite,
 638                    )
 639                }),
 640                Err(e) => Err(e),
 641            }?;
 642            this.update(cx, |this, cx| {
 643                this.add_buffer(buffer.clone(), cx)?;
 644                let buffer_id = buffer.read(cx).remote_id();
 645                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 646                    this.path_to_buffer_id.insert(
 647                        ProjectPath {
 648                            worktree_id: file.worktree_id(cx),
 649                            path: file.path.clone(),
 650                        },
 651                        buffer_id,
 652                    );
 653                    let this = this.as_local_mut().unwrap();
 654                    if let Some(entry_id) = file.entry_id {
 655                        this.local_buffer_ids_by_entry_id
 656                            .insert(entry_id, buffer_id);
 657                    }
 658                }
 659
 660                anyhow::Ok(())
 661            })??;
 662
 663            Ok(buffer)
 664        })
 665    }
 666
 667    fn create_buffer(&self, cx: &mut Context<BufferStore>) -> Task<Result<Entity<Buffer>>> {
 668        cx.spawn(async move |buffer_store, cx| {
 669            let buffer =
 670                cx.new(|cx| Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx))?;
 671            buffer_store.update(cx, |buffer_store, cx| {
 672                buffer_store.add_buffer(buffer.clone(), cx).log_err();
 673            })?;
 674            Ok(buffer)
 675        })
 676    }
 677
 678    fn reload_buffers(
 679        &self,
 680        buffers: HashSet<Entity<Buffer>>,
 681        push_to_history: bool,
 682        cx: &mut Context<BufferStore>,
 683    ) -> Task<Result<ProjectTransaction>> {
 684        cx.spawn(async move |_, cx| {
 685            let mut project_transaction = ProjectTransaction::default();
 686            for buffer in buffers {
 687                let transaction = buffer.update(cx, |buffer, cx| buffer.reload(cx))?.await?;
 688                buffer.update(cx, |buffer, cx| {
 689                    if let Some(transaction) = transaction {
 690                        if !push_to_history {
 691                            buffer.forget_transaction(transaction.id);
 692                        }
 693                        project_transaction.0.insert(cx.entity(), transaction);
 694                    }
 695                })?;
 696            }
 697
 698            Ok(project_transaction)
 699        })
 700    }
 701}
 702
 703impl BufferStore {
 704    pub fn init(client: &AnyProtoClient) {
 705        client.add_entity_message_handler(Self::handle_buffer_reloaded);
 706        client.add_entity_message_handler(Self::handle_buffer_saved);
 707        client.add_entity_message_handler(Self::handle_update_buffer_file);
 708        client.add_entity_request_handler(Self::handle_save_buffer);
 709        client.add_entity_request_handler(Self::handle_reload_buffers);
 710    }
 711
 712    /// Creates a buffer store whose buffers are backed by files in the local worktrees.
 713    pub fn local(worktree_store: Entity<WorktreeStore>, cx: &mut Context<Self>) -> Self {
 714        Self {
 715            state: BufferStoreState::Local(LocalBufferStore {
 716                local_buffer_ids_by_entry_id: Default::default(),
 717                worktree_store: worktree_store.clone(),
 718                _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| {
 719                    if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 720                        let this = this.as_local_mut().unwrap();
 721                        this.subscribe_to_worktree(worktree, cx);
 722                    }
 723                }),
 724            }),
 725            downstream_client: None,
 726            opened_buffers: Default::default(),
 727            path_to_buffer_id: Default::default(),
 728            shared_buffers: Default::default(),
 729            loading_buffers: Default::default(),
 730            non_searchable_buffers: Default::default(),
 731            worktree_store,
 732        }
 733    }
 734
 735    pub fn remote(
 736        worktree_store: Entity<WorktreeStore>,
 737        upstream_client: AnyProtoClient,
 738        remote_id: u64,
 739        _cx: &mut Context<Self>,
 740    ) -> Self {
 741        Self {
 742            state: BufferStoreState::Remote(RemoteBufferStore {
 743                shared_with_me: Default::default(),
 744                loading_remote_buffers_by_id: Default::default(),
 745                remote_buffer_listeners: Default::default(),
 746                project_id: remote_id,
 747                upstream_client,
 748                worktree_store: worktree_store.clone(),
 749            }),
 750            downstream_client: None,
 751            opened_buffers: Default::default(),
 752            path_to_buffer_id: Default::default(),
 753            loading_buffers: Default::default(),
 754            shared_buffers: Default::default(),
 755            non_searchable_buffers: Default::default(),
 756            worktree_store,
 757        }
 758    }
 759
 760    fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> {
 761        match &mut self.state {
 762            BufferStoreState::Local(state) => Some(state),
 763            _ => None,
 764        }
 765    }
 766
 767    fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> {
 768        match &mut self.state {
 769            BufferStoreState::Remote(state) => Some(state),
 770            _ => None,
 771        }
 772    }
 773
 774    fn as_remote(&self) -> Option<&RemoteBufferStore> {
 775        match &self.state {
 776            BufferStoreState::Remote(state) => Some(state),
 777            _ => None,
 778        }
 779    }
 780
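    /// Opens the buffer at `project_path`, returning the existing buffer if one is
    /// already open and deduplicating concurrent loads of the same path.
    ///
    /// A minimal usage sketch; `buffer_store` and `project_path` are assumed to be
    /// in scope in the caller:
    ///
    /// ```ignore
    /// // Kick off (or join) the load; the returned task resolves to the buffer.
    /// let open_task =
    ///     buffer_store.update(cx, |store, cx| store.open_buffer(project_path, cx));
    /// ```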
 781    pub fn open_buffer(
 782        &mut self,
 783        project_path: ProjectPath,
 784        cx: &mut Context<Self>,
 785    ) -> Task<Result<Entity<Buffer>>> {
 786        if let Some(buffer) = self.get_by_path(&project_path) {
 787            cx.emit(BufferStoreEvent::BufferOpened {
 788                buffer: buffer.clone(),
 789                project_path,
 790            });
 791
 792            return Task::ready(Ok(buffer));
 793        }
 794
 795        let task = match self.loading_buffers.entry(project_path.clone()) {
 796            hash_map::Entry::Occupied(e) => e.get().clone(),
 797            hash_map::Entry::Vacant(entry) => {
 798                let path = project_path.path.clone();
 799                let Some(worktree) = self
 800                    .worktree_store
 801                    .read(cx)
 802                    .worktree_for_id(project_path.worktree_id, cx)
 803                else {
 804                    return Task::ready(Err(anyhow!("no such worktree")));
 805                };
 806                let load_buffer = match &self.state {
 807                    BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx),
 808                    BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx),
 809                };
 810
 811                entry
 812                    .insert(
 813                        cx.spawn(async move |this, cx| {
 814                            let load_result = load_buffer.await;
 815                            this.update(cx, |this, cx| {
 816                                // Record the fact that the buffer is no longer loading.
 817                                this.loading_buffers.remove(&project_path);
 818
 819                                let buffer = load_result.map_err(Arc::new)?;
 820                                cx.emit(BufferStoreEvent::BufferOpened {
 821                                    buffer: buffer.clone(),
 822                                    project_path,
 823                                });
 824
 825                                Ok(buffer)
 826                            })?
 827                        })
 828                        .shared(),
 829                    )
 830                    .clone()
 831            }
 832        };
 833
 834        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 835    }
 836
 837    pub fn create_buffer(&mut self, cx: &mut Context<Self>) -> Task<Result<Entity<Buffer>>> {
 838        match &self.state {
 839            BufferStoreState::Local(this) => this.create_buffer(cx),
 840            BufferStoreState::Remote(this) => this.create_buffer(cx),
 841        }
 842    }
 843
 844    pub fn save_buffer(
 845        &mut self,
 846        buffer: Entity<Buffer>,
 847        cx: &mut Context<Self>,
 848    ) -> Task<Result<()>> {
 849        match &mut self.state {
 850            BufferStoreState::Local(this) => this.save_buffer(buffer, cx),
 851            BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx),
 852        }
 853    }
 854
 855    pub fn save_buffer_as(
 856        &mut self,
 857        buffer: Entity<Buffer>,
 858        path: ProjectPath,
 859        cx: &mut Context<Self>,
 860    ) -> Task<Result<()>> {
 861        let old_file = buffer.read(cx).file().cloned();
 862        let task = match &self.state {
 863            BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx),
 864            BufferStoreState::Remote(this) => {
 865                this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
 866            }
 867        };
 868        cx.spawn(async move |this, cx| {
 869            task.await?;
 870            this.update(cx, |_, cx| {
 871                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 872            })
 873        })
 874    }
 875
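    /// Registers a buffer with the store: applies any operations that arrived before
    /// the buffer finished opening, subscribes to its events, and emits
    /// [`BufferStoreEvent::BufferAdded`].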
 876    fn add_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) -> Result<()> {
 877        let buffer = buffer_entity.read(cx);
 878        let remote_id = buffer.remote_id();
 879        let path = File::from_dyn(buffer.file()).map(|file| ProjectPath {
 880            path: file.path.clone(),
 881            worktree_id: file.worktree_id(cx),
 882        });
 883        let is_remote = buffer.replica_id() != 0;
 884        let open_buffer = OpenBuffer::Complete {
 885            buffer: buffer_entity.downgrade(),
 886        };
 887
 888        let handle = cx.entity().downgrade();
 889        buffer_entity.update(cx, move |_, cx| {
 890            cx.on_release(move |buffer, cx| {
 891                handle
 892                    .update(cx, |_, cx| {
 893                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
 894                    })
 895                    .ok();
 896            })
 897            .detach()
 898        });
 899        let _expect_path_to_exist;
 900        match self.opened_buffers.entry(remote_id) {
 901            hash_map::Entry::Vacant(entry) => {
 902                entry.insert(open_buffer);
 903                _expect_path_to_exist = false;
 904            }
 905            hash_map::Entry::Occupied(mut entry) => {
 906                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 907                    buffer_entity.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
 908                } else if entry.get().upgrade().is_some() {
 909                    if is_remote {
 910                        return Ok(());
 911                    } else {
 912                        debug_panic!("buffer {remote_id} was already registered");
 913                        anyhow::bail!("buffer {remote_id} was already registered");
 914                    }
 915                }
 916                entry.insert(open_buffer);
 917                _expect_path_to_exist = true;
 918            }
 919        }
 920
 921        if let Some(path) = path {
 922            self.path_to_buffer_id.insert(path, remote_id);
 923        }
 924
 925        cx.subscribe(&buffer_entity, Self::on_buffer_event).detach();
 926        cx.emit(BufferStoreEvent::BufferAdded(buffer_entity));
 927        Ok(())
 928    }
 929
 930    pub fn buffers(&self) -> impl '_ + Iterator<Item = Entity<Buffer>> {
 931        self.opened_buffers
 932            .values()
 933            .filter_map(|buffer| buffer.upgrade())
 934    }
 935
 936    pub fn loading_buffers(
 937        &self,
 938    ) -> impl Iterator<Item = (&ProjectPath, impl Future<Output = Result<Entity<Buffer>>>)> {
 939        self.loading_buffers.iter().map(|(path, task)| {
 940            let task = task.clone();
 941            (path, async move { task.await.map_err(|e| anyhow!("{e}")) })
 942        })
 943    }
 944
 945    pub fn buffer_id_for_project_path(&self, project_path: &ProjectPath) -> Option<&BufferId> {
 946        self.path_to_buffer_id.get(project_path)
 947    }
 948
 949    pub fn get_by_path(&self, path: &ProjectPath) -> Option<Entity<Buffer>> {
 950        self.path_to_buffer_id
 951            .get(path)
 952            .and_then(|buffer_id| self.get(*buffer_id))
 954    }
 955
 956    pub fn get(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
 957        self.opened_buffers.get(&buffer_id)?.upgrade()
 958    }
 959
 960    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Entity<Buffer>> {
 961        self.get(buffer_id)
 962            .with_context(|| format!("unknown buffer id {buffer_id}"))
 963    }
 964
 965    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Entity<Buffer>> {
 966        self.get(buffer_id).or_else(|| {
 967            self.as_remote()
 968                .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 969        })
 970    }
 971
 972    pub fn buffer_version_info(&self, cx: &App) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 973        let buffers = self
 974            .buffers()
 975            .map(|buffer| {
 976                let buffer = buffer.read(cx);
 977                proto::BufferVersion {
 978                    id: buffer.remote_id().into(),
 979                    version: language::proto::serialize_version(&buffer.version),
 980                }
 981            })
 982            .collect();
 983        let incomplete_buffer_ids = self
 984            .as_remote()
 985            .map(|remote| remote.incomplete_buffer_ids())
 986            .unwrap_or_default();
 987        (buffers, incomplete_buffer_ids)
 988    }
 989
 990    pub fn disconnected_from_host(&mut self, cx: &mut App) {
 991        for open_buffer in self.opened_buffers.values_mut() {
 992            if let Some(buffer) = open_buffer.upgrade() {
 993                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 994            }
 995        }
 996
 997        for buffer in self.buffers() {
 998            buffer.update(cx, |buffer, cx| {
 999                buffer.set_capability(Capability::ReadOnly, cx)
1000            });
1001        }
1002
1003        if let Some(remote) = self.as_remote_mut() {
1004            // Wake up all futures currently waiting on a buffer to get opened,
1005            // to give them a chance to fail now that we've disconnected.
1006            remote.remote_buffer_listeners.clear()
1007        }
1008    }
1009
1010    pub fn shared(&mut self, remote_id: u64, downstream_client: AnyProtoClient, _cx: &mut App) {
1011        self.downstream_client = Some((downstream_client, remote_id));
1012    }
1013
1014    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
1015        self.downstream_client.take();
1016        self.forget_shared_buffers();
1017    }
1018
1019    pub fn discard_incomplete(&mut self) {
1020        self.opened_buffers
1021            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
1022    }
1023
1024    fn buffer_changed_file(&mut self, buffer: Entity<Buffer>, cx: &mut App) -> Option<()> {
1025        let file = File::from_dyn(buffer.read(cx).file())?;
1026
1027        let remote_id = buffer.read(cx).remote_id();
1028        if let Some(entry_id) = file.entry_id {
1029            if let Some(local) = self.as_local_mut() {
1030                match local.local_buffer_ids_by_entry_id.get(&entry_id) {
1031                    Some(_) => {
1032                        return None;
1033                    }
1034                    None => {
1035                        local
1036                            .local_buffer_ids_by_entry_id
1037                            .insert(entry_id, remote_id);
1038                    }
1039                }
1040            }
1041            self.path_to_buffer_id.insert(
1042                ProjectPath {
1043                    worktree_id: file.worktree_id(cx),
1044                    path: file.path.clone(),
1045                },
1046                remote_id,
1047            );
1048        };
1049
1050        Some(())
1051    }
1052
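    /// Streams buffers that may contain matches for `query` over the returned
    /// channel: open unnamed buffers are sent first, then buffers are opened in
    /// chunks for the candidate paths reported by the worktree store. Buffers marked
    /// as non-searchable are skipped.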
1053    pub fn find_search_candidates(
1054        &mut self,
1055        query: &SearchQuery,
1056        mut limit: usize,
1057        fs: Arc<dyn Fs>,
1058        cx: &mut Context<Self>,
1059    ) -> Receiver<Entity<Buffer>> {
1060        let (tx, rx) = smol::channel::unbounded();
1061        let mut open_buffers = HashSet::default();
1062        let mut unnamed_buffers = Vec::new();
1063        for handle in self.buffers() {
1064            let buffer = handle.read(cx);
1065            if self.non_searchable_buffers.contains(&buffer.remote_id()) {
1066                continue;
1067            } else if let Some(entry_id) = buffer.entry_id(cx) {
1068                open_buffers.insert(entry_id);
1069            } else {
1070                limit = limit.saturating_sub(1);
1071                unnamed_buffers.push(handle)
1072            };
1073        }
1074
1075        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
1076        let project_paths_rx = self
1077            .worktree_store
1078            .update(cx, |worktree_store, cx| {
1079                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
1080            })
1081            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
1082
1083        cx.spawn(async move |this, cx| {
1084            for buffer in unnamed_buffers {
1085                tx.send(buffer).await.ok();
1086            }
1087
1088            let mut project_paths_rx = pin!(project_paths_rx);
1089            while let Some(project_paths) = project_paths_rx.next().await {
1090                let buffers = this.update(cx, |this, cx| {
1091                    project_paths
1092                        .into_iter()
1093                        .map(|project_path| this.open_buffer(project_path, cx))
1094                        .collect::<Vec<_>>()
1095                })?;
1096                for buffer_task in buffers {
1097                    if let Some(buffer) = buffer_task.await.log_err()
1098                        && tx.send(buffer).await.is_err()
1099                    {
1100                        return anyhow::Ok(());
1101                    }
1102                }
1103            }
1104            anyhow::Ok(())
1105        })
1106        .detach();
1107        rx
1108    }
1109
1110    fn on_buffer_event(
1111        &mut self,
1112        buffer: Entity<Buffer>,
1113        event: &BufferEvent,
1114        cx: &mut Context<Self>,
1115    ) {
1116        match event {
1117            BufferEvent::FileHandleChanged => {
1118                self.buffer_changed_file(buffer, cx);
1119            }
1120            BufferEvent::Reloaded => {
1121                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
1122                    return;
1123                };
1124                let buffer = buffer.read(cx);
1125                downstream_client
1126                    .send(proto::BufferReloaded {
1127                        project_id: *project_id,
1128                        buffer_id: buffer.remote_id().to_proto(),
1129                        version: serialize_version(&buffer.version()),
1130                        mtime: buffer.saved_mtime().map(|t| t.into()),
1131                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
1132                    })
1133                    .log_err();
1134            }
1135            BufferEvent::LanguageChanged => {}
1136            _ => {}
1137        }
1138    }
1139
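    /// Handles an `UpdateBuffer` message by applying the contained operations to the
    /// target buffer, or queueing them if the buffer has not finished opening yet.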
1140    pub async fn handle_update_buffer(
1141        this: Entity<Self>,
1142        envelope: TypedEnvelope<proto::UpdateBuffer>,
1143        mut cx: AsyncApp,
1144    ) -> Result<proto::Ack> {
1145        let payload = envelope.payload.clone();
1146        let buffer_id = BufferId::new(payload.buffer_id)?;
1147        let ops = payload
1148            .operations
1149            .into_iter()
1150            .map(language::proto::deserialize_operation)
1151            .collect::<Result<Vec<_>, _>>()?;
1152        this.update(&mut cx, |this, cx| {
1153            match this.opened_buffers.entry(buffer_id) {
1154                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1155                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1156                    OpenBuffer::Complete { buffer, .. } => {
1157                        if let Some(buffer) = buffer.upgrade() {
1158                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1159                        }
1160                    }
1161                },
1162                hash_map::Entry::Vacant(e) => {
1163                    e.insert(OpenBuffer::Operations(ops));
1164                }
1165            }
1166            Ok(proto::Ack {})
1167        })?
1168    }
1169
1170    pub fn register_shared_lsp_handle(
1171        &mut self,
1172        peer_id: proto::PeerId,
1173        buffer_id: BufferId,
1174        handle: OpenLspBufferHandle,
1175    ) {
1176        if let Some(shared_buffers) = self.shared_buffers.get_mut(&peer_id)
1177            && let Some(buffer) = shared_buffers.get_mut(&buffer_id)
1178        {
1179            buffer.lsp_handle = Some(handle);
1180            return;
1181        }
1182        debug_panic!("tried to register shared lsp handle, but buffer was not shared")
1183    }
1184
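    /// Handles a `SynchronizeBuffers` request from a guest: re-registers the guest's
    /// shared buffers, replies with the host's buffer versions, and sends the guest
    /// any file updates, saved-state metadata, and missing operations.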
1185    pub fn handle_synchronize_buffers(
1186        &mut self,
1187        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1188        cx: &mut Context<Self>,
1189        client: Arc<Client>,
1190    ) -> Result<proto::SynchronizeBuffersResponse> {
1191        let project_id = envelope.payload.project_id;
1192        let mut response = proto::SynchronizeBuffersResponse {
1193            buffers: Default::default(),
1194        };
1195        let Some(guest_id) = envelope.original_sender_id else {
1196            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1197        };
1198
1199        self.shared_buffers.entry(guest_id).or_default().clear();
1200        for buffer in envelope.payload.buffers {
1201            let buffer_id = BufferId::new(buffer.id)?;
1202            let remote_version = language::proto::deserialize_version(&buffer.version);
1203            if let Some(buffer) = self.get(buffer_id) {
1204                self.shared_buffers
1205                    .entry(guest_id)
1206                    .or_default()
1207                    .entry(buffer_id)
1208                    .or_insert_with(|| SharedBuffer {
1209                        buffer: buffer.clone(),
1210                        lsp_handle: None,
1211                    });
1212
1213                let buffer = buffer.read(cx);
1214                response.buffers.push(proto::BufferVersion {
1215                    id: buffer_id.into(),
1216                    version: language::proto::serialize_version(&buffer.version),
1217                });
1218
1219                let operations = buffer.serialize_ops(Some(remote_version), cx);
1220                let client = client.clone();
1221                if let Some(file) = buffer.file() {
1222                    client
1223                        .send(proto::UpdateBufferFile {
1224                            project_id,
1225                            buffer_id: buffer_id.into(),
1226                            file: Some(file.to_proto(cx)),
1227                        })
1228                        .log_err();
1229                }
1230
1231                // TODO(max): do something
1232                // client
1233                //     .send(proto::UpdateStagedText {
1234                //         project_id,
1235                //         buffer_id: buffer_id.into(),
1236                //         diff_base: buffer.diff_base().map(ToString::to_string),
1237                //     })
1238                //     .log_err();
1239
1240                client
1241                    .send(proto::BufferReloaded {
1242                        project_id,
1243                        buffer_id: buffer_id.into(),
1244                        version: language::proto::serialize_version(buffer.saved_version()),
1245                        mtime: buffer.saved_mtime().map(|time| time.into()),
1246                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1247                            as i32,
1248                    })
1249                    .log_err();
1250
1251                cx.background_spawn(
1252                    async move {
1253                        let operations = operations.await;
1254                        for chunk in split_operations(operations) {
1255                            client
1256                                .request(proto::UpdateBuffer {
1257                                    project_id,
1258                                    buffer_id: buffer_id.into(),
1259                                    operations: chunk,
1260                                })
1261                                .await?;
1262                        }
1263                        anyhow::Ok(())
1264                    }
1265                    .log_err(),
1266                )
1267                .detach();
1268            }
1269        }
1270        Ok(response)
1271    }
1272
1273    pub fn handle_create_buffer_for_peer(
1274        &mut self,
1275        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1276        replica_id: u16,
1277        capability: Capability,
1278        cx: &mut Context<Self>,
1279    ) -> Result<()> {
1280        let remote = self
1281            .as_remote_mut()
1282            .context("buffer store is not a remote")?;
1283
1284        if let Some(buffer) =
1285            remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)?
1286        {
1287            self.add_buffer(buffer, cx)?;
1288        }
1289
1290        Ok(())
1291    }
1292
1293    pub async fn handle_update_buffer_file(
1294        this: Entity<Self>,
1295        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1296        mut cx: AsyncApp,
1297    ) -> Result<()> {
1298        let buffer_id = envelope.payload.buffer_id;
1299        let buffer_id = BufferId::new(buffer_id)?;
1300
1301        this.update(&mut cx, |this, cx| {
1302            let payload = envelope.payload.clone();
1303            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1304                let file = payload.file.context("invalid file")?;
1305                let worktree = this
1306                    .worktree_store
1307                    .read(cx)
1308                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1309                    .context("no such worktree")?;
1310                let file = File::from_proto(file, worktree, cx)?;
1311                let old_file = buffer.update(cx, |buffer, cx| {
1312                    let old_file = buffer.file().cloned();
1313                    let new_path = file.path.clone();
1314
1315                    buffer.file_updated(Arc::new(file), cx);
1316                    if old_file
1317                        .as_ref()
1318                        .map_or(true, |old| *old.path() != new_path)
1319                    {
1320                        Some(old_file)
1321                    } else {
1322                        None
1323                    }
1324                });
1325                if let Some(old_file) = old_file {
1326                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1327                }
1328            }
1329            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1330                downstream_client
1331                    .send(proto::UpdateBufferFile {
1332                        project_id: *project_id,
1333                        buffer_id: buffer_id.into(),
1334                        file: envelope.payload.file,
1335                    })
1336                    .log_err();
1337            }
1338            Ok(())
1339        })?
1340    }
1341
1342    pub async fn handle_save_buffer(
1343        this: Entity<Self>,
1344        envelope: TypedEnvelope<proto::SaveBuffer>,
1345        mut cx: AsyncApp,
1346    ) -> Result<proto::BufferSaved> {
1347        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1348        let (buffer, project_id) = this.read_with(&cx, |this, _| {
1349            anyhow::Ok((
1350                this.get_existing(buffer_id)?,
1351                this.downstream_client
1352                    .as_ref()
1353                    .map(|(_, project_id)| *project_id)
1354                    .context("project is not shared")?,
1355            ))
1356        })??;
1357        buffer
1358            .update(&mut cx, |buffer, _| {
1359                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1360            })?
1361            .await?;
1362        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1363
1364        if let Some(new_path) = envelope.payload.new_path {
1365            let new_path = ProjectPath::from_proto(new_path);
1366            this.update(&mut cx, |this, cx| {
1367                this.save_buffer_as(buffer.clone(), new_path, cx)
1368            })?
1369            .await?;
1370        } else {
1371            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1372                .await?;
1373        }
1374
1375        buffer.read_with(&cx, |buffer, _| proto::BufferSaved {
1376            project_id,
1377            buffer_id: buffer_id.into(),
1378            version: serialize_version(buffer.saved_version()),
1379            mtime: buffer.saved_mtime().map(|time| time.into()),
1380        })
1381    }
1382
1383    pub async fn handle_close_buffer(
1384        this: Entity<Self>,
1385        envelope: TypedEnvelope<proto::CloseBuffer>,
1386        mut cx: AsyncApp,
1387    ) -> Result<()> {
1388        let peer_id = envelope.sender_id;
1389        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1390        this.update(&mut cx, |this, cx| {
1391            if let Some(shared) = this.shared_buffers.get_mut(&peer_id)
1392                && shared.remove(&buffer_id).is_some()
1393            {
1394                cx.emit(BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id));
1395                if shared.is_empty() {
1396                    this.shared_buffers.remove(&peer_id);
1397                }
1398                return;
1399            }
1400            debug_panic!(
1401                "peer_id {} closed buffer_id {} which was either not open or already closed",
1402                peer_id,
1403                buffer_id
1404            )
1405        })
1406    }
1407
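        /// Applies a `BufferSaved` notification (saved version and mtime) to the
        /// local replica and forwards it to any downstream client.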
1408    pub async fn handle_buffer_saved(
1409        this: Entity<Self>,
1410        envelope: TypedEnvelope<proto::BufferSaved>,
1411        mut cx: AsyncApp,
1412    ) -> Result<()> {
1413        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1414        let version = deserialize_version(&envelope.payload.version);
1415        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1416        this.update(&mut cx, move |this, cx| {
1417            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1418                buffer.update(cx, |buffer, cx| {
1419                    buffer.did_save(version, mtime, cx);
1420                });
1421            }
1422
1423            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1424                downstream_client
1425                    .send(proto::BufferSaved {
1426                        project_id: *project_id,
1427                        buffer_id: buffer_id.into(),
1428                        mtime: envelope.payload.mtime,
1429                        version: envelope.payload.version,
1430                    })
1431                    .log_err();
1432            }
1433        })
1434    }
1435
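        /// Applies a `BufferReloaded` notification (version, line ending, and mtime)
        /// to the local replica and forwards it to any downstream client.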
1436    pub async fn handle_buffer_reloaded(
1437        this: Entity<Self>,
1438        envelope: TypedEnvelope<proto::BufferReloaded>,
1439        mut cx: AsyncApp,
1440    ) -> Result<()> {
1441        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1442        let version = deserialize_version(&envelope.payload.version);
1443        let mtime = envelope.payload.mtime.clone().map(|time| time.into());
1444        let line_ending = deserialize_line_ending(
1445            proto::LineEnding::from_i32(envelope.payload.line_ending)
1446                .context("missing line ending")?,
1447        );
1448        this.update(&mut cx, |this, cx| {
1449            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1450                buffer.update(cx, |buffer, cx| {
1451                    buffer.did_reload(version, line_ending, mtime, cx);
1452                });
1453            }
1454
1455            if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
1456                downstream_client
1457                    .send(proto::BufferReloaded {
1458                        project_id: *project_id,
1459                        buffer_id: buffer_id.into(),
1460                        mtime: envelope.payload.mtime,
1461                        version: envelope.payload.version,
1462                        line_ending: envelope.payload.line_ending,
1463                    })
1464                    .log_err();
1465            }
1466        })
1467    }
1468
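        /// Reloads the given buffers, dispatching to the local or remote
        /// implementation; `push_to_history` controls whether the resulting
        /// transactions become part of each buffer's undo history.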
1469    pub fn reload_buffers(
1470        &self,
1471        buffers: HashSet<Entity<Buffer>>,
1472        push_to_history: bool,
1473        cx: &mut Context<Self>,
1474    ) -> Task<Result<ProjectTransaction>> {
1475        if buffers.is_empty() {
1476            return Task::ready(Ok(ProjectTransaction::default()));
1477        }
1478        match &self.state {
1479            BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx),
1480            BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx),
1481        }
1482    }
1483
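        /// Handles a `ReloadBuffers` request: reloads the requested buffers and
        /// replies with the resulting project transaction, serialized for the
        /// requesting peer.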
1484    async fn handle_reload_buffers(
1485        this: Entity<Self>,
1486        envelope: TypedEnvelope<proto::ReloadBuffers>,
1487        mut cx: AsyncApp,
1488    ) -> Result<proto::ReloadBuffersResponse> {
1489        let sender_id = envelope.original_sender_id().unwrap_or_default();
1490        let reload = this.update(&mut cx, |this, cx| {
1491            let mut buffers = HashSet::default();
1492            for buffer_id in &envelope.payload.buffer_ids {
1493                let buffer_id = BufferId::new(*buffer_id)?;
1494                buffers.insert(this.get_existing(buffer_id)?);
1495            }
1496            anyhow::Ok(this.reload_buffers(buffers, false, cx))
1497        })??;
1498
1499        let project_transaction = reload.await?;
1500        let project_transaction = this.update(&mut cx, |this, cx| {
1501            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1502        })?;
1503        Ok(proto::ReloadBuffersResponse {
1504            transaction: Some(project_transaction),
1505        })
1506    }
1507
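        /// Replicates `buffer` to `peer_id` if it isn't already shared with them:
        /// the full buffer state is sent first, followed by any outstanding
        /// operations in chunks.
        ///
        /// A rough sketch of a call site (the `buffer_store`, `buffer`, and `peer_id`
        /// bindings here are hypothetical, for illustration only):
        ///
        /// ```ignore
        /// buffer_store.update(cx, |store, cx| {
        ///     store.create_buffer_for_peer(&buffer, peer_id, cx)
        ///         .detach_and_log_err(cx);
        /// });
        /// ```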
1508    pub fn create_buffer_for_peer(
1509        &mut self,
1510        buffer: &Entity<Buffer>,
1511        peer_id: proto::PeerId,
1512        cx: &mut Context<Self>,
1513    ) -> Task<Result<()>> {
1514        let buffer_id = buffer.read(cx).remote_id();
1515        let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
1516        if shared_buffers.contains_key(&buffer_id) {
1517            return Task::ready(Ok(()));
1518        }
1519        shared_buffers.insert(
1520            buffer_id,
1521            SharedBuffer {
1522                buffer: buffer.clone(),
1523                lsp_handle: None,
1524            },
1525        );
1526
1527        let Some((client, project_id)) = self.downstream_client.clone() else {
1528            return Task::ready(Ok(()));
1529        };
1530
1531        cx.spawn(async move |this, cx| {
1532            let Some(buffer) = this.read_with(cx, |this, _| this.get(buffer_id))? else {
1533                return anyhow::Ok(());
1534            };
1535
1536            let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
1537            let operations = operations.await;
1538            let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
1539
1540            let initial_state = proto::CreateBufferForPeer {
1541                project_id,
1542                peer_id: Some(peer_id),
1543                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1544            };
1545
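                // Send the complete buffer state first; once that succeeds, stream
                // the serialized operations to the peer in chunks from a background
                // task.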
1546            if client.send(initial_state).log_err().is_some() {
1547                let client = client.clone();
1548                cx.background_spawn(async move {
1549                    let mut chunks = split_operations(operations).peekable();
1550                    while let Some(chunk) = chunks.next() {
1551                        let is_last = chunks.peek().is_none();
1552                        client.send(proto::CreateBufferForPeer {
1553                            project_id,
1554                            peer_id: Some(peer_id),
1555                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1556                                proto::BufferChunk {
1557                                    buffer_id: buffer_id.into(),
1558                                    operations: chunk,
1559                                    is_last,
1560                                },
1561                            )),
1562                        })?;
1563                    }
1564                    anyhow::Ok(())
1565                })
1566                .await
1567                .log_err();
1568            }
1569            Ok(())
1570        })
1571    }
1572
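        /// Clears all record of buffers shared with remote peers.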
1573    pub fn forget_shared_buffers(&mut self) {
1574        self.shared_buffers.clear();
1575    }
1576
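        /// Clears the record of buffers shared with a single peer.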
1577    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1578        self.shared_buffers.remove(peer_id);
1579    }
1580
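        /// Moves a peer's shared-buffer entries from `old_peer_id` to `new_peer_id`
        /// (e.g. when the peer rejoins under a new id).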
1581    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1582        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1583            self.shared_buffers.insert(new_peer_id, buffers);
1584        }
1585    }
1586
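        /// Returns whether any buffers are currently shared with remote peers.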
1587    pub fn has_shared_buffers(&self) -> bool {
1588        !self.shared_buffers.is_empty()
1589    }
1590
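        /// Creates a new local buffer with the given text and optional language
        /// (falling back to plain text) and registers it with this store.
        ///
        /// A minimal sketch of a call site (the `rust_language` handle is
        /// hypothetical):
        ///
        /// ```ignore
        /// let buffer = buffer_store.update(cx, |store, cx| {
        ///     store.create_local_buffer("fn main() {}", Some(rust_language), cx)
        /// });
        /// ```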
1591    pub fn create_local_buffer(
1592        &mut self,
1593        text: &str,
1594        language: Option<Arc<Language>>,
1595        cx: &mut Context<Self>,
1596    ) -> Entity<Buffer> {
1597        let buffer = cx.new(|cx| {
1598            Buffer::local(text, cx)
1599                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
1600        });
1601
1602        self.add_buffer(buffer.clone(), cx).log_err();
1603        let buffer_id = buffer.read(cx).remote_id();
1604
1605        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
1606            self.path_to_buffer_id.insert(
1607                ProjectPath {
1608                    worktree_id: file.worktree_id(cx),
1609                    path: file.path.clone(),
1610                },
1611                buffer_id,
1612            );
1613            let this = self
1614                .as_local_mut()
1615                .expect("local-only method called in a non-local context");
1616            if let Some(entry_id) = file.entry_id {
1617                this.local_buffer_ids_by_entry_id
1618                    .insert(entry_id, buffer_id);
1619            }
1620        }
1621        buffer
1622    }
1623
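        /// Deserializes a `ProjectTransaction` received over RPC. Only valid on a
        /// remote buffer store; otherwise debug-panics and returns an error.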
1624    pub fn deserialize_project_transaction(
1625        &mut self,
1626        message: proto::ProjectTransaction,
1627        push_to_history: bool,
1628        cx: &mut Context<Self>,
1629    ) -> Task<Result<ProjectTransaction>> {
1630        if let Some(this) = self.as_remote_mut() {
1631            this.deserialize_project_transaction(message, push_to_history, cx)
1632        } else {
1633            debug_panic!("not a remote buffer store");
1634            Task::ready(Err(anyhow!("not a remote buffer store")))
1635        }
1636    }
1637
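        /// Waits for the buffer with the given id to arrive from the upstream peer.
        /// Only valid on a remote buffer store; otherwise debug-panics and returns an
        /// error.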
1638    pub fn wait_for_remote_buffer(
1639        &mut self,
1640        id: BufferId,
1641        cx: &mut Context<BufferStore>,
1642    ) -> Task<Result<Entity<Buffer>>> {
1643        if let Some(this) = self.as_remote_mut() {
1644            this.wait_for_remote_buffer(id, cx)
1645        } else {
1646            debug_panic!("not a remote buffer store");
1647            Task::ready(Err(anyhow!("not a remote buffer store")))
1648        }
1649    }
1650
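        /// Converts a `ProjectTransaction` into its protobuf form for `peer_id`,
        /// kicking off buffer replication to that peer for every buffer the
        /// transaction touches.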
1651    pub fn serialize_project_transaction_for_peer(
1652        &mut self,
1653        project_transaction: ProjectTransaction,
1654        peer_id: proto::PeerId,
1655        cx: &mut Context<Self>,
1656    ) -> proto::ProjectTransaction {
1657        let mut serialized_transaction = proto::ProjectTransaction {
1658            buffer_ids: Default::default(),
1659            transactions: Default::default(),
1660        };
1661        for (buffer, transaction) in project_transaction.0 {
1662            self.create_buffer_for_peer(&buffer, peer_id, cx)
1663                .detach_and_log_err(cx);
1664            serialized_transaction
1665                .buffer_ids
1666                .push(buffer.read(cx).remote_id().into());
1667            serialized_transaction
1668                .transactions
1669                .push(language::proto::serialize_transaction(&transaction));
1670        }
1671        serialized_transaction
1672    }
1673
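        /// Marks the given buffer as excluded from buffer searches.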
1674    pub(crate) fn mark_buffer_as_non_searchable(&mut self, buffer_id: BufferId) {
1675        self.non_searchable_buffers.insert(buffer_id);
1676    }
1677}
1678
1679impl OpenBuffer {
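        /// Returns a strong handle to the buffer, or `None` if it has been dropped or
        /// is still represented only by queued operations.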
1680    fn upgrade(&self) -> Option<Entity<Buffer>> {
1681        match self {
1682            OpenBuffer::Complete { buffer, .. } => buffer.upgrade(),
1683            OpenBuffer::Operations(_) => None,
1684        }
1685    }
1686}
1687
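    /// Returns `true` if the error's root cause is an `io::Error` with kind `NotFound`.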
1688fn is_not_found_error(error: &anyhow::Error) -> bool {
1689    error
1690        .root_cause()
1691        .downcast_ref::<io::Error>()
1692        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1693}