buffer_store.rs

   1use crate::{
   2    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   3    ProjectPath,
   4};
   5use anyhow::{anyhow, Result};
   6use collections::{hash_map, HashMap};
   7use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
   8use gpui::{
   9    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  10};
  11use language::{
  12    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
  13    Buffer, Capability, Event as BufferEvent, Language, Operation,
  14};
  15use rpc::{
  16    proto::{self, AnyProtoClient, PeerId},
  17    ErrorExt as _, TypedEnvelope,
  18};
  19use std::{io, path::Path, sync::Arc};
  20use text::BufferId;
  21use util::{debug_panic, maybe, ResultExt as _};
  22use worktree::{
  23    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
  24};
  25
  26/// A set of open buffers.
  27pub struct BufferStore {
  28    retain_buffers: bool,
  29    #[allow(unused)]
  30    worktree_store: Model<WorktreeStore>,
  31    opened_buffers: HashMap<BufferId, OpenBuffer>,
  32    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  33    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  34    #[allow(clippy::type_complexity)]
  35    loading_buffers_by_path: HashMap<
  36        ProjectPath,
  37        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  38    >,
  39    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  40    remote_buffer_listeners:
  41        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  42}
  43
  44enum OpenBuffer {
  45    Strong(Model<Buffer>),
  46    Weak(WeakModel<Buffer>),
  47    Operations(Vec<Operation>),
  48}
  49
  50pub enum BufferStoreEvent {
  51    BufferAdded(Model<Buffer>),
  52    BufferChangedFilePath {
  53        buffer: Model<Buffer>,
  54        old_file: Option<Arc<File>>,
  55    },
  56    BufferSaved {
  57        buffer: Model<Buffer>,
  58        has_changed_file: bool,
  59        saved_version: clock::Global,
  60    },
  61    LocalBufferUpdated {
  62        buffer: Model<Buffer>,
  63    },
  64    DiffBaseUpdated {
  65        buffer: Model<Buffer>,
  66    },
  67}
  68
  69impl EventEmitter<BufferStoreEvent> for BufferStore {}
  70
  71impl BufferStore {
  72    /// Creates a buffer store, optionally retaining its buffers.
  73    ///
  74    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
  75    /// and won't be released unless they are explicitly removed, or `retain_buffers`
  76    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
  77    /// weak handles.
  78    pub fn new(
  79        worktree_store: Model<WorktreeStore>,
  80        retain_buffers: bool,
  81        cx: &mut ModelContext<Self>,
  82    ) -> Self {
  83        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
  84            WorktreeStoreEvent::WorktreeAdded(worktree) => {
  85                this.subscribe_to_worktree(worktree, cx);
  86            }
  87            _ => {}
  88        })
  89        .detach();
  90
  91        Self {
  92            retain_buffers,
  93            worktree_store,
  94            opened_buffers: Default::default(),
  95            remote_buffer_listeners: Default::default(),
  96            loading_remote_buffers_by_id: Default::default(),
  97            local_buffer_ids_by_path: Default::default(),
  98            local_buffer_ids_by_entry_id: Default::default(),
  99            loading_buffers_by_path: Default::default(),
 100        }
 101    }
 102
 103    pub fn open_buffer(
 104        &mut self,
 105        project_path: ProjectPath,
 106        cx: &mut ModelContext<Self>,
 107    ) -> Task<Result<Model<Buffer>>> {
 108        let existing_buffer = self.get_by_path(&project_path, cx);
 109        if let Some(existing_buffer) = existing_buffer {
 110            return Task::ready(Ok(existing_buffer));
 111        }
 112
 113        let Some(worktree) = self
 114            .worktree_store
 115            .read(cx)
 116            .worktree_for_id(project_path.worktree_id, cx)
 117        else {
 118            return Task::ready(Err(anyhow!("no such worktree")));
 119        };
 120
 121        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
 122            // If the given path is already being loaded, then wait for that existing
 123            // task to complete and return the same buffer.
 124            hash_map::Entry::Occupied(e) => e.get().clone(),
 125
 126            // Otherwise, record the fact that this path is now being loaded.
 127            hash_map::Entry::Vacant(entry) => {
 128                let (mut tx, rx) = postage::watch::channel();
 129                entry.insert(rx.clone());
 130
 131                let project_path = project_path.clone();
 132                let load_buffer = match worktree.read(cx) {
 133                    Worktree::Local(_) => {
 134                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
 135                    }
 136                    Worktree::Remote(tree) => {
 137                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
 138                    }
 139                };
 140
 141                cx.spawn(move |this, mut cx| async move {
 142                    let load_result = load_buffer.await;
 143                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
 144                        // Record the fact that the buffer is no longer loading.
 145                        this.loading_buffers_by_path.remove(&project_path);
 146                        let buffer = load_result.map_err(Arc::new)?;
 147                        Ok(buffer)
 148                    })?);
 149                    anyhow::Ok(())
 150                })
 151                .detach();
 152                rx
 153            }
 154        };
 155
 156        cx.background_executor().spawn(async move {
 157            Self::wait_for_loading_buffer(loading_watch)
 158                .await
 159                .map_err(|e| e.cloned())
 160        })
 161    }
 162
 163    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 164        cx.subscribe(worktree, |this, worktree, event, cx| {
 165            if worktree.read(cx).is_local() {
 166                match event {
 167                    worktree::Event::UpdatedEntries(changes) => {
 168                        this.local_worktree_entries_changed(&worktree, changes, cx);
 169                    }
 170                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 171                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 172                    }
 173                    _ => {}
 174                }
 175            }
 176        })
 177        .detach();
 178    }
 179
 180    fn local_worktree_entries_changed(
 181        &mut self,
 182        worktree_handle: &Model<Worktree>,
 183        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 184        cx: &mut ModelContext<Self>,
 185    ) {
 186        let snapshot = worktree_handle.read(cx).snapshot();
 187        for (path, entry_id, _) in changes {
 188            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 189        }
 190    }
 191
 192    fn local_worktree_git_repos_changed(
 193        &mut self,
 194        worktree_handle: Model<Worktree>,
 195        changed_repos: &UpdatedGitRepositoriesSet,
 196        cx: &mut ModelContext<Self>,
 197    ) {
 198        debug_assert!(worktree_handle.read(cx).is_local());
 199
 200        // Identify the loading buffers whose containing repository that has changed.
 201        let future_buffers = self
 202            .loading_buffers()
 203            .filter_map(|(project_path, receiver)| {
 204                if project_path.worktree_id != worktree_handle.read(cx).id() {
 205                    return None;
 206                }
 207                let path = &project_path.path;
 208                changed_repos
 209                    .iter()
 210                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
 211                let path = path.clone();
 212                Some(async move {
 213                    Self::wait_for_loading_buffer(receiver)
 214                        .await
 215                        .ok()
 216                        .map(|buffer| (buffer, path))
 217                })
 218            })
 219            .collect::<FuturesUnordered<_>>();
 220
 221        // Identify the current buffers whose containing repository has changed.
 222        let current_buffers = self
 223            .buffers()
 224            .filter_map(|buffer| {
 225                let file = File::from_dyn(buffer.read(cx).file())?;
 226                if file.worktree != worktree_handle {
 227                    return None;
 228                }
 229                changed_repos
 230                    .iter()
 231                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 232                Some((buffer, file.path.clone()))
 233            })
 234            .collect::<Vec<_>>();
 235
 236        if future_buffers.len() + current_buffers.len() == 0 {
 237            return;
 238        }
 239
 240        cx.spawn(move |this, mut cx| async move {
 241            // Wait for all of the buffers to load.
 242            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 243
 244            // Reload the diff base for every buffer whose containing git repository has changed.
 245            let snapshot =
 246                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 247            let diff_bases_by_buffer = cx
 248                .background_executor()
 249                .spawn(async move {
 250                    let mut diff_base_tasks = future_buffers
 251                        .into_iter()
 252                        .flatten()
 253                        .chain(current_buffers)
 254                        .filter_map(|(buffer, path)| {
 255                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 256                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 257                            Some(async move {
 258                                let base_text =
 259                                    local_repo_entry.repo().load_index_text(&relative_path);
 260                                Some((buffer, base_text))
 261                            })
 262                        })
 263                        .collect::<FuturesUnordered<_>>();
 264
 265                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 266                    while let Some(diff_base) = diff_base_tasks.next().await {
 267                        if let Some(diff_base) = diff_base {
 268                            diff_bases.push(diff_base);
 269                        }
 270                    }
 271                    diff_bases
 272                })
 273                .await;
 274
 275            this.update(&mut cx, |_, cx| {
 276                // Assign the new diff bases on all of the buffers.
 277                for (buffer, diff_base) in diff_bases_by_buffer {
 278                    buffer.update(cx, |buffer, cx| {
 279                        buffer.set_diff_base(diff_base.clone(), cx);
 280                    });
 281                    cx.emit(BufferStoreEvent::DiffBaseUpdated { buffer })
 282                }
 283            })
 284        })
 285        .detach_and_log_err(cx);
 286    }
 287
 288    fn open_local_buffer_internal(
 289        &mut self,
 290        path: Arc<Path>,
 291        worktree: Model<Worktree>,
 292        cx: &mut ModelContext<Self>,
 293    ) -> Task<Result<Model<Buffer>>> {
 294        let load_buffer = worktree.update(cx, |worktree, cx| {
 295            let load_file = worktree.load_file(path.as_ref(), cx);
 296            let reservation = cx.reserve_model();
 297            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 298            cx.spawn(move |_, mut cx| async move {
 299                let loaded = load_file.await?;
 300                let text_buffer = cx
 301                    .background_executor()
 302                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 303                    .await;
 304                cx.insert_model(reservation, |_| {
 305                    Buffer::build(
 306                        text_buffer,
 307                        loaded.diff_base,
 308                        Some(loaded.file),
 309                        Capability::ReadWrite,
 310                    )
 311                })
 312            })
 313        });
 314
 315        cx.spawn(move |this, mut cx| async move {
 316            let buffer = match load_buffer.await {
 317                Ok(buffer) => Ok(buffer),
 318                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 319                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 320                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 321                    Buffer::build(
 322                        text_buffer,
 323                        None,
 324                        Some(Arc::new(File {
 325                            worktree,
 326                            path,
 327                            mtime: None,
 328                            entry_id: None,
 329                            is_local: true,
 330                            is_deleted: false,
 331                            is_private: false,
 332                        })),
 333                        Capability::ReadWrite,
 334                    )
 335                }),
 336                Err(e) => Err(e),
 337            }?;
 338            this.update(&mut cx, |this, cx| {
 339                this.add_buffer(buffer.clone(), cx).log_err();
 340            })?;
 341            Ok(buffer)
 342        })
 343    }
 344
 345    fn open_remote_buffer_internal(
 346        &self,
 347        path: &Arc<Path>,
 348        worktree: &RemoteWorktree,
 349        cx: &ModelContext<Self>,
 350    ) -> Task<Result<Model<Buffer>>> {
 351        let worktree_id = worktree.id().to_proto();
 352        let project_id = worktree.project_id();
 353        let client = worktree.client();
 354        let path_string = path.clone().to_string_lossy().to_string();
 355        cx.spawn(move |this, mut cx| async move {
 356            let response = client
 357                .request(proto::OpenBufferByPath {
 358                    project_id,
 359                    worktree_id,
 360                    path: path_string,
 361                })
 362                .await?;
 363            let buffer_id = BufferId::new(response.buffer_id)?;
 364            this.update(&mut cx, |this, cx| {
 365                this.wait_for_remote_buffer(buffer_id, cx)
 366            })?
 367            .await
 368        })
 369    }
 370
 371    pub fn create_buffer(
 372        &mut self,
 373        remote_client: Option<(AnyProtoClient, u64)>,
 374        cx: &mut ModelContext<Self>,
 375    ) -> Task<Result<Model<Buffer>>> {
 376        if let Some((remote_client, project_id)) = remote_client {
 377            let create = remote_client.request(proto::OpenNewBuffer { project_id });
 378            cx.spawn(|this, mut cx| async move {
 379                let response = create.await?;
 380                let buffer_id = BufferId::new(response.buffer_id)?;
 381
 382                this.update(&mut cx, |this, cx| {
 383                    this.wait_for_remote_buffer(buffer_id, cx)
 384                })?
 385                .await
 386            })
 387        } else {
 388            Task::ready(Ok(self.create_local_buffer("", None, cx)))
 389        }
 390    }
 391
 392    pub fn create_local_buffer(
 393        &mut self,
 394        text: &str,
 395        language: Option<Arc<Language>>,
 396        cx: &mut ModelContext<Self>,
 397    ) -> Model<Buffer> {
 398        let buffer = cx.new_model(|cx| {
 399            Buffer::local(text, cx)
 400                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 401        });
 402        self.add_buffer(buffer.clone(), cx).log_err();
 403        buffer
 404    }
 405
 406    pub fn save_buffer(
 407        &mut self,
 408        buffer: Model<Buffer>,
 409        cx: &mut ModelContext<Self>,
 410    ) -> Task<Result<()>> {
 411        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 412            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 413        };
 414        match file.worktree.read(cx) {
 415            Worktree::Local(_) => {
 416                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
 417            }
 418            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
 419        }
 420    }
 421
 422    pub fn save_buffer_as(
 423        &mut self,
 424        buffer: Model<Buffer>,
 425        path: ProjectPath,
 426        cx: &mut ModelContext<Self>,
 427    ) -> Task<Result<()>> {
 428        let Some(worktree) = self
 429            .worktree_store
 430            .read(cx)
 431            .worktree_for_id(path.worktree_id, cx)
 432        else {
 433            return Task::ready(Err(anyhow!("no such worktree")));
 434        };
 435
 436        let old_file = File::from_dyn(buffer.read(cx).file())
 437            .cloned()
 438            .map(Arc::new);
 439
 440        let task = match worktree.read(cx) {
 441            Worktree::Local(_) => {
 442                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
 443            }
 444            Worktree::Remote(tree) => {
 445                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
 446            }
 447        };
 448        cx.spawn(|this, mut cx| async move {
 449            task.await?;
 450            this.update(&mut cx, |_, cx| {
 451                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 452            })
 453        })
 454    }
 455
 456    fn save_local_buffer(
 457        &self,
 458        worktree: Model<Worktree>,
 459        buffer_handle: Model<Buffer>,
 460        path: Arc<Path>,
 461        mut has_changed_file: bool,
 462        cx: &mut ModelContext<Self>,
 463    ) -> Task<Result<()>> {
 464        let buffer = buffer_handle.read(cx);
 465        let text = buffer.as_rope().clone();
 466        let line_ending = buffer.line_ending();
 467        let version = buffer.version();
 468        if buffer.file().is_some_and(|file| !file.is_created()) {
 469            has_changed_file = true;
 470        }
 471
 472        let save = worktree.update(cx, |worktree, cx| {
 473            worktree.write_file(path.as_ref(), text, line_ending, cx)
 474        });
 475
 476        cx.spawn(move |this, mut cx| async move {
 477            let new_file = save.await?;
 478            let mtime = new_file.mtime;
 479            buffer_handle.update(&mut cx, |buffer, cx| {
 480                if has_changed_file {
 481                    buffer.file_updated(new_file, cx);
 482                }
 483                buffer.did_save(version.clone(), mtime, cx);
 484            })?;
 485            this.update(&mut cx, |_, cx| {
 486                cx.emit(BufferStoreEvent::BufferSaved {
 487                    buffer: buffer_handle,
 488                    has_changed_file,
 489                    saved_version: version,
 490                })
 491            })?;
 492            Ok(())
 493        })
 494    }
 495
 496    fn save_remote_buffer(
 497        &self,
 498        buffer_handle: Model<Buffer>,
 499        new_path: Option<proto::ProjectPath>,
 500        tree: &RemoteWorktree,
 501        cx: &ModelContext<Self>,
 502    ) -> Task<Result<()>> {
 503        let buffer = buffer_handle.read(cx);
 504        let buffer_id = buffer.remote_id().into();
 505        let version = buffer.version();
 506        let rpc = tree.client();
 507        let project_id = tree.project_id();
 508        cx.spawn(move |_, mut cx| async move {
 509            let response = rpc
 510                .request(proto::SaveBuffer {
 511                    project_id,
 512                    buffer_id,
 513                    new_path,
 514                    version: serialize_version(&version),
 515                })
 516                .await?;
 517            let version = deserialize_version(&response.version);
 518            let mtime = response.mtime.map(|mtime| mtime.into());
 519
 520            buffer_handle.update(&mut cx, |buffer, cx| {
 521                buffer.did_save(version.clone(), mtime, cx);
 522            })?;
 523
 524            Ok(())
 525        })
 526    }
 527
 528    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
 529        let remote_id = buffer.read(cx).remote_id();
 530        let is_remote = buffer.read(cx).replica_id() != 0;
 531        let open_buffer = if self.retain_buffers {
 532            OpenBuffer::Strong(buffer.clone())
 533        } else {
 534            OpenBuffer::Weak(buffer.downgrade())
 535        };
 536
 537        match self.opened_buffers.entry(remote_id) {
 538            hash_map::Entry::Vacant(entry) => {
 539                entry.insert(open_buffer);
 540            }
 541            hash_map::Entry::Occupied(mut entry) => {
 542                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 543                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
 544                } else if entry.get().upgrade().is_some() {
 545                    if is_remote {
 546                        return Ok(());
 547                    } else {
 548                        debug_panic!("buffer {} was already registered", remote_id);
 549                        Err(anyhow!("buffer {} was already registered", remote_id))?;
 550                    }
 551                }
 552                entry.insert(open_buffer);
 553            }
 554        }
 555
 556        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
 557            for sender in senders {
 558                sender.send(Ok(buffer.clone())).ok();
 559            }
 560        }
 561
 562        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 563            if file.is_local {
 564                self.local_buffer_ids_by_path.insert(
 565                    ProjectPath {
 566                        worktree_id: file.worktree_id(cx),
 567                        path: file.path.clone(),
 568                    },
 569                    remote_id,
 570                );
 571
 572                if let Some(entry_id) = file.entry_id {
 573                    self.local_buffer_ids_by_entry_id
 574                        .insert(entry_id, remote_id);
 575                }
 576            }
 577        }
 578
 579        cx.subscribe(&buffer, Self::on_buffer_event).detach();
 580        cx.emit(BufferStoreEvent::BufferAdded(buffer));
 581        Ok(())
 582    }
 583
 584    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
 585        self.opened_buffers
 586            .values()
 587            .filter_map(|buffer| buffer.upgrade())
 588    }
 589
 590    pub fn loading_buffers(
 591        &self,
 592    ) -> impl Iterator<
 593        Item = (
 594            &ProjectPath,
 595            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
 596        ),
 597    > {
 598        self.loading_buffers_by_path
 599            .iter()
 600            .map(|(path, rx)| (path, rx.clone()))
 601    }
 602
 603    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
 604        self.buffers().find_map(|buffer| {
 605            let file = File::from_dyn(buffer.read(cx).file())?;
 606            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
 607                Some(buffer)
 608            } else {
 609                None
 610            }
 611        })
 612    }
 613
 614    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 615        self.opened_buffers
 616            .get(&buffer_id)
 617            .and_then(|buffer| buffer.upgrade())
 618    }
 619
 620    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
 621        self.get(buffer_id)
 622            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
 623    }
 624
 625    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 626        self.get(buffer_id)
 627            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 628    }
 629
 630    pub fn wait_for_remote_buffer(
 631        &mut self,
 632        id: BufferId,
 633        cx: &mut AppContext,
 634    ) -> Task<Result<Model<Buffer>>> {
 635        let buffer = self.get(id);
 636        if let Some(buffer) = buffer {
 637            return Task::ready(Ok(buffer));
 638        }
 639        let (tx, rx) = oneshot::channel();
 640        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 641        cx.background_executor().spawn(async move { rx.await? })
 642    }
 643
 644    pub fn buffer_version_info(
 645        &self,
 646        cx: &AppContext,
 647    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 648        let buffers = self
 649            .buffers()
 650            .map(|buffer| {
 651                let buffer = buffer.read(cx);
 652                proto::BufferVersion {
 653                    id: buffer.remote_id().into(),
 654                    version: language::proto::serialize_version(&buffer.version),
 655                }
 656            })
 657            .collect();
 658        let incomplete_buffer_ids = self
 659            .loading_remote_buffers_by_id
 660            .keys()
 661            .copied()
 662            .collect::<Vec<_>>();
 663        (buffers, incomplete_buffer_ids)
 664    }
 665
 666    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 667        self.set_retain_buffers(false, cx);
 668
 669        for buffer in self.buffers() {
 670            buffer.update(cx, |buffer, cx| {
 671                buffer.set_capability(Capability::ReadOnly, cx)
 672            });
 673        }
 674
 675        // Wake up all futures currently waiting on a buffer to get opened,
 676        // to give them a chance to fail now that we've disconnected.
 677        self.remote_buffer_listeners.clear();
 678    }
 679
 680    pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) {
 681        self.retain_buffers = retain_buffers;
 682        for open_buffer in self.opened_buffers.values_mut() {
 683            if retain_buffers {
 684                if let OpenBuffer::Weak(buffer) = open_buffer {
 685                    if let Some(buffer) = buffer.upgrade() {
 686                        *open_buffer = OpenBuffer::Strong(buffer);
 687                    }
 688                }
 689            } else {
 690                if let Some(buffer) = open_buffer.upgrade() {
 691                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 692                }
 693                if let OpenBuffer::Strong(buffer) = open_buffer {
 694                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
 695                }
 696            }
 697        }
 698    }
 699
 700    pub fn discard_incomplete(&mut self) {
 701        self.opened_buffers
 702            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
 703    }
 704
 705    fn on_buffer_event(
 706        &mut self,
 707        buffer: Model<Buffer>,
 708        event: &BufferEvent,
 709        cx: &mut ModelContext<Self>,
 710    ) {
 711        match event {
 712            BufferEvent::FileHandleChanged => {
 713                self.buffer_changed_file(buffer, cx);
 714            }
 715            _ => {}
 716        }
 717    }
 718
 719    fn local_worktree_entry_changed(
 720        &mut self,
 721        entry_id: ProjectEntryId,
 722        path: &Arc<Path>,
 723        worktree: &Model<worktree::Worktree>,
 724        snapshot: &worktree::Snapshot,
 725        cx: &mut ModelContext<Self>,
 726    ) -> Option<()> {
 727        let project_path = ProjectPath {
 728            worktree_id: snapshot.id(),
 729            path: path.clone(),
 730        };
 731        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 732            Some(&buffer_id) => buffer_id,
 733            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 734        };
 735        let buffer = if let Some(buffer) = self.get(buffer_id) {
 736            buffer
 737        } else {
 738            self.opened_buffers.remove(&buffer_id);
 739            self.local_buffer_ids_by_path.remove(&project_path);
 740            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 741            return None;
 742        };
 743
 744        let (old_file, new_file) = buffer.update(cx, |buffer, cx| {
 745            let old_file = File::from_dyn(buffer.file())?;
 746            if old_file.worktree != *worktree {
 747                return None;
 748            }
 749
 750            let new_file = if let Some(entry) = old_file
 751                .entry_id
 752                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 753            {
 754                File {
 755                    is_local: true,
 756                    entry_id: Some(entry.id),
 757                    mtime: entry.mtime,
 758                    path: entry.path.clone(),
 759                    worktree: worktree.clone(),
 760                    is_deleted: false,
 761                    is_private: entry.is_private,
 762                }
 763            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 764                File {
 765                    is_local: true,
 766                    entry_id: Some(entry.id),
 767                    mtime: entry.mtime,
 768                    path: entry.path.clone(),
 769                    worktree: worktree.clone(),
 770                    is_deleted: false,
 771                    is_private: entry.is_private,
 772                }
 773            } else {
 774                File {
 775                    is_local: true,
 776                    entry_id: old_file.entry_id,
 777                    path: old_file.path.clone(),
 778                    mtime: old_file.mtime,
 779                    worktree: worktree.clone(),
 780                    is_deleted: true,
 781                    is_private: old_file.is_private,
 782                }
 783            };
 784
 785            if new_file == *old_file {
 786                return None;
 787            }
 788
 789            let old_file = Arc::new(old_file.clone());
 790            let new_file = Arc::new(new_file);
 791            buffer.file_updated(new_file.clone(), cx);
 792            Some((old_file, new_file))
 793        })?;
 794
 795        if new_file.path != old_file.path {
 796            self.local_buffer_ids_by_path.remove(&ProjectPath {
 797                path: old_file.path.clone(),
 798                worktree_id: old_file.worktree_id(cx),
 799            });
 800            self.local_buffer_ids_by_path.insert(
 801                ProjectPath {
 802                    worktree_id: new_file.worktree_id(cx),
 803                    path: new_file.path.clone(),
 804                },
 805                buffer_id,
 806            );
 807            cx.emit(BufferStoreEvent::BufferChangedFilePath {
 808                buffer: buffer.clone(),
 809                old_file: Some(old_file.clone()),
 810            });
 811        }
 812
 813        if new_file.entry_id != old_file.entry_id {
 814            if let Some(entry_id) = old_file.entry_id {
 815                self.local_buffer_ids_by_entry_id.remove(&entry_id);
 816            }
 817            if let Some(entry_id) = new_file.entry_id {
 818                self.local_buffer_ids_by_entry_id
 819                    .insert(entry_id, buffer_id);
 820            }
 821        }
 822
 823        cx.emit(BufferStoreEvent::LocalBufferUpdated { buffer });
 824        None
 825    }
 826
 827    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 828        let file = File::from_dyn(buffer.read(cx).file())?;
 829
 830        let remote_id = buffer.read(cx).remote_id();
 831        if let Some(entry_id) = file.entry_id {
 832            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 833                Some(_) => {
 834                    return None;
 835                }
 836                None => {
 837                    self.local_buffer_ids_by_entry_id
 838                        .insert(entry_id, remote_id);
 839                }
 840            }
 841        };
 842        self.local_buffer_ids_by_path.insert(
 843            ProjectPath {
 844                worktree_id: file.worktree_id(cx),
 845                path: file.path.clone(),
 846            },
 847            remote_id,
 848        );
 849
 850        Some(())
 851    }
 852
 853    pub async fn create_buffer_for_peer(
 854        this: Model<Self>,
 855        peer_id: PeerId,
 856        buffer_id: BufferId,
 857        project_id: u64,
 858        client: AnyProtoClient,
 859        cx: &mut AsyncAppContext,
 860    ) -> Result<()> {
 861        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
 862            return Ok(());
 863        };
 864
 865        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
 866        let operations = operations.await;
 867        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
 868
 869        let initial_state = proto::CreateBufferForPeer {
 870            project_id,
 871            peer_id: Some(peer_id),
 872            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
 873        };
 874
 875        if client.send(initial_state).log_err().is_some() {
 876            let client = client.clone();
 877            cx.background_executor()
 878                .spawn(async move {
 879                    let mut chunks = split_operations(operations).peekable();
 880                    while let Some(chunk) = chunks.next() {
 881                        let is_last = chunks.peek().is_none();
 882                        client.send(proto::CreateBufferForPeer {
 883                            project_id,
 884                            peer_id: Some(peer_id),
 885                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
 886                                proto::BufferChunk {
 887                                    buffer_id: buffer_id.into(),
 888                                    operations: chunk,
 889                                    is_last,
 890                                },
 891                            )),
 892                        })?;
 893                    }
 894                    anyhow::Ok(())
 895                })
 896                .await
 897                .log_err();
 898        }
 899        Ok(())
 900    }
 901
 902    pub fn handle_update_buffer(
 903        &mut self,
 904        envelope: TypedEnvelope<proto::UpdateBuffer>,
 905        is_remote: bool,
 906        cx: &mut AppContext,
 907    ) -> Result<proto::Ack> {
 908        let payload = envelope.payload.clone();
 909        let buffer_id = BufferId::new(payload.buffer_id)?;
 910        let ops = payload
 911            .operations
 912            .into_iter()
 913            .map(language::proto::deserialize_operation)
 914            .collect::<Result<Vec<_>, _>>()?;
 915        match self.opened_buffers.entry(buffer_id) {
 916            hash_map::Entry::Occupied(mut e) => match e.get_mut() {
 917                OpenBuffer::Strong(buffer) => {
 918                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
 919                }
 920                OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
 921                OpenBuffer::Weak(_) => {}
 922            },
 923            hash_map::Entry::Vacant(e) => {
 924                if !is_remote {
 925                    debug_panic!(
 926                        "received buffer update from {:?}",
 927                        envelope.original_sender_id
 928                    );
 929                    return Err(anyhow!("received buffer update for non-remote project"));
 930                }
 931                e.insert(OpenBuffer::Operations(ops));
 932            }
 933        }
 934        Ok(proto::Ack {})
 935    }
 936
 937    pub fn handle_create_buffer_for_peer(
 938        &mut self,
 939        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
 940        mut worktrees: impl Iterator<Item = Model<Worktree>>,
 941        replica_id: u16,
 942        capability: Capability,
 943        cx: &mut ModelContext<Self>,
 944    ) -> Result<()> {
 945        match envelope
 946            .payload
 947            .variant
 948            .ok_or_else(|| anyhow!("missing variant"))?
 949        {
 950            proto::create_buffer_for_peer::Variant::State(mut state) => {
 951                let buffer_id = BufferId::new(state.id)?;
 952
 953                let buffer_result = maybe!({
 954                    let mut buffer_file = None;
 955                    if let Some(file) = state.file.take() {
 956                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
 957                        let worktree = worktrees
 958                            .find(|worktree| worktree.read(cx).id() == worktree_id)
 959                            .ok_or_else(|| {
 960                                anyhow!("no worktree found for id {}", file.worktree_id)
 961                            })?;
 962                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
 963                            as Arc<dyn language::File>);
 964                    }
 965                    Buffer::from_proto(replica_id, capability, state, buffer_file)
 966                });
 967
 968                match buffer_result {
 969                    Ok(buffer) => {
 970                        let buffer = cx.new_model(|_| buffer);
 971                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
 972                    }
 973                    Err(error) => {
 974                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
 975                            for listener in listeners {
 976                                listener.send(Err(anyhow!(error.cloned()))).ok();
 977                            }
 978                        }
 979                    }
 980                }
 981            }
 982            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
 983                let buffer_id = BufferId::new(chunk.buffer_id)?;
 984                let buffer = self
 985                    .loading_remote_buffers_by_id
 986                    .get(&buffer_id)
 987                    .cloned()
 988                    .ok_or_else(|| {
 989                        anyhow!(
 990                            "received chunk for buffer {} without initial state",
 991                            chunk.buffer_id
 992                        )
 993                    })?;
 994
 995                let result = maybe!({
 996                    let operations = chunk
 997                        .operations
 998                        .into_iter()
 999                        .map(language::proto::deserialize_operation)
1000                        .collect::<Result<Vec<_>>>()?;
1001                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
1002                });
1003
1004                if let Err(error) = result {
1005                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1006                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1007                        for listener in listeners {
1008                            listener.send(Err(error.cloned())).ok();
1009                        }
1010                    }
1011                } else if chunk.is_last {
1012                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1013                    self.add_buffer(buffer, cx)?;
1014                }
1015            }
1016        }
1017
1018        Ok(())
1019    }
1020
1021    pub async fn handle_save_buffer(
1022        this: Model<Self>,
1023        project_id: u64,
1024        envelope: TypedEnvelope<proto::SaveBuffer>,
1025        mut cx: AsyncAppContext,
1026    ) -> Result<proto::BufferSaved> {
1027        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1028        let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??;
1029        buffer
1030            .update(&mut cx, |buffer, _| {
1031                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1032            })?
1033            .await?;
1034        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1035
1036        if let Some(new_path) = envelope.payload.new_path {
1037            let new_path = ProjectPath::from_proto(new_path);
1038            this.update(&mut cx, |this, cx| {
1039                this.save_buffer_as(buffer.clone(), new_path, cx)
1040            })?
1041            .await?;
1042        } else {
1043            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1044                .await?;
1045        }
1046
1047        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1048            project_id,
1049            buffer_id: buffer_id.into(),
1050            version: serialize_version(buffer.saved_version()),
1051            mtime: buffer.saved_mtime().map(|time| time.into()),
1052        })
1053    }
1054
1055    pub async fn handle_buffer_saved(
1056        this: Model<Self>,
1057        envelope: TypedEnvelope<proto::BufferSaved>,
1058        mut cx: AsyncAppContext,
1059    ) -> Result<()> {
1060        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1061        let version = deserialize_version(&envelope.payload.version);
1062        let mtime = envelope.payload.mtime.map(|time| time.into());
1063        this.update(&mut cx, |this, cx| {
1064            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1065                buffer.update(cx, |buffer, cx| {
1066                    buffer.did_save(version, mtime, cx);
1067                });
1068            }
1069        })
1070    }
1071
1072    pub async fn handle_buffer_reloaded(
1073        this: Model<Self>,
1074        envelope: TypedEnvelope<proto::BufferReloaded>,
1075        mut cx: AsyncAppContext,
1076    ) -> Result<()> {
1077        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1078        let version = deserialize_version(&envelope.payload.version);
1079        let mtime = envelope.payload.mtime.map(|time| time.into());
1080        let line_ending = deserialize_line_ending(
1081            proto::LineEnding::from_i32(envelope.payload.line_ending)
1082                .ok_or_else(|| anyhow!("missing line ending"))?,
1083        );
1084        this.update(&mut cx, |this, cx| {
1085            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1086                buffer.update(cx, |buffer, cx| {
1087                    buffer.did_reload(version, line_ending, mtime, cx);
1088                });
1089            }
1090        })
1091    }
1092
1093    pub async fn wait_for_loading_buffer(
1094        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1095    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1096        loop {
1097            if let Some(result) = receiver.borrow().as_ref() {
1098                match result {
1099                    Ok(buffer) => return Ok(buffer.to_owned()),
1100                    Err(e) => return Err(e.to_owned()),
1101                }
1102            }
1103            receiver.next().await;
1104        }
1105    }
1106}
1107
1108impl OpenBuffer {
1109    fn upgrade(&self) -> Option<Model<Buffer>> {
1110        match self {
1111            OpenBuffer::Strong(handle) => Some(handle.clone()),
1112            OpenBuffer::Weak(handle) => handle.upgrade(),
1113            OpenBuffer::Operations(_) => None,
1114        }
1115    }
1116}
1117
1118fn is_not_found_error(error: &anyhow::Error) -> bool {
1119    error
1120        .root_cause()
1121        .downcast_ref::<io::Error>()
1122        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1123}