// buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, NoRepositoryError, ProjectPath,
   5};
   6use anyhow::{anyhow, Context as _, Result};
   7use client::Client;
   8use collections::{hash_map, HashMap, HashSet};
   9use fs::Fs;
  10use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
  11use git::blame::Blame;
  12use gpui::{
  13    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  14};
  15use http_client::Url;
  16use language::{
  17    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
  18    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
  19};
  20use rpc::{
  21    proto::{self, AnyProtoClient},
  22    ErrorExt as _, TypedEnvelope,
  23};
  24use smol::channel::Receiver;
  25use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
  26use text::BufferId;
  27use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  28use worktree::{
  29    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
  30    WorktreeId,
  31};
  32
/// A set of open buffers.
pub struct BufferStore {
    /// Client used to relay buffer updates downstream while the project is shared.
    downstream_client: Option<AnyProtoClient>,
    /// Remote project id; `Some` while the project is shared. When set, buffers
    /// are retained strongly (see `set_remote_id`).
    remote_id: Option<u64>,
    #[allow(unused)]
    worktree_store: Model<WorktreeStore>,
    /// All buffers known to the store, keyed by their remote id.
    opened_buffers: HashMap<BufferId, OpenBuffer>,
    /// Reverse index from a local file's project path to its buffer id.
    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
    /// Reverse index from a worktree entry id to its buffer id.
    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
    /// In-flight local loads; each watch receiver resolves once the load
    /// completes, letting concurrent opens of the same path share one load.
    #[allow(clippy::type_complexity)]
    loading_buffers_by_path: HashMap<
        ProjectPath,
        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
    >,
    /// Remote buffers whose state has begun arriving but is not yet complete.
    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
    /// Senders notified when the remote buffer with the given id finishes
    /// opening (fulfilled in `add_buffer`, cleared on disconnect).
    remote_buffer_listeners:
        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
    /// Per-peer sets of buffer ids — presumably the buffers shared with each
    /// peer; maintained by code outside this chunk (TODO confirm).
    shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
}
  52
/// The store's handle on one buffer: held strongly, held weakly, or a backlog
/// of operations received before the buffer itself was registered.
enum OpenBuffer {
    /// Keeps the buffer alive (used while the project is shared).
    Strong(Model<Buffer>),
    /// Does not keep the buffer alive; the model may be dropped by its owners.
    Weak(WeakModel<Buffer>),
    /// Operations that arrived before the buffer opened; `add_buffer` applies
    /// and drains them when the buffer is registered.
    Operations(Vec<Operation>),
}
  58
/// Events emitted by a [`BufferStore`].
pub enum BufferStoreEvent {
    /// A buffer was registered with the store.
    BufferAdded(Model<Buffer>),
    /// A buffer's model was released.
    BufferDropped(BufferId),
    /// A buffer was saved under a new path; `old_file` is the file it had
    /// before the change, if any.
    BufferChangedFilePath {
        buffer: Model<Buffer>,
        old_file: Option<Arc<dyn language::File>>,
    },
}
  67
/// A set of edits grouped into one logical transaction, mapping each affected
/// buffer to its [`language::Transaction`].
#[derive(Default)]
pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);

impl EventEmitter<BufferStoreEvent> for BufferStore {}
  72
  73impl BufferStore {
    /// Registers the RPC message and request handlers this store depends on
    /// with the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_model_message_handler(Self::handle_buffer_reloaded);
        client.add_model_message_handler(Self::handle_buffer_saved);
        client.add_model_message_handler(Self::handle_update_buffer_file);
        client.add_model_message_handler(Self::handle_update_diff_base);
        client.add_model_request_handler(Self::handle_save_buffer);
        client.add_model_request_handler(Self::handle_blame_buffer);
    }
  82
  83    /// Creates a buffer store, optionally retaining its buffers.
  84    ///
  85    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
  86    /// and won't be released unless they are explicitly removed, or `retain_buffers`
  87    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
  88    /// weak handles.
  89    pub fn new(
  90        worktree_store: Model<WorktreeStore>,
  91        remote_id: Option<u64>,
  92        cx: &mut ModelContext<Self>,
  93    ) -> Self {
  94        cx.subscribe(&worktree_store, |this, _, event, cx| match event {
  95            WorktreeStoreEvent::WorktreeAdded(worktree) => {
  96                this.subscribe_to_worktree(worktree, cx);
  97            }
  98            _ => {}
  99        })
 100        .detach();
 101
 102        Self {
 103            remote_id,
 104            downstream_client: None,
 105            worktree_store,
 106            opened_buffers: Default::default(),
 107            remote_buffer_listeners: Default::default(),
 108            loading_remote_buffers_by_id: Default::default(),
 109            local_buffer_ids_by_path: Default::default(),
 110            local_buffer_ids_by_entry_id: Default::default(),
 111            loading_buffers_by_path: Default::default(),
 112            shared_buffers: Default::default(),
 113        }
 114    }
 115
    /// Opens the buffer at `project_path`, reusing an already-open buffer when
    /// possible and deduplicating concurrent loads of the same path.
    pub fn open_buffer(
        &mut self,
        project_path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        // Fast path: the buffer for this path is already open.
        let existing_buffer = self.get_by_path(&project_path, cx);
        if let Some(existing_buffer) = existing_buffer {
            return Task::ready(Ok(existing_buffer));
        }

        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(project_path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
            // If the given path is already being loaded, then wait for that existing
            // task to complete and return the same buffer.
            hash_map::Entry::Occupied(e) => e.get().clone(),

            // Otherwise, record the fact that this path is now being loaded.
            hash_map::Entry::Vacant(entry) => {
                let (mut tx, rx) = postage::watch::channel();
                entry.insert(rx.clone());

                let project_path = project_path.clone();
                // Local worktrees load from disk; remote worktrees request the
                // buffer from the host over RPC.
                let load_buffer = match worktree.read(cx) {
                    Worktree::Local(_) => {
                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
                    }
                    Worktree::Remote(tree) => {
                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
                    }
                };

                cx.spawn(move |this, mut cx| async move {
                    let load_result = load_buffer.await;
                    // Publish the result (success or shared error) to every watcher.
                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                        // Record the fact that the buffer is no longer loading.
                        this.loading_buffers_by_path.remove(&project_path);
                        let buffer = load_result.map_err(Arc::new)?;
                        Ok(buffer)
                    })?);
                    anyhow::Ok(())
                })
                .detach();
                rx
            }
        };

        cx.background_executor().spawn(async move {
            Self::wait_for_loading_buffer(loading_watch)
                .await
                .map_err(|e| e.cloned())
        })
    }
 175
    /// Subscribes to a worktree's events so that entry and git-repository
    /// changes in *local* worktrees are reflected in open buffers. Events from
    /// remote worktrees are ignored here.
    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
        cx.subscribe(worktree, |this, worktree, event, cx| {
            if worktree.read(cx).is_local() {
                match event {
                    worktree::Event::UpdatedEntries(changes) => {
                        this.local_worktree_entries_changed(&worktree, changes, cx);
                    }
                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
                    }
                    _ => {}
                }
            }
        })
        .detach();
    }
 192
 193    fn local_worktree_entries_changed(
 194        &mut self,
 195        worktree_handle: &Model<Worktree>,
 196        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 197        cx: &mut ModelContext<Self>,
 198    ) {
 199        let snapshot = worktree_handle.read(cx).snapshot();
 200        for (path, entry_id, _) in changes {
 201            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 202        }
 203    }
 204
    /// Recomputes the git diff base for every open (or still-loading) buffer
    /// whose containing repository changed, then relays the new diff bases to
    /// the downstream client if the project is shared.
    fn local_worktree_git_repos_changed(
        &mut self,
        worktree_handle: Model<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut ModelContext<Self>,
    ) {
        debug_assert!(worktree_handle.read(cx).is_local());

        // Identify the loading buffers whose containing repository that has changed.
        let future_buffers = self
            .loading_buffers()
            .filter_map(|(project_path, receiver)| {
                if project_path.worktree_id != worktree_handle.read(cx).id() {
                    return None;
                }
                let path = &project_path.path;
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
                let path = path.clone();
                Some(async move {
                    Self::wait_for_loading_buffer(receiver)
                        .await
                        .ok()
                        .map(|buffer| (buffer, path))
                })
            })
            .collect::<FuturesUnordered<_>>();

        // Identify the current buffers whose containing repository has changed.
        let current_buffers = self
            .buffers()
            .filter_map(|buffer| {
                let file = File::from_dyn(buffer.read(cx).file())?;
                if file.worktree != worktree_handle {
                    return None;
                }
                changed_repos
                    .iter()
                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
                Some((buffer, file.path.clone()))
            })
            .collect::<Vec<_>>();

        // Nothing to do if no buffer lives inside a changed repository.
        if future_buffers.len() + current_buffers.len() == 0 {
            return;
        }

        cx.spawn(move |this, mut cx| async move {
            // Wait for all of the buffers to load.
            let future_buffers = future_buffers.collect::<Vec<_>>().await;

            // Reload the diff base for every buffer whose containing git repository has changed.
            let snapshot =
                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
            let diff_bases_by_buffer = cx
                .background_executor()
                .spawn(async move {
                    // Load each buffer's index text off the main thread.
                    let mut diff_base_tasks = future_buffers
                        .into_iter()
                        .flatten()
                        .chain(current_buffers)
                        .filter_map(|(buffer, path)| {
                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
                            Some(async move {
                                let base_text =
                                    local_repo_entry.repo().load_index_text(&relative_path);
                                Some((buffer, base_text))
                            })
                        })
                        .collect::<FuturesUnordered<_>>();

                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
                    while let Some(diff_base) = diff_base_tasks.next().await {
                        if let Some(diff_base) = diff_base {
                            diff_bases.push(diff_base);
                        }
                    }
                    diff_bases
                })
                .await;

            this.update(&mut cx, |this, cx| {
                // Assign the new diff bases on all of the buffers.
                for (buffer, diff_base) in diff_bases_by_buffer {
                    let buffer_id = buffer.update(cx, |buffer, cx| {
                        buffer.set_diff_base(diff_base.clone(), cx);
                        buffer.remote_id().to_proto()
                    });
                    // Relay the change downstream when the project is shared.
                    if let Some(project_id) = this.remote_id {
                        if let Some(client) = &this.downstream_client {
                            client
                                .send(proto::UpdateDiffBase {
                                    project_id,
                                    buffer_id,
                                    diff_base,
                                })
                                .log_err();
                        }
                    }
                }
            })
        })
        .detach_and_log_err(cx);
    }
 311
    /// Loads a buffer from a file in a local worktree. If the file does not
    /// exist on disk, an empty buffer pointing at that (not-yet-created) path
    /// is returned instead. The buffer is registered with the store on success.
    fn open_local_buffer_internal(
        &mut self,
        path: Arc<Path>,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        let load_buffer = worktree.update(cx, |worktree, cx| {
            let load_file = worktree.load_file(path.as_ref(), cx);
            // Reserve the model slot up front so the buffer id (derived from
            // the entity id) is fixed before the file finishes loading.
            let reservation = cx.reserve_model();
            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
            cx.spawn(move |_, mut cx| async move {
                let loaded = load_file.await?;
                // Build the text buffer on the background executor.
                let text_buffer = cx
                    .background_executor()
                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
                    .await;
                cx.insert_model(reservation, |_| {
                    Buffer::build(
                        text_buffer,
                        loaded.diff_base,
                        Some(loaded.file),
                        Capability::ReadWrite,
                    )
                })
            })
        });

        cx.spawn(move |this, mut cx| async move {
            let buffer = match load_buffer.await {
                Ok(buffer) => Ok(buffer),
                // A missing file is not an error here: open an empty buffer
                // whose file records the intended path but has no disk entry.
                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
                    Buffer::build(
                        text_buffer,
                        None,
                        Some(Arc::new(File {
                            worktree,
                            path,
                            mtime: None,
                            entry_id: None,
                            is_local: true,
                            is_deleted: false,
                            is_private: false,
                        })),
                        Capability::ReadWrite,
                    )
                }),
                Err(e) => Err(e),
            }?;
            this.update(&mut cx, |this, cx| {
                this.add_buffer(buffer.clone(), cx).log_err();
            })?;
            Ok(buffer)
        })
    }
 368
 369    fn open_remote_buffer_internal(
 370        &self,
 371        path: &Arc<Path>,
 372        worktree: &RemoteWorktree,
 373        cx: &ModelContext<Self>,
 374    ) -> Task<Result<Model<Buffer>>> {
 375        let worktree_id = worktree.id().to_proto();
 376        let project_id = worktree.project_id();
 377        let client = worktree.client();
 378        let path_string = path.clone().to_string_lossy().to_string();
 379        cx.spawn(move |this, mut cx| async move {
 380            let response = client
 381                .request(proto::OpenBufferByPath {
 382                    project_id,
 383                    worktree_id,
 384                    path: path_string,
 385                })
 386                .await?;
 387            let buffer_id = BufferId::new(response.buffer_id)?;
 388            this.update(&mut cx, |this, cx| {
 389                this.wait_for_remote_buffer(buffer_id, cx)
 390            })?
 391            .await
 392        })
 393    }
 394
 395    pub fn create_buffer(
 396        &mut self,
 397        remote_client: Option<(AnyProtoClient, u64)>,
 398        cx: &mut ModelContext<Self>,
 399    ) -> Task<Result<Model<Buffer>>> {
 400        if let Some((remote_client, project_id)) = remote_client {
 401            let create = remote_client.request(proto::OpenNewBuffer { project_id });
 402            cx.spawn(|this, mut cx| async move {
 403                let response = create.await?;
 404                let buffer_id = BufferId::new(response.buffer_id)?;
 405
 406                this.update(&mut cx, |this, cx| {
 407                    this.wait_for_remote_buffer(buffer_id, cx)
 408                })?
 409                .await
 410            })
 411        } else {
 412            Task::ready(Ok(self.create_local_buffer("", None, cx)))
 413        }
 414    }
 415
 416    pub fn create_local_buffer(
 417        &mut self,
 418        text: &str,
 419        language: Option<Arc<Language>>,
 420        cx: &mut ModelContext<Self>,
 421    ) -> Model<Buffer> {
 422        let buffer = cx.new_model(|cx| {
 423            Buffer::local(text, cx)
 424                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 425        });
 426        self.add_buffer(buffer.clone(), cx).log_err();
 427        buffer
 428    }
 429
    /// Saves the buffer to its current file, dispatching to the local or
    /// remote implementation depending on where the file's worktree lives.
    /// Fails immediately if the buffer has no file.
    pub fn save_buffer(
        &mut self,
        buffer: Model<Buffer>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
        };
        match file.worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
            }
            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
        }
    }
 445
    /// Saves the buffer under a new project path ("save as"), then emits
    /// [`BufferStoreEvent::BufferChangedFilePath`] carrying the file the
    /// buffer had before the save.
    pub fn save_buffer_as(
        &mut self,
        buffer: Model<Buffer>,
        path: ProjectPath,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let Some(worktree) = self
            .worktree_store
            .read(cx)
            .worktree_for_id(path.worktree_id, cx)
        else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        // Capture the old file before saving so subscribers can see the change.
        let old_file = buffer.read(cx).file().cloned();

        let task = match worktree.read(cx) {
            Worktree::Local(_) => {
                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
            }
            Worktree::Remote(tree) => {
                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
            }
        };
        cx.spawn(|this, mut cx| async move {
            task.await?;
            this.update(&mut cx, |_, cx| {
                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
            })
        })
    }
 477
    /// Writes the buffer's contents to `path` in a local worktree.
    ///
    /// `has_changed_file` indicates the buffer is being saved under a new
    /// path; it is also forced on when the buffer's file has not yet been
    /// created on disk. After the write, a save notification (plus a file
    /// update when the file changed) is sent downstream, and the buffer is
    /// told about the save.
    fn save_local_buffer(
        &self,
        worktree: Model<Worktree>,
        buffer_handle: Model<Buffer>,
        path: Arc<Path>,
        mut has_changed_file: bool,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        // Snapshot the buffer state that the save will be based on.
        let buffer = buffer_handle.read(cx);
        let text = buffer.as_rope().clone();
        let line_ending = buffer.line_ending();
        let version = buffer.version();
        let buffer_id = buffer.remote_id();
        if buffer.file().is_some_and(|file| !file.is_created()) {
            has_changed_file = true;
        }

        let save = worktree.update(cx, |worktree, cx| {
            worktree.write_file(path.as_ref(), text, line_ending, cx)
        });

        cx.spawn(move |this, mut cx| async move {
            let new_file = save.await?;
            let mtime = new_file.mtime;
            this.update(&mut cx, |this, cx| {
                if let Some(downstream_client) = this.downstream_client.as_ref() {
                    let project_id = this.remote_id.unwrap_or(0);
                    if has_changed_file {
                        downstream_client
                            .send(proto::UpdateBufferFile {
                                project_id,
                                buffer_id: buffer_id.to_proto(),
                                file: Some(language::File::to_proto(&*new_file, cx)),
                            })
                            .log_err();
                    }
                    downstream_client
                        .send(proto::BufferSaved {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            version: serialize_version(&version),
                            mtime: mtime.map(|time| time.into()),
                        })
                        .log_err();
                }
            })?;
            buffer_handle.update(&mut cx, |buffer, cx| {
                if has_changed_file {
                    buffer.file_updated(new_file, cx);
                }
                buffer.did_save(version.clone(), mtime, cx);
            })
        })
    }
 532
 533    fn save_remote_buffer(
 534        &self,
 535        buffer_handle: Model<Buffer>,
 536        new_path: Option<proto::ProjectPath>,
 537        tree: &RemoteWorktree,
 538        cx: &ModelContext<Self>,
 539    ) -> Task<Result<()>> {
 540        let buffer = buffer_handle.read(cx);
 541        let buffer_id = buffer.remote_id().into();
 542        let version = buffer.version();
 543        let rpc = tree.client();
 544        let project_id = tree.project_id();
 545        cx.spawn(move |_, mut cx| async move {
 546            let response = rpc
 547                .request(proto::SaveBuffer {
 548                    project_id,
 549                    buffer_id,
 550                    new_path,
 551                    version: serialize_version(&version),
 552                })
 553                .await?;
 554            let version = deserialize_version(&response.version);
 555            let mtime = response.mtime.map(|mtime| mtime.into());
 556
 557            buffer_handle.update(&mut cx, |buffer, cx| {
 558                buffer.did_save(version.clone(), mtime, cx);
 559            })?;
 560
 561            Ok(())
 562        })
 563    }
 564
    /// Computes git blame for the buffer, optionally at a specific buffer
    /// `version`. For local worktrees the blame runs on the background
    /// executor; for remote worktrees the request is forwarded to the host.
    pub fn blame_buffer(
        &self,
        buffer: &Model<Buffer>,
        version: Option<clock::Global>,
        cx: &AppContext,
    ) -> Task<Result<Blame>> {
        let buffer = buffer.read(cx);
        let Some(file) = File::from_dyn(buffer.file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        match file.worktree.clone().read(cx) {
            Worktree::Local(worktree) => {
                let worktree = worktree.snapshot();
                // Resolve everything blame needs up front on this thread; any
                // error is carried into the background task via the Result.
                let blame_params = maybe!({
                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
                        Some(repo_for_path) => repo_for_path,
                        None => anyhow::bail!(NoRepositoryError {}),
                    };

                    let relative_path = repo_entry
                        .relativize(&worktree, &file.path)
                        .context("failed to relativize buffer path")?;

                    let repo = local_repo_entry.repo().clone();

                    // Blame either the requested historical version or the
                    // buffer's current contents.
                    let content = match version {
                        Some(version) => buffer.rope_for_version(&version).clone(),
                        None => buffer.as_rope().clone(),
                    };

                    anyhow::Ok((repo, relative_path, content))
                });

                cx.background_executor().spawn(async move {
                    let (repo, relative_path, content) = blame_params?;
                    repo.blame(&relative_path, content)
                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
                })
            }
            Worktree::Remote(worktree) => {
                let buffer_id = buffer.remote_id();
                let version = buffer.version();
                let project_id = worktree.project_id();
                let client = worktree.client();
                cx.spawn(|_| async move {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id,
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                })
            }
        }
    }
 623
    /// Registers a buffer with the store.
    ///
    /// Chooses strong or weak retention based on whether the project is
    /// shared, applies any operations that arrived before the buffer did,
    /// fulfills pending remote-buffer listeners, indexes local buffers by
    /// path and entry id, and emits [`BufferStoreEvent::BufferAdded`].
    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
        let remote_id = buffer.read(cx).remote_id();
        // A non-zero replica id means this buffer is a replica of one that
        // originated elsewhere.
        let is_remote = buffer.read(cx).replica_id() != 0;
        let open_buffer = if self.remote_id.is_some() {
            OpenBuffer::Strong(buffer.clone())
        } else {
            OpenBuffer::Weak(buffer.downgrade())
        };

        // Emit `BufferDropped` when the buffer model is eventually released.
        let handle = cx.handle().downgrade();
        buffer.update(cx, move |_, cx| {
            cx.on_release(move |buffer, cx| {
                handle
                    .update(cx, |_, cx| {
                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
                    })
                    .ok();
            })
            .detach()
        });

        match self.opened_buffers.entry(remote_id) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(open_buffer);
            }
            hash_map::Entry::Occupied(mut entry) => {
                if let OpenBuffer::Operations(operations) = entry.get_mut() {
                    // Apply operations that were buffered before this buffer opened.
                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
                } else if entry.get().upgrade().is_some() {
                    // A live buffer already exists under this id: tolerated for
                    // remote replicas, a bug for locally-created buffers.
                    if is_remote {
                        return Ok(());
                    } else {
                        debug_panic!("buffer {} was already registered", remote_id);
                        Err(anyhow!("buffer {} was already registered", remote_id))?;
                    }
                }
                entry.insert(open_buffer);
            }
        }

        // Wake up any tasks waiting for this remote buffer to arrive.
        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
            for sender in senders {
                sender.send(Ok(buffer.clone())).ok();
            }
        }

        // Index local files so path- and entry-based lookups can find them.
        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
            if file.is_local {
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: file.worktree_id(cx),
                        path: file.path.clone(),
                    },
                    remote_id,
                );

                if let Some(entry_id) = file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, remote_id);
                }
            }
        }

        cx.subscribe(&buffer, Self::on_buffer_event).detach();
        cx.emit(BufferStoreEvent::BufferAdded(buffer));
        Ok(())
    }
 691
 692    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
 693        self.opened_buffers
 694            .values()
 695            .filter_map(|buffer| buffer.upgrade())
 696    }
 697
    /// Iterates over the project paths currently being loaded, along with a
    /// watch receiver for each that resolves when its load finishes.
    pub fn loading_buffers(
        &self,
    ) -> impl Iterator<
        Item = (
            &ProjectPath,
            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
        ),
    > {
        self.loading_buffers_by_path
            .iter()
            .map(|(path, rx)| (path, rx.clone()))
    }
 710
 711    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
 712        self.buffers().find_map(|buffer| {
 713            let file = File::from_dyn(buffer.read(cx).file())?;
 714            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
 715                Some(buffer)
 716            } else {
 717                None
 718            }
 719        })
 720    }
 721
 722    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 723        self.opened_buffers
 724            .get(&buffer_id)
 725            .and_then(|buffer| buffer.upgrade())
 726    }
 727
 728    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
 729        self.get(buffer_id)
 730            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
 731    }
 732
 733    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 734        self.get(buffer_id)
 735            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 736    }
 737
 738    pub fn wait_for_remote_buffer(
 739        &mut self,
 740        id: BufferId,
 741        cx: &mut AppContext,
 742    ) -> Task<Result<Model<Buffer>>> {
 743        let buffer = self.get(id);
 744        if let Some(buffer) = buffer {
 745            return Task::ready(Ok(buffer));
 746        }
 747        let (tx, rx) = oneshot::channel();
 748        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 749        cx.background_executor().spawn(async move { rx.await? })
 750    }
 751
 752    pub fn buffer_version_info(
 753        &self,
 754        cx: &AppContext,
 755    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 756        let buffers = self
 757            .buffers()
 758            .map(|buffer| {
 759                let buffer = buffer.read(cx);
 760                proto::BufferVersion {
 761                    id: buffer.remote_id().into(),
 762                    version: language::proto::serialize_version(&buffer.version),
 763                }
 764            })
 765            .collect();
 766        let incomplete_buffer_ids = self
 767            .loading_remote_buffers_by_id
 768            .keys()
 769            .copied()
 770            .collect::<Vec<_>>();
 771        (buffers, incomplete_buffer_ids)
 772    }
 773
    /// Handles losing the connection to the project's host: drops the
    /// downstream client, clears the remote id (demoting retained buffers to
    /// weak handles), marks every buffer read-only, and cancels pending
    /// remote-buffer waits.
    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
        self.downstream_client.take();
        self.set_remote_id(None, cx);

        for buffer in self.buffers() {
            buffer.update(cx, |buffer, cx| {
                buffer.set_capability(Capability::ReadOnly, cx)
            });
        }

        // Wake up all futures currently waiting on a buffer to get opened,
        // to give them a chance to fail now that we've disconnected.
        self.remote_buffer_listeners.clear();
    }
 788
 789    pub fn shared(
 790        &mut self,
 791        remote_id: u64,
 792        downstream_client: AnyProtoClient,
 793        cx: &mut AppContext,
 794    ) {
 795        self.downstream_client = Some(downstream_client);
 796        self.set_remote_id(Some(remote_id), cx);
 797    }
 798
 799    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
 800        self.remote_id.take();
 801    }
 802
 803    fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
 804        self.remote_id = remote_id;
 805        for open_buffer in self.opened_buffers.values_mut() {
 806            if remote_id.is_some() {
 807                if let OpenBuffer::Weak(buffer) = open_buffer {
 808                    if let Some(buffer) = buffer.upgrade() {
 809                        *open_buffer = OpenBuffer::Strong(buffer);
 810                    }
 811                }
 812            } else {
 813                if let Some(buffer) = open_buffer.upgrade() {
 814                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 815                }
 816                if let OpenBuffer::Strong(buffer) = open_buffer {
 817                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
 818                }
 819            }
 820        }
 821    }
 822
 823    pub fn discard_incomplete(&mut self) {
 824        self.opened_buffers
 825            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
 826    }
 827
    /// Streams buffers whose contents may match `query`, up to `limit`
    /// results. Already-open unnamed buffers are sent immediately; remaining
    /// candidates come from the worktree store and are opened on demand, a
    /// bounded number at a time.
    pub fn find_search_candidates(
        &mut self,
        query: &SearchQuery,
        mut limit: usize,
        fs: Arc<dyn Fs>,
        cx: &mut ModelContext<Self>,
    ) -> Receiver<Model<Buffer>> {
        let (tx, rx) = smol::channel::unbounded();
        let mut open_buffers = HashSet::default();
        let mut unnamed_buffers = Vec::new();
        for handle in self.buffers() {
            let buffer = handle.read(cx);
            if let Some(entry_id) = buffer.entry_id(cx) {
                // Track entries that are already open so the worktree search
                // can account for them.
                open_buffers.insert(entry_id);
            } else {
                // Buffers without a file entry can't be discovered by the
                // worktree search; send them unconditionally, counting them
                // against the limit.
                limit = limit.saturating_sub(1);
                unnamed_buffers.push(handle)
            };
        }

        // Bound how many candidate buffers are opened concurrently while
        // draining the stream of matching paths.
        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
        let mut project_paths_rx = self
            .worktree_store
            .update(cx, |worktree_store, cx| {
                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
            })
            .chunks(MAX_CONCURRENT_BUFFER_OPENS);

        cx.spawn(|this, mut cx| async move {
            for buffer in unnamed_buffers {
                tx.send(buffer).await.ok();
            }

            while let Some(project_paths) = project_paths_rx.next().await {
                let buffers = this.update(&mut cx, |this, cx| {
                    project_paths
                        .into_iter()
                        .map(|project_path| this.open_buffer(project_path, cx))
                        .collect::<Vec<_>>()
                })?;
                for buffer_task in buffers {
                    if let Some(buffer) = buffer_task.await.log_err() {
                        // A closed receiver means the consumer is gone; stop
                        // opening buffers.
                        if tx.send(buffer).await.is_err() {
                            return anyhow::Ok(());
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
        .detach();
        rx
    }
 881
 882    fn on_buffer_event(
 883        &mut self,
 884        buffer: Model<Buffer>,
 885        event: &BufferEvent,
 886        cx: &mut ModelContext<Self>,
 887    ) {
 888        match event {
 889            BufferEvent::FileHandleChanged => {
 890                self.buffer_changed_file(buffer, cx);
 891            }
 892            _ => {}
 893        }
 894    }
 895
    /// Reconciles an open buffer with a changed worktree entry: rebuilds the
    /// buffer's `File` from the worktree snapshot, updates the path/entry-id
    /// indices, notifies the downstream client (if shared), and emits
    /// `BufferChangedFilePath` events for renames. The `Option<()>` return is
    /// only used for `?`-style early exit and carries no meaning to callers.
    fn local_worktree_entry_changed(
        &mut self,
        entry_id: ProjectEntryId,
        path: &Arc<Path>,
        worktree: &Model<worktree::Worktree>,
        snapshot: &worktree::Snapshot,
        cx: &mut ModelContext<Self>,
    ) -> Option<()> {
        let project_path = ProjectPath {
            worktree_id: snapshot.id(),
            path: path.clone(),
        };
        // Find the affected buffer by entry id first, falling back to its path.
        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
            Some(&buffer_id) => buffer_id,
            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
        };
        let buffer = if let Some(buffer) = self.get(buffer_id) {
            buffer
        } else {
            // The buffer was dropped; remove its stale index entries.
            self.opened_buffers.remove(&buffer_id);
            self.local_buffer_ids_by_path.remove(&project_path);
            self.local_buffer_ids_by_entry_id.remove(&entry_id);
            return None;
        };

        let events = buffer.update(cx, |buffer, cx| {
            let file = buffer.file()?;
            let old_file = File::from_dyn(Some(file))?;
            // Ignore changes from other worktrees.
            if old_file.worktree != *worktree {
                return None;
            }

            // Build the buffer's new `File`: prefer the entry with the same
            // id (follows renames), then an entry at the old path (entry was
            // recreated), and otherwise mark the file as deleted.
            let new_file = if let Some(entry) = old_file
                .entry_id
                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
            {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
                File {
                    is_local: true,
                    entry_id: Some(entry.id),
                    mtime: entry.mtime,
                    path: entry.path.clone(),
                    worktree: worktree.clone(),
                    is_deleted: false,
                    is_private: entry.is_private,
                }
            } else {
                File {
                    is_local: true,
                    entry_id: old_file.entry_id,
                    path: old_file.path.clone(),
                    mtime: old_file.mtime,
                    worktree: worktree.clone(),
                    is_deleted: true,
                    is_private: old_file.is_private,
                }
            };

            // Nothing changed; skip the bookkeeping and notifications below.
            if new_file == *old_file {
                return None;
            }

            let mut events = Vec::new();
            if new_file.path != old_file.path {
                // Re-key the path index and record a rename event.
                self.local_buffer_ids_by_path.remove(&ProjectPath {
                    path: old_file.path.clone(),
                    worktree_id: old_file.worktree_id(cx),
                });
                self.local_buffer_ids_by_path.insert(
                    ProjectPath {
                        worktree_id: new_file.worktree_id(cx),
                        path: new_file.path.clone(),
                    },
                    buffer_id,
                );
                events.push(BufferStoreEvent::BufferChangedFilePath {
                    buffer: cx.handle(),
                    old_file: buffer.file().cloned(),
                });
            }

            if new_file.entry_id != old_file.entry_id {
                // Re-key the entry-id index.
                if let Some(entry_id) = old_file.entry_id {
                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
                }
                if let Some(entry_id) = new_file.entry_id {
                    self.local_buffer_ids_by_entry_id
                        .insert(entry_id, buffer_id);
                }
            }

            // If this project is shared, forward the new file metadata to the
            // downstream client (best-effort; failures are ignored).
            if let Some(project_id) = self.remote_id {
                if let Some(client) = &self.downstream_client {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.to_proto(),
                            file: Some(new_file.to_proto(cx)),
                        })
                        .ok();
                }
            }

            buffer.file_updated(Arc::new(new_file), cx);
            Some(events)
        })?;

        for event in events {
            cx.emit(event);
        }

        None
    }
1018
1019    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
1020        let file = File::from_dyn(buffer.read(cx).file())?;
1021
1022        let remote_id = buffer.read(cx).remote_id();
1023        if let Some(entry_id) = file.entry_id {
1024            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1025                Some(_) => {
1026                    return None;
1027                }
1028                None => {
1029                    self.local_buffer_ids_by_entry_id
1030                        .insert(entry_id, remote_id);
1031                }
1032            }
1033        };
1034        self.local_buffer_ids_by_path.insert(
1035            ProjectPath {
1036                worktree_id: file.worktree_id(cx),
1037                path: file.path.clone(),
1038            },
1039            remote_id,
1040        );
1041
1042        Some(())
1043    }
1044
1045    pub async fn handle_update_buffer(
1046        this: Model<Self>,
1047        envelope: TypedEnvelope<proto::UpdateBuffer>,
1048        mut cx: AsyncAppContext,
1049    ) -> Result<proto::Ack> {
1050        let payload = envelope.payload.clone();
1051        let buffer_id = BufferId::new(payload.buffer_id)?;
1052        let ops = payload
1053            .operations
1054            .into_iter()
1055            .map(language::proto::deserialize_operation)
1056            .collect::<Result<Vec<_>, _>>()?;
1057        this.update(&mut cx, |this, cx| {
1058            match this.opened_buffers.entry(buffer_id) {
1059                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1060                    OpenBuffer::Strong(buffer) => {
1061                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
1062                    }
1063                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1064                    OpenBuffer::Weak(_) => {}
1065                },
1066                hash_map::Entry::Vacant(e) => {
1067                    e.insert(OpenBuffer::Operations(ops));
1068                }
1069            }
1070            Ok(proto::Ack {})
1071        })?
1072    }
1073
    /// Handles a guest's request to resynchronize its open buffers after a
    /// reconnect: rebuilds the guest's shared-buffer set, reports the host's
    /// version for each requested buffer, re-sends file/diff-base/save
    /// metadata, and streams any operations the guest is missing.
    pub fn handle_synchronize_buffers(
        &mut self,
        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
        cx: &mut ModelContext<Self>,
        client: Arc<Client>,
    ) -> Result<proto::SynchronizeBuffersResponse> {
        let project_id = envelope.payload.project_id;
        let mut response = proto::SynchronizeBuffersResponse {
            buffers: Default::default(),
        };
        let Some(guest_id) = envelope.original_sender_id else {
            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
        };

        // Start from a clean slate: only buffers the guest reports below are
        // considered shared with it from now on.
        self.shared_buffers.entry(guest_id).or_default().clear();
        for buffer in envelope.payload.buffers {
            let buffer_id = BufferId::new(buffer.id)?;
            let remote_version = language::proto::deserialize_version(&buffer.version);
            if let Some(buffer) = self.get(buffer_id) {
                self.shared_buffers
                    .entry(guest_id)
                    .or_default()
                    .insert(buffer_id);

                let buffer = buffer.read(cx);
                response.buffers.push(proto::BufferVersion {
                    id: buffer_id.into(),
                    version: language::proto::serialize_version(&buffer.version),
                });

                // Serialize only the operations the guest hasn't seen yet.
                let operations = buffer.serialize_ops(Some(remote_version), cx);
                let client = client.clone();
                // Metadata sends below are best-effort; failures are logged
                // and ignored.
                if let Some(file) = buffer.file() {
                    client
                        .send(proto::UpdateBufferFile {
                            project_id,
                            buffer_id: buffer_id.into(),
                            file: Some(file.to_proto(cx)),
                        })
                        .log_err();
                }

                client
                    .send(proto::UpdateDiffBase {
                        project_id,
                        buffer_id: buffer_id.into(),
                        diff_base: buffer.diff_base().map(ToString::to_string),
                    })
                    .log_err();

                client
                    .send(proto::BufferReloaded {
                        project_id,
                        buffer_id: buffer_id.into(),
                        version: language::proto::serialize_version(buffer.saved_version()),
                        mtime: buffer.saved_mtime().map(|time| time.into()),
                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
                            as i32,
                    })
                    .log_err();

                // Stream the missing operations in the background, chunked to
                // respect message size limits.
                cx.background_executor()
                    .spawn(
                        async move {
                            let operations = operations.await;
                            for chunk in split_operations(operations) {
                                client
                                    .request(proto::UpdateBuffer {
                                        project_id,
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                    })
                                    .await?;
                            }
                            anyhow::Ok(())
                        }
                        .log_err(),
                    )
                    .detach();
            }
        }
        Ok(response)
    }
1157
    /// Handles the host's two-phase buffer transfer: a `State` message
    /// creates a partially-loaded buffer, then one or more `Chunk` messages
    /// apply its operations, the last of which promotes the buffer to the set
    /// of open buffers. Errors in either phase are forwarded to any listeners
    /// waiting on this buffer id.
    pub fn handle_create_buffer_for_peer(
        &mut self,
        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
        replica_id: u16,
        capability: Capability,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match envelope
            .payload
            .variant
            .ok_or_else(|| anyhow!("missing variant"))?
        {
            proto::create_buffer_for_peer::Variant::State(mut state) => {
                let buffer_id = BufferId::new(state.id)?;

                let buffer_result = maybe!({
                    // Resolve the buffer's file against a local worktree, if
                    // the state carries one.
                    let mut buffer_file = None;
                    if let Some(file) = state.file.take() {
                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
                        let worktree = self
                            .worktree_store
                            .read(cx)
                            .worktree_for_id(worktree_id, cx)
                            .ok_or_else(|| {
                                anyhow!("no worktree found for id {}", file.worktree_id)
                            })?;
                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
                            as Arc<dyn language::File>);
                    }
                    Buffer::from_proto(replica_id, capability, state, buffer_file)
                });

                match buffer_result {
                    Ok(buffer) => {
                        // Park the buffer until its operation chunks arrive.
                        let buffer = cx.new_model(|_| buffer);
                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
                    }
                    Err(error) => {
                        // Fail any waiters rather than leaving them hanging.
                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                            for listener in listeners {
                                listener.send(Err(anyhow!(error.cloned()))).ok();
                            }
                        }
                    }
                }
            }
            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
                let buffer_id = BufferId::new(chunk.buffer_id)?;
                // Chunks are only valid after a `State` message created the
                // loading buffer.
                let buffer = self
                    .loading_remote_buffers_by_id
                    .get(&buffer_id)
                    .cloned()
                    .ok_or_else(|| {
                        anyhow!(
                            "received chunk for buffer {} without initial state",
                            chunk.buffer_id
                        )
                    })?;

                let result = maybe!({
                    let operations = chunk
                        .operations
                        .into_iter()
                        .map(language::proto::deserialize_operation)
                        .collect::<Result<Vec<_>>>()?;
                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
                });

                if let Err(error) = result {
                    // Abandon the partially-loaded buffer and fail waiters.
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
                        for listener in listeners {
                            listener.send(Err(error.cloned())).ok();
                        }
                    }
                } else if chunk.is_last {
                    // All chunks applied: promote to a fully-open buffer
                    // (this also notifies waiters).
                    self.loading_remote_buffers_by_id.remove(&buffer_id);
                    self.add_buffer(buffer, cx)?;
                }
            }
        }

        Ok(())
    }
1242
1243    pub async fn handle_update_buffer_file(
1244        this: Model<Self>,
1245        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1246        mut cx: AsyncAppContext,
1247    ) -> Result<()> {
1248        let buffer_id = envelope.payload.buffer_id;
1249        let buffer_id = BufferId::new(buffer_id)?;
1250
1251        this.update(&mut cx, |this, cx| {
1252            let payload = envelope.payload.clone();
1253            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1254                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1255                let worktree = this
1256                    .worktree_store
1257                    .read(cx)
1258                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1259                    .ok_or_else(|| anyhow!("no such worktree"))?;
1260                let file = File::from_proto(file, worktree, cx)?;
1261                let old_file = buffer.update(cx, |buffer, cx| {
1262                    let old_file = buffer.file().cloned();
1263                    let new_path = file.path.clone();
1264                    buffer.file_updated(Arc::new(file), cx);
1265                    if old_file
1266                        .as_ref()
1267                        .map_or(true, |old| *old.path() != new_path)
1268                    {
1269                        Some(old_file)
1270                    } else {
1271                        None
1272                    }
1273                });
1274                if let Some(old_file) = old_file {
1275                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1276                }
1277            }
1278            Ok(())
1279        })?
1280    }
1281
1282    pub async fn handle_update_diff_base(
1283        this: Model<Self>,
1284        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1285        mut cx: AsyncAppContext,
1286    ) -> Result<()> {
1287        this.update(&mut cx, |this, cx| {
1288            let buffer_id = envelope.payload.buffer_id;
1289            let buffer_id = BufferId::new(buffer_id)?;
1290            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1291                buffer.update(cx, |buffer, cx| {
1292                    buffer.set_diff_base(envelope.payload.diff_base, cx)
1293                });
1294            }
1295            Ok(())
1296        })?
1297    }
1298
    /// Saves a buffer on behalf of a remote peer: waits until the buffer has
    /// caught up to the peer's version, then saves it (optionally under a new
    /// path) and reports the resulting saved version and mtime.
    pub async fn handle_save_buffer(
        this: Model<Self>,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        mut cx: AsyncAppContext,
    ) -> Result<proto::BufferSaved> {
        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        let (buffer, project_id) = this.update(&mut cx, |this, _| {
            anyhow::Ok((
                this.get_existing(buffer_id)?,
                this.remote_id.context("project is not shared")?,
            ))
        })??;
        // Don't save until the edits the peer has seen are applied locally.
        buffer
            .update(&mut cx, |buffer, _| {
                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
            })?
            .await?;
        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;

        if let Some(new_path) = envelope.payload.new_path {
            // "Save as": the peer requested a different destination path.
            let new_path = ProjectPath::from_proto(new_path);
            this.update(&mut cx, |this, cx| {
                this.save_buffer_as(buffer.clone(), new_path, cx)
            })?
            .await?;
        } else {
            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
                .await?;
        }

        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
            project_id,
            buffer_id: buffer_id.into(),
            version: serialize_version(buffer.saved_version()),
            mtime: buffer.saved_mtime().map(|time| time.into()),
        })
    }
1336
1337    pub async fn handle_close_buffer(
1338        this: Model<Self>,
1339        envelope: TypedEnvelope<proto::CloseBuffer>,
1340        mut cx: AsyncAppContext,
1341    ) -> Result<()> {
1342        let peer_id = envelope.sender_id;
1343        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1344        this.update(&mut cx, |this, _| {
1345            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1346                if shared.remove(&buffer_id) {
1347                    if shared.is_empty() {
1348                        this.shared_buffers.remove(&peer_id);
1349                    }
1350                    return;
1351                }
1352            };
1353            debug_panic!(
1354                "peer_id {} closed buffer_id {} which was either not open or already closed",
1355                peer_id,
1356                buffer_id
1357            )
1358        })
1359    }
1360
1361    pub async fn handle_buffer_saved(
1362        this: Model<Self>,
1363        envelope: TypedEnvelope<proto::BufferSaved>,
1364        mut cx: AsyncAppContext,
1365    ) -> Result<()> {
1366        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1367        let version = deserialize_version(&envelope.payload.version);
1368        let mtime = envelope.payload.mtime.map(|time| time.into());
1369        this.update(&mut cx, |this, cx| {
1370            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1371                buffer.update(cx, |buffer, cx| {
1372                    buffer.did_save(version, mtime, cx);
1373                });
1374            }
1375        })
1376    }
1377
1378    pub async fn handle_buffer_reloaded(
1379        this: Model<Self>,
1380        envelope: TypedEnvelope<proto::BufferReloaded>,
1381        mut cx: AsyncAppContext,
1382    ) -> Result<()> {
1383        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1384        let version = deserialize_version(&envelope.payload.version);
1385        let mtime = envelope.payload.mtime.map(|time| time.into());
1386        let line_ending = deserialize_line_ending(
1387            proto::LineEnding::from_i32(envelope.payload.line_ending)
1388                .ok_or_else(|| anyhow!("missing line ending"))?,
1389        );
1390        this.update(&mut cx, |this, cx| {
1391            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1392                buffer.update(cx, |buffer, cx| {
1393                    buffer.did_reload(version, line_ending, mtime, cx);
1394                });
1395            }
1396        })
1397    }
1398
1399    pub async fn handle_blame_buffer(
1400        this: Model<Self>,
1401        envelope: TypedEnvelope<proto::BlameBuffer>,
1402        mut cx: AsyncAppContext,
1403    ) -> Result<proto::BlameBufferResponse> {
1404        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1405        let version = deserialize_version(&envelope.payload.version);
1406        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1407        buffer
1408            .update(&mut cx, |buffer, _| {
1409                buffer.wait_for_version(version.clone())
1410            })?
1411            .await?;
1412        let blame = this
1413            .update(&mut cx, |this, cx| {
1414                this.blame_buffer(&buffer, Some(version), cx)
1415            })?
1416            .await?;
1417        Ok(serialize_blame_buffer_response(blame))
1418    }
1419
1420    pub async fn wait_for_loading_buffer(
1421        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1422    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1423        loop {
1424            if let Some(result) = receiver.borrow().as_ref() {
1425                match result {
1426                    Ok(buffer) => return Ok(buffer.to_owned()),
1427                    Err(e) => return Err(e.to_owned()),
1428                }
1429            }
1430            receiver.next().await;
1431        }
1432    }
1433
    /// Sends a buffer to a peer that doesn't have it yet: an initial `State`
    /// message followed by chunked operation messages. No-ops if the buffer
    /// was already shared with that peer or the project isn't shared.
    pub fn create_buffer_for_peer(
        &mut self,
        buffer: &Model<Buffer>,
        peer_id: proto::PeerId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        let buffer_id = buffer.read(cx).remote_id();
        // `insert` returning false means the peer already has this buffer.
        if !self
            .shared_buffers
            .entry(peer_id)
            .or_default()
            .insert(buffer_id)
        {
            return Task::ready(Ok(()));
        }

        let Some((client, project_id)) = self.downstream_client.clone().zip(self.remote_id) else {
            return Task::ready(Ok(()));
        };

        cx.spawn(|this, mut cx| async move {
            // The buffer may have been dropped before the task ran.
            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
                return anyhow::Ok(());
            };

            // Snapshot the operations before the state, so the state reflects
            // at least everything the chunks will carry.
            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
            let operations = operations.await;
            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;

            let initial_state = proto::CreateBufferForPeer {
                project_id,
                peer_id: Some(peer_id),
                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
            };

            // Only stream chunks if the initial state was sent successfully;
            // the final chunk is flagged so the peer knows when to promote
            // the buffer.
            if client.send(initial_state).log_err().is_some() {
                let client = client.clone();
                cx.background_executor()
                    .spawn(async move {
                        let mut chunks = split_operations(operations).peekable();
                        while let Some(chunk) = chunks.next() {
                            let is_last = chunks.peek().is_none();
                            client.send(proto::CreateBufferForPeer {
                                project_id,
                                peer_id: Some(peer_id),
                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
                                    proto::BufferChunk {
                                        buffer_id: buffer_id.into(),
                                        operations: chunk,
                                        is_last,
                                    },
                                )),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .await
                    .log_err();
            }
            Ok(())
        })
    }
1496
    /// Forgets which buffers have been shared with every peer (e.g. when the
    /// project stops being shared).
    pub fn forget_shared_buffers(&mut self) {
        self.shared_buffers.clear();
    }
1500
    /// Forgets which buffers have been shared with a single peer (e.g. when
    /// that peer leaves).
    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_buffers.remove(peer_id);
    }
1504
1505    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1506        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1507            self.shared_buffers.insert(new_peer_id, buffers);
1508        }
1509    }
1510
    /// Returns, per peer, the set of buffer ids that have been shared with
    /// that peer.
    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<BufferId>> {
        &self.shared_buffers
    }
1514
1515    pub fn serialize_project_transaction_for_peer(
1516        &mut self,
1517        project_transaction: ProjectTransaction,
1518        peer_id: proto::PeerId,
1519        cx: &mut ModelContext<Self>,
1520    ) -> proto::ProjectTransaction {
1521        let mut serialized_transaction = proto::ProjectTransaction {
1522            buffer_ids: Default::default(),
1523            transactions: Default::default(),
1524        };
1525        for (buffer, transaction) in project_transaction.0 {
1526            self.create_buffer_for_peer(&buffer, peer_id, cx)
1527                .detach_and_log_err(cx);
1528            serialized_transaction
1529                .buffer_ids
1530                .push(buffer.read(cx).remote_id().into());
1531            serialized_transaction
1532                .transactions
1533                .push(language::proto::serialize_transaction(&transaction));
1534        }
1535        serialized_transaction
1536    }
1537
    /// Reconstructs a project-wide transaction received from a peer: waits
    /// for each referenced buffer to be open and for the transaction's edits
    /// to arrive, optionally pushing each transaction onto its buffer's undo
    /// history.
    pub async fn deserialize_project_transaction(
        this: WeakModel<Self>,
        message: proto::ProjectTransaction,
        push_to_history: bool,
        mut cx: AsyncAppContext,
    ) -> Result<ProjectTransaction> {
        let mut project_transaction = ProjectTransaction::default();
        // `buffer_ids` and `transactions` are parallel arrays on the wire.
        for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) {
            let buffer_id = BufferId::new(buffer_id)?;
            let buffer = this
                .update(&mut cx, |this, cx| {
                    this.wait_for_remote_buffer(buffer_id, cx)
                })?
                .await?;
            let transaction = language::proto::deserialize_transaction(transaction)?;
            project_transaction.0.insert(buffer, transaction);
        }

        for (buffer, transaction) in &project_transaction.0 {
            // The edits referenced by the transaction may still be in flight;
            // wait for them before the transaction can be used.
            buffer
                .update(&mut cx, |buffer, _| {
                    buffer.wait_for_edits(transaction.edit_ids.iter().copied())
                })?
                .await?;

            if push_to_history {
                buffer.update(&mut cx, |buffer, _| {
                    buffer.push_transaction(transaction.clone(), Instant::now());
                })?;
            }
        }

        Ok(project_transaction)
    }
1572}
1573
1574impl OpenBuffer {
1575    fn upgrade(&self) -> Option<Model<Buffer>> {
1576        match self {
1577            OpenBuffer::Strong(handle) => Some(handle.clone()),
1578            OpenBuffer::Weak(handle) => handle.upgrade(),
1579            OpenBuffer::Operations(_) => None,
1580        }
1581    }
1582}
1583
1584fn is_not_found_error(error: &anyhow::Error) -> bool {
1585    error
1586        .root_cause()
1587        .downcast_ref::<io::Error>()
1588        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1589}
1590
1591fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
1592    let entries = blame
1593        .entries
1594        .into_iter()
1595        .map(|entry| proto::BlameEntry {
1596            sha: entry.sha.as_bytes().into(),
1597            start_line: entry.range.start,
1598            end_line: entry.range.end,
1599            original_line_number: entry.original_line_number,
1600            author: entry.author.clone(),
1601            author_mail: entry.author_mail.clone(),
1602            author_time: entry.author_time,
1603            author_tz: entry.author_tz.clone(),
1604            committer: entry.committer.clone(),
1605            committer_mail: entry.committer_mail.clone(),
1606            committer_time: entry.committer_time,
1607            committer_tz: entry.committer_tz.clone(),
1608            summary: entry.summary.clone(),
1609            previous: entry.previous.clone(),
1610            filename: entry.filename.clone(),
1611        })
1612        .collect::<Vec<_>>();
1613
1614    let messages = blame
1615        .messages
1616        .into_iter()
1617        .map(|(oid, message)| proto::CommitMessage {
1618            oid: oid.as_bytes().into(),
1619            message,
1620        })
1621        .collect::<Vec<_>>();
1622
1623    let permalinks = blame
1624        .permalinks
1625        .into_iter()
1626        .map(|(oid, url)| proto::CommitPermalink {
1627            oid: oid.as_bytes().into(),
1628            permalink: url.to_string(),
1629        })
1630        .collect::<Vec<_>>();
1631
1632    proto::BlameBufferResponse {
1633        entries,
1634        messages,
1635        permalinks,
1636        remote_url: blame.remote_url,
1637    }
1638}
1639
1640fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
1641    let entries = response
1642        .entries
1643        .into_iter()
1644        .filter_map(|entry| {
1645            Some(git::blame::BlameEntry {
1646                sha: git::Oid::from_bytes(&entry.sha).ok()?,
1647                range: entry.start_line..entry.end_line,
1648                original_line_number: entry.original_line_number,
1649                committer: entry.committer,
1650                committer_time: entry.committer_time,
1651                committer_tz: entry.committer_tz,
1652                committer_mail: entry.committer_mail,
1653                author: entry.author,
1654                author_mail: entry.author_mail,
1655                author_time: entry.author_time,
1656                author_tz: entry.author_tz,
1657                summary: entry.summary,
1658                previous: entry.previous,
1659                filename: entry.filename,
1660            })
1661        })
1662        .collect::<Vec<_>>();
1663
1664    let messages = response
1665        .messages
1666        .into_iter()
1667        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
1668        .collect::<HashMap<_, _>>();
1669
1670    let permalinks = response
1671        .permalinks
1672        .into_iter()
1673        .filter_map(|permalink| {
1674            Some((
1675                git::Oid::from_bytes(&permalink.oid).ok()?,
1676                Url::from_str(&permalink.permalink).ok()?,
1677            ))
1678        })
1679        .collect::<HashMap<_, _>>();
1680
1681    Blame {
1682        entries,
1683        permalinks,
1684        messages,
1685        remote_url: response.remote_url,
1686    }
1687}