buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, NoRepositoryError, ProjectPath,
   5};
   6use anyhow::{anyhow, Context as _, Result};
   7use client::Client;
   8use collections::{hash_map, HashMap, HashSet};
   9use fs::Fs;
  10use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
  11use git::blame::Blame;
  12use gpui::{
  13    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  14};
  15use http_client::Url;
  16use language::{
  17    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
  18    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
  19};
  20use rpc::{
  21    proto::{self, AnyProtoClient, EnvelopedMessage},
  22    ErrorExt as _, TypedEnvelope,
  23};
  24use smol::channel::Receiver;
  25use std::{io, path::Path, str::FromStr as _, sync::Arc};
  26use text::BufferId;
  27use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  28use worktree::{
  29    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
  30    WorktreeId,
  31};
  32
  33/// A set of open buffers.
  34pub struct BufferStore {
  35    remote_id: Option<u64>,
  36    #[allow(unused)]
  37    worktree_store: Model<WorktreeStore>,
  38    opened_buffers: HashMap<BufferId, OpenBuffer>,
  39    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  40    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  41    #[allow(clippy::type_complexity)]
  42    loading_buffers_by_path: HashMap<
  43        ProjectPath,
  44        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  45    >,
  46    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  47    remote_buffer_listeners:
  48        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  49    shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
  50}
  51
  52enum OpenBuffer {
  53    Strong(Model<Buffer>),
  54    Weak(WeakModel<Buffer>),
  55    Operations(Vec<Operation>),
  56}
  57
  58pub enum BufferStoreEvent {
  59    BufferAdded(Model<Buffer>),
  60    BufferDropped(BufferId),
  61    BufferChangedFilePath {
  62        buffer: Model<Buffer>,
  63        old_file: Option<Arc<dyn language::File>>,
  64    },
  65    MessageToReplicas(Box<proto::Envelope>),
  66}
  67
  68impl EventEmitter<BufferStoreEvent> for BufferStore {}
  69
  70impl BufferStore {
  71    /// Creates a buffer store, optionally retaining its buffers.
  72    ///
  73    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
  74    /// and won't be released unless they are explicitly removed, or `retain_buffers`
  75    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
  76    /// weak handles.
  77    pub fn new(
  78        worktree_store: Model<WorktreeStore>,
  79        remote_id: Option<u64>,
  80        cx: &mut ModelContext<Self>,
  81    ) -> Self {
  82        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
  83            WorktreeStoreEvent::WorktreeAdded(worktree) => {
  84                this.subscribe_to_worktree(worktree, cx);
  85            }
  86            _ => {}
  87        })
  88        .detach();
  89
  90        Self {
  91            remote_id,
  92            worktree_store,
  93            opened_buffers: Default::default(),
  94            remote_buffer_listeners: Default::default(),
  95            loading_remote_buffers_by_id: Default::default(),
  96            local_buffer_ids_by_path: Default::default(),
  97            local_buffer_ids_by_entry_id: Default::default(),
  98            loading_buffers_by_path: Default::default(),
  99            shared_buffers: Default::default(),
 100        }
 101    }
 102
 103    pub fn open_buffer(
 104        &mut self,
 105        project_path: ProjectPath,
 106        cx: &mut ModelContext<Self>,
 107    ) -> Task<Result<Model<Buffer>>> {
 108        let existing_buffer = self.get_by_path(&project_path, cx);
 109        if let Some(existing_buffer) = existing_buffer {
 110            return Task::ready(Ok(existing_buffer));
 111        }
 112
 113        let Some(worktree) = self
 114            .worktree_store
 115            .read(cx)
 116            .worktree_for_id(project_path.worktree_id, cx)
 117        else {
 118            return Task::ready(Err(anyhow!("no such worktree")));
 119        };
 120
 121        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
 122            // If the given path is already being loaded, then wait for that existing
 123            // task to complete and return the same buffer.
 124            hash_map::Entry::Occupied(e) => e.get().clone(),
 125
 126            // Otherwise, record the fact that this path is now being loaded.
 127            hash_map::Entry::Vacant(entry) => {
 128                let (mut tx, rx) = postage::watch::channel();
 129                entry.insert(rx.clone());
 130
 131                let project_path = project_path.clone();
 132                let load_buffer = match worktree.read(cx) {
 133                    Worktree::Local(_) => {
 134                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
 135                    }
 136                    Worktree::Remote(tree) => {
 137                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
 138                    }
 139                };
 140
 141                cx.spawn(move |this, mut cx| async move {
 142                    let load_result = load_buffer.await;
 143                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
 144                        // Record the fact that the buffer is no longer loading.
 145                        this.loading_buffers_by_path.remove(&project_path);
 146                        let buffer = load_result.map_err(Arc::new)?;
 147                        Ok(buffer)
 148                    })?);
 149                    anyhow::Ok(())
 150                })
 151                .detach();
 152                rx
 153            }
 154        };
 155
 156        cx.background_executor().spawn(async move {
 157            Self::wait_for_loading_buffer(loading_watch)
 158                .await
 159                .map_err(|e| e.cloned())
 160        })
 161    }
 162
 163    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 164        cx.subscribe(worktree, |this, worktree, event, cx| {
 165            if worktree.read(cx).is_local() {
 166                match event {
 167                    worktree::Event::UpdatedEntries(changes) => {
 168                        this.local_worktree_entries_changed(&worktree, changes, cx);
 169                    }
 170                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 171                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 172                    }
 173                    _ => {}
 174                }
 175            }
 176        })
 177        .detach();
 178    }
 179
 180    fn local_worktree_entries_changed(
 181        &mut self,
 182        worktree_handle: &Model<Worktree>,
 183        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 184        cx: &mut ModelContext<Self>,
 185    ) {
 186        let snapshot = worktree_handle.read(cx).snapshot();
 187        for (path, entry_id, _) in changes {
 188            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 189        }
 190    }
 191
 192    fn local_worktree_git_repos_changed(
 193        &mut self,
 194        worktree_handle: Model<Worktree>,
 195        changed_repos: &UpdatedGitRepositoriesSet,
 196        cx: &mut ModelContext<Self>,
 197    ) {
 198        debug_assert!(worktree_handle.read(cx).is_local());
 199
 200        // Identify the loading buffers whose containing repository that has changed.
 201        let future_buffers = self
 202            .loading_buffers()
 203            .filter_map(|(project_path, receiver)| {
 204                if project_path.worktree_id != worktree_handle.read(cx).id() {
 205                    return None;
 206                }
 207                let path = &project_path.path;
 208                changed_repos
 209                    .iter()
 210                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
 211                let path = path.clone();
 212                Some(async move {
 213                    Self::wait_for_loading_buffer(receiver)
 214                        .await
 215                        .ok()
 216                        .map(|buffer| (buffer, path))
 217                })
 218            })
 219            .collect::<FuturesUnordered<_>>();
 220
 221        // Identify the current buffers whose containing repository has changed.
 222        let current_buffers = self
 223            .buffers()
 224            .filter_map(|buffer| {
 225                let file = File::from_dyn(buffer.read(cx).file())?;
 226                if file.worktree != worktree_handle {
 227                    return None;
 228                }
 229                changed_repos
 230                    .iter()
 231                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 232                Some((buffer, file.path.clone()))
 233            })
 234            .collect::<Vec<_>>();
 235
 236        if future_buffers.len() + current_buffers.len() == 0 {
 237            return;
 238        }
 239
 240        cx.spawn(move |this, mut cx| async move {
 241            // Wait for all of the buffers to load.
 242            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 243
 244            // Reload the diff base for every buffer whose containing git repository has changed.
 245            let snapshot =
 246                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 247            let diff_bases_by_buffer = cx
 248                .background_executor()
 249                .spawn(async move {
 250                    let mut diff_base_tasks = future_buffers
 251                        .into_iter()
 252                        .flatten()
 253                        .chain(current_buffers)
 254                        .filter_map(|(buffer, path)| {
 255                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 256                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 257                            Some(async move {
 258                                let base_text =
 259                                    local_repo_entry.repo().load_index_text(&relative_path);
 260                                Some((buffer, base_text))
 261                            })
 262                        })
 263                        .collect::<FuturesUnordered<_>>();
 264
 265                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 266                    while let Some(diff_base) = diff_base_tasks.next().await {
 267                        if let Some(diff_base) = diff_base {
 268                            diff_bases.push(diff_base);
 269                        }
 270                    }
 271                    diff_bases
 272                })
 273                .await;
 274
 275            this.update(&mut cx, |this, cx| {
 276                // Assign the new diff bases on all of the buffers.
 277                for (buffer, diff_base) in diff_bases_by_buffer {
 278                    let buffer_id = buffer.update(cx, |buffer, cx| {
 279                        buffer.set_diff_base(diff_base.clone(), cx);
 280                        buffer.remote_id().to_proto()
 281                    });
 282                    if let Some(project_id) = this.remote_id {
 283                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 284                            proto::UpdateDiffBase {
 285                                project_id,
 286                                buffer_id,
 287                                diff_base,
 288                            }
 289                            .into_envelope(0, None, None),
 290                        )))
 291                    }
 292                }
 293            })
 294        })
 295        .detach_and_log_err(cx);
 296    }
 297
 298    fn open_local_buffer_internal(
 299        &mut self,
 300        path: Arc<Path>,
 301        worktree: Model<Worktree>,
 302        cx: &mut ModelContext<Self>,
 303    ) -> Task<Result<Model<Buffer>>> {
 304        let load_buffer = worktree.update(cx, |worktree, cx| {
 305            let load_file = worktree.load_file(path.as_ref(), cx);
 306            let reservation = cx.reserve_model();
 307            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 308            cx.spawn(move |_, mut cx| async move {
 309                let loaded = load_file.await?;
 310                let text_buffer = cx
 311                    .background_executor()
 312                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 313                    .await;
 314                cx.insert_model(reservation, |_| {
 315                    Buffer::build(
 316                        text_buffer,
 317                        loaded.diff_base,
 318                        Some(loaded.file),
 319                        Capability::ReadWrite,
 320                    )
 321                })
 322            })
 323        });
 324
 325        cx.spawn(move |this, mut cx| async move {
 326            let buffer = match load_buffer.await {
 327                Ok(buffer) => Ok(buffer),
 328                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 329                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 330                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 331                    Buffer::build(
 332                        text_buffer,
 333                        None,
 334                        Some(Arc::new(File {
 335                            worktree,
 336                            path,
 337                            mtime: None,
 338                            entry_id: None,
 339                            is_local: true,
 340                            is_deleted: false,
 341                            is_private: false,
 342                        })),
 343                        Capability::ReadWrite,
 344                    )
 345                }),
 346                Err(e) => Err(e),
 347            }?;
 348            this.update(&mut cx, |this, cx| {
 349                this.add_buffer(buffer.clone(), cx).log_err();
 350            })?;
 351            Ok(buffer)
 352        })
 353    }
 354
 355    fn open_remote_buffer_internal(
 356        &self,
 357        path: &Arc<Path>,
 358        worktree: &RemoteWorktree,
 359        cx: &ModelContext<Self>,
 360    ) -> Task<Result<Model<Buffer>>> {
 361        let worktree_id = worktree.id().to_proto();
 362        let project_id = worktree.project_id();
 363        let client = worktree.client();
 364        let path_string = path.clone().to_string_lossy().to_string();
 365        cx.spawn(move |this, mut cx| async move {
 366            let response = client
 367                .request(proto::OpenBufferByPath {
 368                    project_id,
 369                    worktree_id,
 370                    path: path_string,
 371                })
 372                .await?;
 373            let buffer_id = BufferId::new(response.buffer_id)?;
 374            this.update(&mut cx, |this, cx| {
 375                this.wait_for_remote_buffer(buffer_id, cx)
 376            })?
 377            .await
 378        })
 379    }
 380
 381    pub fn create_buffer(
 382        &mut self,
 383        remote_client: Option<(AnyProtoClient, u64)>,
 384        cx: &mut ModelContext<Self>,
 385    ) -> Task<Result<Model<Buffer>>> {
 386        if let Some((remote_client, project_id)) = remote_client {
 387            let create = remote_client.request(proto::OpenNewBuffer { project_id });
 388            cx.spawn(|this, mut cx| async move {
 389                let response = create.await?;
 390                let buffer_id = BufferId::new(response.buffer_id)?;
 391
 392                this.update(&mut cx, |this, cx| {
 393                    this.wait_for_remote_buffer(buffer_id, cx)
 394                })?
 395                .await
 396            })
 397        } else {
 398            Task::ready(Ok(self.create_local_buffer("", None, cx)))
 399        }
 400    }
 401
 402    pub fn create_local_buffer(
 403        &mut self,
 404        text: &str,
 405        language: Option<Arc<Language>>,
 406        cx: &mut ModelContext<Self>,
 407    ) -> Model<Buffer> {
 408        let buffer = cx.new_model(|cx| {
 409            Buffer::local(text, cx)
 410                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 411        });
 412        self.add_buffer(buffer.clone(), cx).log_err();
 413        buffer
 414    }
 415
 416    pub fn save_buffer(
 417        &mut self,
 418        buffer: Model<Buffer>,
 419        cx: &mut ModelContext<Self>,
 420    ) -> Task<Result<()>> {
 421        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 422            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 423        };
 424        match file.worktree.read(cx) {
 425            Worktree::Local(_) => {
 426                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
 427            }
 428            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
 429        }
 430    }
 431
 432    pub fn save_buffer_as(
 433        &mut self,
 434        buffer: Model<Buffer>,
 435        path: ProjectPath,
 436        cx: &mut ModelContext<Self>,
 437    ) -> Task<Result<()>> {
 438        let Some(worktree) = self
 439            .worktree_store
 440            .read(cx)
 441            .worktree_for_id(path.worktree_id, cx)
 442        else {
 443            return Task::ready(Err(anyhow!("no such worktree")));
 444        };
 445
 446        let old_file = buffer.read(cx).file().cloned();
 447
 448        let task = match worktree.read(cx) {
 449            Worktree::Local(_) => {
 450                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
 451            }
 452            Worktree::Remote(tree) => {
 453                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
 454            }
 455        };
 456        cx.spawn(|this, mut cx| async move {
 457            task.await?;
 458            this.update(&mut cx, |_, cx| {
 459                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 460            })
 461        })
 462    }
 463
 464    fn save_local_buffer(
 465        &self,
 466        worktree: Model<Worktree>,
 467        buffer_handle: Model<Buffer>,
 468        path: Arc<Path>,
 469        mut has_changed_file: bool,
 470        cx: &mut ModelContext<Self>,
 471    ) -> Task<Result<()>> {
 472        let buffer = buffer_handle.read(cx);
 473        let text = buffer.as_rope().clone();
 474        let line_ending = buffer.line_ending();
 475        let version = buffer.version();
 476        let buffer_id = buffer.remote_id();
 477        if buffer.file().is_some_and(|file| !file.is_created()) {
 478            has_changed_file = true;
 479        }
 480
 481        let save = worktree.update(cx, |worktree, cx| {
 482            worktree.write_file(path.as_ref(), text, line_ending, cx)
 483        });
 484
 485        cx.spawn(move |this, mut cx| async move {
 486            let new_file = save.await?;
 487            let mtime = new_file.mtime;
 488            this.update(&mut cx, |this, cx| {
 489                if let Some(project_id) = this.remote_id {
 490                    if has_changed_file {
 491                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 492                            proto::UpdateBufferFile {
 493                                project_id,
 494                                buffer_id: buffer_id.to_proto(),
 495                                file: Some(language::File::to_proto(&*new_file, cx)),
 496                            }
 497                            .into_envelope(0, None, None),
 498                        )));
 499                    }
 500                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
 501                        proto::BufferSaved {
 502                            project_id,
 503                            buffer_id: buffer_id.to_proto(),
 504                            version: serialize_version(&version),
 505                            mtime: mtime.map(|time| time.into()),
 506                        }
 507                        .into_envelope(0, None, None),
 508                    )));
 509                }
 510            })?;
 511            buffer_handle.update(&mut cx, |buffer, cx| {
 512                if has_changed_file {
 513                    buffer.file_updated(new_file, cx);
 514                }
 515                buffer.did_save(version.clone(), mtime, cx);
 516            })
 517        })
 518    }
 519
 520    fn save_remote_buffer(
 521        &self,
 522        buffer_handle: Model<Buffer>,
 523        new_path: Option<proto::ProjectPath>,
 524        tree: &RemoteWorktree,
 525        cx: &ModelContext<Self>,
 526    ) -> Task<Result<()>> {
 527        let buffer = buffer_handle.read(cx);
 528        let buffer_id = buffer.remote_id().into();
 529        let version = buffer.version();
 530        let rpc = tree.client();
 531        let project_id = tree.project_id();
 532        cx.spawn(move |_, mut cx| async move {
 533            let response = rpc
 534                .request(proto::SaveBuffer {
 535                    project_id,
 536                    buffer_id,
 537                    new_path,
 538                    version: serialize_version(&version),
 539                })
 540                .await?;
 541            let version = deserialize_version(&response.version);
 542            let mtime = response.mtime.map(|mtime| mtime.into());
 543
 544            buffer_handle.update(&mut cx, |buffer, cx| {
 545                buffer.did_save(version.clone(), mtime, cx);
 546            })?;
 547
 548            Ok(())
 549        })
 550    }
 551
 552    pub fn blame_buffer(
 553        &self,
 554        buffer: &Model<Buffer>,
 555        version: Option<clock::Global>,
 556        cx: &AppContext,
 557    ) -> Task<Result<Blame>> {
 558        let buffer = buffer.read(cx);
 559        let Some(file) = File::from_dyn(buffer.file()) else {
 560            return Task::ready(Err(anyhow!("buffer has no file")));
 561        };
 562
 563        match file.worktree.clone().read(cx) {
 564            Worktree::Local(worktree) => {
 565                let worktree = worktree.snapshot();
 566                let blame_params = maybe!({
 567                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
 568                        Some(repo_for_path) => repo_for_path,
 569                        None => anyhow::bail!(NoRepositoryError {}),
 570                    };
 571
 572                    let relative_path = repo_entry
 573                        .relativize(&worktree, &file.path)
 574                        .context("failed to relativize buffer path")?;
 575
 576                    let repo = local_repo_entry.repo().clone();
 577
 578                    let content = match version {
 579                        Some(version) => buffer.rope_for_version(&version).clone(),
 580                        None => buffer.as_rope().clone(),
 581                    };
 582
 583                    anyhow::Ok((repo, relative_path, content))
 584                });
 585
 586                cx.background_executor().spawn(async move {
 587                    let (repo, relative_path, content) = blame_params?;
 588                    repo.blame(&relative_path, content)
 589                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
 590                })
 591            }
 592            Worktree::Remote(worktree) => {
 593                let buffer_id = buffer.remote_id();
 594                let version = buffer.version();
 595                let project_id = worktree.project_id();
 596                let client = worktree.client();
 597                cx.spawn(|_| async move {
 598                    let response = client
 599                        .request(proto::BlameBuffer {
 600                            project_id,
 601                            buffer_id: buffer_id.into(),
 602                            version: serialize_version(&version),
 603                        })
 604                        .await?;
 605                    Ok(deserialize_blame_buffer_response(response))
 606                })
 607            }
 608        }
 609    }
 610
 611    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
 612        let remote_id = buffer.read(cx).remote_id();
 613        let is_remote = buffer.read(cx).replica_id() != 0;
 614        let open_buffer = if self.remote_id.is_some() {
 615            OpenBuffer::Strong(buffer.clone())
 616        } else {
 617            OpenBuffer::Weak(buffer.downgrade())
 618        };
 619
 620        let handle = cx.handle().downgrade();
 621        buffer.update(cx, move |_, cx| {
 622            cx.on_release(move |buffer, cx| {
 623                handle
 624                    .update(cx, |_, cx| {
 625                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
 626                    })
 627                    .ok();
 628            })
 629            .detach()
 630        });
 631
 632        match self.opened_buffers.entry(remote_id) {
 633            hash_map::Entry::Vacant(entry) => {
 634                entry.insert(open_buffer);
 635            }
 636            hash_map::Entry::Occupied(mut entry) => {
 637                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 638                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
 639                } else if entry.get().upgrade().is_some() {
 640                    if is_remote {
 641                        return Ok(());
 642                    } else {
 643                        debug_panic!("buffer {} was already registered", remote_id);
 644                        Err(anyhow!("buffer {} was already registered", remote_id))?;
 645                    }
 646                }
 647                entry.insert(open_buffer);
 648            }
 649        }
 650
 651        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
 652            for sender in senders {
 653                sender.send(Ok(buffer.clone())).ok();
 654            }
 655        }
 656
 657        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 658            if file.is_local {
 659                self.local_buffer_ids_by_path.insert(
 660                    ProjectPath {
 661                        worktree_id: file.worktree_id(cx),
 662                        path: file.path.clone(),
 663                    },
 664                    remote_id,
 665                );
 666
 667                if let Some(entry_id) = file.entry_id {
 668                    self.local_buffer_ids_by_entry_id
 669                        .insert(entry_id, remote_id);
 670                }
 671            }
 672        }
 673
 674        cx.subscribe(&buffer, Self::on_buffer_event).detach();
 675        cx.emit(BufferStoreEvent::BufferAdded(buffer));
 676        Ok(())
 677    }
 678
 679    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
 680        self.opened_buffers
 681            .values()
 682            .filter_map(|buffer| buffer.upgrade())
 683    }
 684
 685    pub fn loading_buffers(
 686        &self,
 687    ) -> impl Iterator<
 688        Item = (
 689            &ProjectPath,
 690            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
 691        ),
 692    > {
 693        self.loading_buffers_by_path
 694            .iter()
 695            .map(|(path, rx)| (path, rx.clone()))
 696    }
 697
 698    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
 699        self.buffers().find_map(|buffer| {
 700            let file = File::from_dyn(buffer.read(cx).file())?;
 701            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
 702                Some(buffer)
 703            } else {
 704                None
 705            }
 706        })
 707    }
 708
 709    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 710        self.opened_buffers
 711            .get(&buffer_id)
 712            .and_then(|buffer| buffer.upgrade())
 713    }
 714
 715    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
 716        self.get(buffer_id)
 717            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
 718    }
 719
 720    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 721        self.get(buffer_id)
 722            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 723    }
 724
 725    pub fn wait_for_remote_buffer(
 726        &mut self,
 727        id: BufferId,
 728        cx: &mut AppContext,
 729    ) -> Task<Result<Model<Buffer>>> {
 730        let buffer = self.get(id);
 731        if let Some(buffer) = buffer {
 732            return Task::ready(Ok(buffer));
 733        }
 734        let (tx, rx) = oneshot::channel();
 735        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 736        cx.background_executor().spawn(async move { rx.await? })
 737    }
 738
 739    pub fn buffer_version_info(
 740        &self,
 741        cx: &AppContext,
 742    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 743        let buffers = self
 744            .buffers()
 745            .map(|buffer| {
 746                let buffer = buffer.read(cx);
 747                proto::BufferVersion {
 748                    id: buffer.remote_id().into(),
 749                    version: language::proto::serialize_version(&buffer.version),
 750                }
 751            })
 752            .collect();
 753        let incomplete_buffer_ids = self
 754            .loading_remote_buffers_by_id
 755            .keys()
 756            .copied()
 757            .collect::<Vec<_>>();
 758        (buffers, incomplete_buffer_ids)
 759    }
 760
 761    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 762        self.set_remote_id(None, cx);
 763
 764        for buffer in self.buffers() {
 765            buffer.update(cx, |buffer, cx| {
 766                buffer.set_capability(Capability::ReadOnly, cx)
 767            });
 768        }
 769
 770        // Wake up all futures currently waiting on a buffer to get opened,
 771        // to give them a chance to fail now that we've disconnected.
 772        self.remote_buffer_listeners.clear();
 773    }
 774
 775    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
 776        self.remote_id = remote_id;
 777        for open_buffer in self.opened_buffers.values_mut() {
 778            if remote_id.is_some() {
 779                if let OpenBuffer::Weak(buffer) = open_buffer {
 780                    if let Some(buffer) = buffer.upgrade() {
 781                        *open_buffer = OpenBuffer::Strong(buffer);
 782                    }
 783                }
 784            } else {
 785                if let Some(buffer) = open_buffer.upgrade() {
 786                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 787                }
 788                if let OpenBuffer::Strong(buffer) = open_buffer {
 789                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
 790                }
 791            }
 792        }
 793    }
 794
 795    pub fn discard_incomplete(&mut self) {
 796        self.opened_buffers
 797            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
 798    }
 799
 800    pub fn find_search_candidates(
 801        &mut self,
 802        query: &SearchQuery,
 803        mut limit: usize,
 804        fs: Arc<dyn Fs>,
 805        cx: &mut ModelContext<Self>,
 806    ) -> Receiver<Model<Buffer>> {
 807        let (tx, rx) = smol::channel::unbounded();
 808        let mut open_buffers = HashSet::default();
 809        let mut unnamed_buffers = Vec::new();
 810        for handle in self.buffers() {
 811            let buffer = handle.read(cx);
 812            if let Some(entry_id) = buffer.entry_id(cx) {
 813                open_buffers.insert(entry_id);
 814            } else {
 815                limit = limit.saturating_sub(1);
 816                unnamed_buffers.push(handle)
 817            };
 818        }
 819
 820        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 821        let mut project_paths_rx = self
 822            .worktree_store
 823            .update(cx, |worktree_store, cx| {
 824                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
 825            })
 826            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
 827
 828        cx.spawn(|this, mut cx| async move {
 829            for buffer in unnamed_buffers {
 830                tx.send(buffer).await.ok();
 831            }
 832
 833            while let Some(project_paths) = project_paths_rx.next().await {
 834                let buffers = this.update(&mut cx, |this, cx| {
 835                    project_paths
 836                        .into_iter()
 837                        .map(|project_path| this.open_buffer(project_path, cx))
 838                        .collect::<Vec<_>>()
 839                })?;
 840                for buffer_task in buffers {
 841                    if let Some(buffer) = buffer_task.await.log_err() {
 842                        if tx.send(buffer).await.is_err() {
 843                            return anyhow::Ok(());
 844                        }
 845                    }
 846                }
 847            }
 848            anyhow::Ok(())
 849        })
 850        .detach();
 851        rx
 852    }
 853
 854    fn on_buffer_event(
 855        &mut self,
 856        buffer: Model<Buffer>,
 857        event: &BufferEvent,
 858        cx: &mut ModelContext<Self>,
 859    ) {
 860        match event {
 861            BufferEvent::FileHandleChanged => {
 862                self.buffer_changed_file(buffer, cx);
 863            }
 864            _ => {}
 865        }
 866    }
 867
 868    fn local_worktree_entry_changed(
 869        &mut self,
 870        entry_id: ProjectEntryId,
 871        path: &Arc<Path>,
 872        worktree: &Model<worktree::Worktree>,
 873        snapshot: &worktree::Snapshot,
 874        cx: &mut ModelContext<Self>,
 875    ) -> Option<()> {
 876        let project_path = ProjectPath {
 877            worktree_id: snapshot.id(),
 878            path: path.clone(),
 879        };
 880        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 881            Some(&buffer_id) => buffer_id,
 882            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 883        };
 884        let buffer = if let Some(buffer) = self.get(buffer_id) {
 885            buffer
 886        } else {
 887            self.opened_buffers.remove(&buffer_id);
 888            self.local_buffer_ids_by_path.remove(&project_path);
 889            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 890            return None;
 891        };
 892
 893        let events = buffer.update(cx, |buffer, cx| {
 894            let file = buffer.file()?;
 895            let old_file = File::from_dyn(Some(file))?;
 896            if old_file.worktree != *worktree {
 897                return None;
 898            }
 899
 900            let new_file = if let Some(entry) = old_file
 901                .entry_id
 902                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 903            {
 904                File {
 905                    is_local: true,
 906                    entry_id: Some(entry.id),
 907                    mtime: entry.mtime,
 908                    path: entry.path.clone(),
 909                    worktree: worktree.clone(),
 910                    is_deleted: false,
 911                    is_private: entry.is_private,
 912                }
 913            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 914                File {
 915                    is_local: true,
 916                    entry_id: Some(entry.id),
 917                    mtime: entry.mtime,
 918                    path: entry.path.clone(),
 919                    worktree: worktree.clone(),
 920                    is_deleted: false,
 921                    is_private: entry.is_private,
 922                }
 923            } else {
 924                File {
 925                    is_local: true,
 926                    entry_id: old_file.entry_id,
 927                    path: old_file.path.clone(),
 928                    mtime: old_file.mtime,
 929                    worktree: worktree.clone(),
 930                    is_deleted: true,
 931                    is_private: old_file.is_private,
 932                }
 933            };
 934
 935            if new_file == *old_file {
 936                return None;
 937            }
 938
 939            let mut events = Vec::new();
 940            if new_file.path != old_file.path {
 941                self.local_buffer_ids_by_path.remove(&ProjectPath {
 942                    path: old_file.path.clone(),
 943                    worktree_id: old_file.worktree_id(cx),
 944                });
 945                self.local_buffer_ids_by_path.insert(
 946                    ProjectPath {
 947                        worktree_id: new_file.worktree_id(cx),
 948                        path: new_file.path.clone(),
 949                    },
 950                    buffer_id,
 951                );
 952                events.push(BufferStoreEvent::BufferChangedFilePath {
 953                    buffer: cx.handle(),
 954                    old_file: buffer.file().cloned(),
 955                });
 956            }
 957
 958            if new_file.entry_id != old_file.entry_id {
 959                if let Some(entry_id) = old_file.entry_id {
 960                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
 961                }
 962                if let Some(entry_id) = new_file.entry_id {
 963                    self.local_buffer_ids_by_entry_id
 964                        .insert(entry_id, buffer_id);
 965                }
 966            }
 967
 968            if let Some(project_id) = self.remote_id {
 969                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
 970                    proto::UpdateBufferFile {
 971                        project_id,
 972                        buffer_id: buffer_id.to_proto(),
 973                        file: Some(new_file.to_proto(cx)),
 974                    }
 975                    .into_envelope(0, None, None),
 976                )))
 977            }
 978
 979            buffer.file_updated(Arc::new(new_file), cx);
 980            Some(events)
 981        })?;
 982
 983        for event in events {
 984            cx.emit(event);
 985        }
 986
 987        None
 988    }
 989
 990    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
 991        let file = File::from_dyn(buffer.read(cx).file())?;
 992
 993        let remote_id = buffer.read(cx).remote_id();
 994        if let Some(entry_id) = file.entry_id {
 995            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 996                Some(_) => {
 997                    return None;
 998                }
 999                None => {
1000                    self.local_buffer_ids_by_entry_id
1001                        .insert(entry_id, remote_id);
1002                }
1003            }
1004        };
1005        self.local_buffer_ids_by_path.insert(
1006            ProjectPath {
1007                worktree_id: file.worktree_id(cx),
1008                path: file.path.clone(),
1009            },
1010            remote_id,
1011        );
1012
1013        Some(())
1014    }
1015
1016    pub async fn handle_update_buffer(
1017        this: Model<Self>,
1018        envelope: TypedEnvelope<proto::UpdateBuffer>,
1019        mut cx: AsyncAppContext,
1020    ) -> Result<proto::Ack> {
1021        let payload = envelope.payload.clone();
1022        let buffer_id = BufferId::new(payload.buffer_id)?;
1023        let ops = payload
1024            .operations
1025            .into_iter()
1026            .map(language::proto::deserialize_operation)
1027            .collect::<Result<Vec<_>, _>>()?;
1028        this.update(&mut cx, |this, cx| {
1029            match this.opened_buffers.entry(buffer_id) {
1030                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1031                    OpenBuffer::Strong(buffer) => {
1032                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
1033                    }
1034                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1035                    OpenBuffer::Weak(_) => {}
1036                },
1037                hash_map::Entry::Vacant(e) => {
1038                    e.insert(OpenBuffer::Operations(ops));
1039                }
1040            }
1041            Ok(proto::Ack {})
1042        })?
1043    }
1044
1045    pub fn handle_synchronize_buffers(
1046        &mut self,
1047        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1048        cx: &mut ModelContext<Self>,
1049        client: Arc<Client>,
1050    ) -> Result<proto::SynchronizeBuffersResponse> {
1051        let project_id = envelope.payload.project_id;
1052        let mut response = proto::SynchronizeBuffersResponse {
1053            buffers: Default::default(),
1054        };
1055        let Some(guest_id) = envelope.original_sender_id else {
1056            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1057        };
1058
1059        self.shared_buffers.entry(guest_id).or_default().clear();
1060        for buffer in envelope.payload.buffers {
1061            let buffer_id = BufferId::new(buffer.id)?;
1062            let remote_version = language::proto::deserialize_version(&buffer.version);
1063            if let Some(buffer) = self.get(buffer_id) {
1064                self.shared_buffers
1065                    .entry(guest_id)
1066                    .or_default()
1067                    .insert(buffer_id);
1068
1069                let buffer = buffer.read(cx);
1070                response.buffers.push(proto::BufferVersion {
1071                    id: buffer_id.into(),
1072                    version: language::proto::serialize_version(&buffer.version),
1073                });
1074
1075                let operations = buffer.serialize_ops(Some(remote_version), cx);
1076                let client = client.clone();
1077                if let Some(file) = buffer.file() {
1078                    client
1079                        .send(proto::UpdateBufferFile {
1080                            project_id,
1081                            buffer_id: buffer_id.into(),
1082                            file: Some(file.to_proto(cx)),
1083                        })
1084                        .log_err();
1085                }
1086
1087                client
1088                    .send(proto::UpdateDiffBase {
1089                        project_id,
1090                        buffer_id: buffer_id.into(),
1091                        diff_base: buffer.diff_base().map(ToString::to_string),
1092                    })
1093                    .log_err();
1094
1095                client
1096                    .send(proto::BufferReloaded {
1097                        project_id,
1098                        buffer_id: buffer_id.into(),
1099                        version: language::proto::serialize_version(buffer.saved_version()),
1100                        mtime: buffer.saved_mtime().map(|time| time.into()),
1101                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1102                            as i32,
1103                    })
1104                    .log_err();
1105
1106                cx.background_executor()
1107                    .spawn(
1108                        async move {
1109                            let operations = operations.await;
1110                            for chunk in split_operations(operations) {
1111                                client
1112                                    .request(proto::UpdateBuffer {
1113                                        project_id,
1114                                        buffer_id: buffer_id.into(),
1115                                        operations: chunk,
1116                                    })
1117                                    .await?;
1118                            }
1119                            anyhow::Ok(())
1120                        }
1121                        .log_err(),
1122                    )
1123                    .detach();
1124            }
1125        }
1126        Ok(response)
1127    }
1128
1129    pub fn handle_create_buffer_for_peer(
1130        &mut self,
1131        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1132        replica_id: u16,
1133        capability: Capability,
1134        cx: &mut ModelContext<Self>,
1135    ) -> Result<()> {
1136        match envelope
1137            .payload
1138            .variant
1139            .ok_or_else(|| anyhow!("missing variant"))?
1140        {
1141            proto::create_buffer_for_peer::Variant::State(mut state) => {
1142                let buffer_id = BufferId::new(state.id)?;
1143
1144                let buffer_result = maybe!({
1145                    let mut buffer_file = None;
1146                    if let Some(file) = state.file.take() {
1147                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
1148                        let worktree = self
1149                            .worktree_store
1150                            .read(cx)
1151                            .worktree_for_id(worktree_id, cx)
1152                            .ok_or_else(|| {
1153                                anyhow!("no worktree found for id {}", file.worktree_id)
1154                            })?;
1155                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
1156                            as Arc<dyn language::File>);
1157                    }
1158                    Buffer::from_proto(replica_id, capability, state, buffer_file)
1159                });
1160
1161                match buffer_result {
1162                    Ok(buffer) => {
1163                        let buffer = cx.new_model(|_| buffer);
1164                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
1165                    }
1166                    Err(error) => {
1167                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1168                            for listener in listeners {
1169                                listener.send(Err(anyhow!(error.cloned()))).ok();
1170                            }
1171                        }
1172                    }
1173                }
1174            }
1175            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
1176                let buffer_id = BufferId::new(chunk.buffer_id)?;
1177                let buffer = self
1178                    .loading_remote_buffers_by_id
1179                    .get(&buffer_id)
1180                    .cloned()
1181                    .ok_or_else(|| {
1182                        anyhow!(
1183                            "received chunk for buffer {} without initial state",
1184                            chunk.buffer_id
1185                        )
1186                    })?;
1187
1188                let result = maybe!({
1189                    let operations = chunk
1190                        .operations
1191                        .into_iter()
1192                        .map(language::proto::deserialize_operation)
1193                        .collect::<Result<Vec<_>>>()?;
1194                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
1195                });
1196
1197                if let Err(error) = result {
1198                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1199                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1200                        for listener in listeners {
1201                            listener.send(Err(error.cloned())).ok();
1202                        }
1203                    }
1204                } else if chunk.is_last {
1205                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1206                    self.add_buffer(buffer, cx)?;
1207                }
1208            }
1209        }
1210
1211        Ok(())
1212    }
1213
1214    pub async fn handle_update_buffer_file(
1215        this: Model<Self>,
1216        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1217        mut cx: AsyncAppContext,
1218    ) -> Result<()> {
1219        let buffer_id = envelope.payload.buffer_id;
1220        let buffer_id = BufferId::new(buffer_id)?;
1221
1222        this.update(&mut cx, |this, cx| {
1223            let payload = envelope.payload.clone();
1224            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1225                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1226                let worktree = this
1227                    .worktree_store
1228                    .read(cx)
1229                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1230                    .ok_or_else(|| anyhow!("no such worktree"))?;
1231                let file = File::from_proto(file, worktree, cx)?;
1232                let old_file = buffer.update(cx, |buffer, cx| {
1233                    let old_file = buffer.file().cloned();
1234                    let new_path = file.path.clone();
1235                    buffer.file_updated(Arc::new(file), cx);
1236                    if old_file
1237                        .as_ref()
1238                        .map_or(true, |old| *old.path() != new_path)
1239                    {
1240                        Some(old_file)
1241                    } else {
1242                        None
1243                    }
1244                });
1245                if let Some(old_file) = old_file {
1246                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1247                }
1248            }
1249            Ok(())
1250        })?
1251    }
1252
1253    pub async fn handle_update_diff_base(
1254        this: Model<Self>,
1255        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1256        mut cx: AsyncAppContext,
1257    ) -> Result<()> {
1258        this.update(&mut cx, |this, cx| {
1259            let buffer_id = envelope.payload.buffer_id;
1260            let buffer_id = BufferId::new(buffer_id)?;
1261            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1262                buffer.update(cx, |buffer, cx| {
1263                    buffer.set_diff_base(envelope.payload.diff_base, cx)
1264                });
1265            }
1266            Ok(())
1267        })?
1268    }
1269
1270    pub async fn handle_save_buffer(
1271        this: Model<Self>,
1272        envelope: TypedEnvelope<proto::SaveBuffer>,
1273        mut cx: AsyncAppContext,
1274    ) -> Result<proto::BufferSaved> {
1275        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1276        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1277            anyhow::Ok((
1278                this.get_existing(buffer_id)?,
1279                this.remote_id.context("project is not shared")?,
1280            ))
1281        })??;
1282        buffer
1283            .update(&mut cx, |buffer, _| {
1284                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1285            })?
1286            .await?;
1287        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1288
1289        if let Some(new_path) = envelope.payload.new_path {
1290            let new_path = ProjectPath::from_proto(new_path);
1291            this.update(&mut cx, |this, cx| {
1292                this.save_buffer_as(buffer.clone(), new_path, cx)
1293            })?
1294            .await?;
1295        } else {
1296            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1297                .await?;
1298        }
1299
1300        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1301            project_id,
1302            buffer_id: buffer_id.into(),
1303            version: serialize_version(buffer.saved_version()),
1304            mtime: buffer.saved_mtime().map(|time| time.into()),
1305        })
1306    }
1307
1308    pub async fn handle_close_buffer(
1309        this: Model<Self>,
1310        envelope: TypedEnvelope<proto::CloseBuffer>,
1311        mut cx: AsyncAppContext,
1312    ) -> Result<()> {
1313        let peer_id = envelope.sender_id;
1314        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1315        this.update(&mut cx, |this, _| {
1316            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1317                if shared.remove(&buffer_id) {
1318                    if shared.is_empty() {
1319                        this.shared_buffers.remove(&peer_id);
1320                    }
1321                    return;
1322                }
1323            };
1324            debug_panic!(
1325                "peer_id {} closed buffer_id {} which was either not open or already closed",
1326                peer_id,
1327                buffer_id
1328            )
1329        })
1330    }
1331
1332    pub async fn handle_buffer_saved(
1333        this: Model<Self>,
1334        envelope: TypedEnvelope<proto::BufferSaved>,
1335        mut cx: AsyncAppContext,
1336    ) -> Result<()> {
1337        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1338        let version = deserialize_version(&envelope.payload.version);
1339        let mtime = envelope.payload.mtime.map(|time| time.into());
1340        this.update(&mut cx, |this, cx| {
1341            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1342                buffer.update(cx, |buffer, cx| {
1343                    buffer.did_save(version, mtime, cx);
1344                });
1345            }
1346        })
1347    }
1348
1349    pub async fn handle_buffer_reloaded(
1350        this: Model<Self>,
1351        envelope: TypedEnvelope<proto::BufferReloaded>,
1352        mut cx: AsyncAppContext,
1353    ) -> Result<()> {
1354        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1355        let version = deserialize_version(&envelope.payload.version);
1356        let mtime = envelope.payload.mtime.map(|time| time.into());
1357        let line_ending = deserialize_line_ending(
1358            proto::LineEnding::from_i32(envelope.payload.line_ending)
1359                .ok_or_else(|| anyhow!("missing line ending"))?,
1360        );
1361        this.update(&mut cx, |this, cx| {
1362            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1363                buffer.update(cx, |buffer, cx| {
1364                    buffer.did_reload(version, line_ending, mtime, cx);
1365                });
1366            }
1367        })
1368    }
1369
1370    pub async fn handle_blame_buffer(
1371        this: Model<Self>,
1372        envelope: TypedEnvelope<proto::BlameBuffer>,
1373        mut cx: AsyncAppContext,
1374    ) -> Result<proto::BlameBufferResponse> {
1375        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1376        let version = deserialize_version(&envelope.payload.version);
1377        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1378        buffer
1379            .update(&mut cx, |buffer, _| {
1380                buffer.wait_for_version(version.clone())
1381            })?
1382            .await?;
1383        let blame = this
1384            .update(&mut cx, |this, cx| {
1385                this.blame_buffer(&buffer, Some(version), cx)
1386            })?
1387            .await?;
1388        Ok(serialize_blame_buffer_response(blame))
1389    }
1390
1391    pub async fn wait_for_loading_buffer(
1392        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1393    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1394        loop {
1395            if let Some(result) = receiver.borrow().as_ref() {
1396                match result {
1397                    Ok(buffer) => return Ok(buffer.to_owned()),
1398                    Err(e) => return Err(e.to_owned()),
1399                }
1400            }
1401            receiver.next().await;
1402        }
1403    }
1404
1405    pub fn create_buffer_for_peer(
1406        &mut self,
1407        buffer: &Model<Buffer>,
1408        peer_id: proto::PeerId,
1409        project_id: u64,
1410        client: AnyProtoClient,
1411        cx: &mut ModelContext<Self>,
1412    ) -> Task<Result<()>> {
1413        let buffer_id = buffer.read(cx).remote_id();
1414        if !self
1415            .shared_buffers
1416            .entry(peer_id)
1417            .or_default()
1418            .insert(buffer_id)
1419        {
1420            return Task::ready(Ok(()));
1421        }
1422
1423        cx.spawn(|this, mut cx| async move {
1424            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
1425                return anyhow::Ok(());
1426            };
1427
1428            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
1429            let operations = operations.await;
1430            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
1431
1432            let initial_state = proto::CreateBufferForPeer {
1433                project_id,
1434                peer_id: Some(peer_id),
1435                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1436            };
1437
1438            if client.send(initial_state).log_err().is_some() {
1439                let client = client.clone();
1440                cx.background_executor()
1441                    .spawn(async move {
1442                        let mut chunks = split_operations(operations).peekable();
1443                        while let Some(chunk) = chunks.next() {
1444                            let is_last = chunks.peek().is_none();
1445                            client.send(proto::CreateBufferForPeer {
1446                                project_id,
1447                                peer_id: Some(peer_id),
1448                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1449                                    proto::BufferChunk {
1450                                        buffer_id: buffer_id.into(),
1451                                        operations: chunk,
1452                                        is_last,
1453                                    },
1454                                )),
1455                            })?;
1456                        }
1457                        anyhow::Ok(())
1458                    })
1459                    .await
1460                    .log_err();
1461            }
1462            Ok(())
1463        })
1464    }
1465
1466    pub fn forget_shared_buffers(&mut self) {
1467        self.shared_buffers.clear();
1468    }
1469
1470    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1471        self.shared_buffers.remove(peer_id);
1472    }
1473
1474    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1475        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1476            self.shared_buffers.insert(new_peer_id, buffers);
1477        }
1478    }
1479
1480    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<BufferId>> {
1481        &self.shared_buffers
1482    }
1483}
1484
1485impl OpenBuffer {
1486    fn upgrade(&self) -> Option<Model<Buffer>> {
1487        match self {
1488            OpenBuffer::Strong(handle) => Some(handle.clone()),
1489            OpenBuffer::Weak(handle) => handle.upgrade(),
1490            OpenBuffer::Operations(_) => None,
1491        }
1492    }
1493}
1494
1495fn is_not_found_error(error: &anyhow::Error) -> bool {
1496    error
1497        .root_cause()
1498        .downcast_ref::<io::Error>()
1499        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1500}
1501
1502fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
1503    let entries = blame
1504        .entries
1505        .into_iter()
1506        .map(|entry| proto::BlameEntry {
1507            sha: entry.sha.as_bytes().into(),
1508            start_line: entry.range.start,
1509            end_line: entry.range.end,
1510            original_line_number: entry.original_line_number,
1511            author: entry.author.clone(),
1512            author_mail: entry.author_mail.clone(),
1513            author_time: entry.author_time,
1514            author_tz: entry.author_tz.clone(),
1515            committer: entry.committer.clone(),
1516            committer_mail: entry.committer_mail.clone(),
1517            committer_time: entry.committer_time,
1518            committer_tz: entry.committer_tz.clone(),
1519            summary: entry.summary.clone(),
1520            previous: entry.previous.clone(),
1521            filename: entry.filename.clone(),
1522        })
1523        .collect::<Vec<_>>();
1524
1525    let messages = blame
1526        .messages
1527        .into_iter()
1528        .map(|(oid, message)| proto::CommitMessage {
1529            oid: oid.as_bytes().into(),
1530            message,
1531        })
1532        .collect::<Vec<_>>();
1533
1534    let permalinks = blame
1535        .permalinks
1536        .into_iter()
1537        .map(|(oid, url)| proto::CommitPermalink {
1538            oid: oid.as_bytes().into(),
1539            permalink: url.to_string(),
1540        })
1541        .collect::<Vec<_>>();
1542
1543    proto::BlameBufferResponse {
1544        entries,
1545        messages,
1546        permalinks,
1547        remote_url: blame.remote_url,
1548    }
1549}
1550
1551fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
1552    let entries = response
1553        .entries
1554        .into_iter()
1555        .filter_map(|entry| {
1556            Some(git::blame::BlameEntry {
1557                sha: git::Oid::from_bytes(&entry.sha).ok()?,
1558                range: entry.start_line..entry.end_line,
1559                original_line_number: entry.original_line_number,
1560                committer: entry.committer,
1561                committer_time: entry.committer_time,
1562                committer_tz: entry.committer_tz,
1563                committer_mail: entry.committer_mail,
1564                author: entry.author,
1565                author_mail: entry.author_mail,
1566                author_time: entry.author_time,
1567                author_tz: entry.author_tz,
1568                summary: entry.summary,
1569                previous: entry.previous,
1570                filename: entry.filename,
1571            })
1572        })
1573        .collect::<Vec<_>>();
1574
1575    let messages = response
1576        .messages
1577        .into_iter()
1578        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
1579        .collect::<HashMap<_, _>>();
1580
1581    let permalinks = response
1582        .permalinks
1583        .into_iter()
1584        .filter_map(|permalink| {
1585            Some((
1586                git::Oid::from_bytes(&permalink.oid).ok()?,
1587                Url::from_str(&permalink.permalink).ok()?,
1588            ))
1589        })
1590        .collect::<HashMap<_, _>>();
1591
1592    Blame {
1593        entries,
1594        permalinks,
1595        messages,
1596        remote_url: response.remote_url,
1597    }
1598}