buffer_store.rs

   1use crate::{
   2    search::SearchQuery,
   3    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   4    Item, NoRepositoryError, ProjectPath,
   5};
   6use anyhow::{anyhow, Context as _, Result};
   7use client::Client;
   8use collections::{hash_map, HashMap, HashSet};
   9use fs::Fs;
  10use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
  11use git::blame::Blame;
  12use gpui::{
  13    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
  14};
  15use http_client::Url;
  16use language::{
  17    proto::{
  18        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
  19        split_operations,
  20    },
  21    Buffer, BufferEvent, Capability, File as _, Language, Operation,
  22};
  23use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
  24use smol::channel::Receiver;
  25use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
  26use text::BufferId;
  27use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
  28use worktree::{
  29    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
  30    WorktreeId,
  31};
  32
  33/// A set of open buffers.
  34pub struct BufferStore {
  35    state: BufferStoreState,
  36    downstream_client: Option<(AnyProtoClient, u64)>,
  37    worktree_store: Model<WorktreeStore>,
  38    opened_buffers: HashMap<BufferId, OpenBuffer>,
  39    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
  40    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
  41    #[allow(clippy::type_complexity)]
  42    loading_buffers_by_path: HashMap<
  43        ProjectPath,
  44        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
  45    >,
  46    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
  47    remote_buffer_listeners:
  48        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
  49    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
  50}
  51
  52enum OpenBuffer {
  53    Buffer(WeakModel<Buffer>),
  54    Operations(Vec<Operation>),
  55}
  56
  57pub enum BufferStoreEvent {
  58    BufferAdded(Model<Buffer>),
  59    BufferDropped(BufferId),
  60    BufferChangedFilePath {
  61        buffer: Model<Buffer>,
  62        old_file: Option<Arc<dyn language::File>>,
  63    },
  64}
  65
  66enum BufferStoreState {
  67    Remote {
  68        shared_with_me: HashSet<Model<Buffer>>,
  69        upstream_client: AnyProtoClient,
  70        project_id: u64,
  71    },
  72    Local {},
  73}
  74
  75#[derive(Default, Debug)]
  76pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
  77
  78impl EventEmitter<BufferStoreEvent> for BufferStore {}
  79
  80impl BufferStore {
  81    pub fn init(client: &AnyProtoClient) {
  82        client.add_model_message_handler(Self::handle_buffer_reloaded);
  83        client.add_model_message_handler(Self::handle_buffer_saved);
  84        client.add_model_message_handler(Self::handle_update_buffer_file);
  85        client.add_model_message_handler(Self::handle_update_diff_base);
  86        client.add_model_request_handler(Self::handle_save_buffer);
  87        client.add_model_request_handler(Self::handle_blame_buffer);
  88        client.add_model_request_handler(Self::handle_reload_buffers);
  89    }
  90
  91    /// Creates a buffer store, optionally retaining its buffers.
  92    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
  93        cx.subscribe(&worktree_store, |this, _, event, cx| {
  94            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
  95                this.subscribe_to_worktree(worktree, cx);
  96            }
  97        })
  98        .detach();
  99
 100        Self {
 101            state: BufferStoreState::Local {},
 102            downstream_client: None,
 103            worktree_store,
 104            opened_buffers: Default::default(),
 105            remote_buffer_listeners: Default::default(),
 106            loading_remote_buffers_by_id: Default::default(),
 107            local_buffer_ids_by_path: Default::default(),
 108            local_buffer_ids_by_entry_id: Default::default(),
 109            loading_buffers_by_path: Default::default(),
 110            shared_buffers: Default::default(),
 111        }
 112    }
 113
 114    pub fn remote(
 115        worktree_store: Model<WorktreeStore>,
 116        upstream_client: AnyProtoClient,
 117        remote_id: u64,
 118        cx: &mut ModelContext<Self>,
 119    ) -> Self {
 120        cx.subscribe(&worktree_store, |this, _, event, cx| {
 121            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
 122                this.subscribe_to_worktree(worktree, cx);
 123            }
 124        })
 125        .detach();
 126
 127        Self {
 128            state: BufferStoreState::Remote {
 129                shared_with_me: Default::default(),
 130                upstream_client,
 131                project_id: remote_id,
 132            },
 133            downstream_client: None,
 134            worktree_store,
 135            opened_buffers: Default::default(),
 136            remote_buffer_listeners: Default::default(),
 137            loading_remote_buffers_by_id: Default::default(),
 138            local_buffer_ids_by_path: Default::default(),
 139            local_buffer_ids_by_entry_id: Default::default(),
 140            loading_buffers_by_path: Default::default(),
 141            shared_buffers: Default::default(),
 142        }
 143    }
 144
 145    pub fn open_buffer(
 146        &mut self,
 147        project_path: ProjectPath,
 148        cx: &mut ModelContext<Self>,
 149    ) -> Task<Result<Model<Buffer>>> {
 150        let existing_buffer = self.get_by_path(&project_path, cx);
 151        if let Some(existing_buffer) = existing_buffer {
 152            return Task::ready(Ok(existing_buffer));
 153        }
 154
 155        let Some(worktree) = self
 156            .worktree_store
 157            .read(cx)
 158            .worktree_for_id(project_path.worktree_id, cx)
 159        else {
 160            return Task::ready(Err(anyhow!("no such worktree")));
 161        };
 162
 163        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
 164            // If the given path is already being loaded, then wait for that existing
 165            // task to complete and return the same buffer.
 166            hash_map::Entry::Occupied(e) => e.get().clone(),
 167
 168            // Otherwise, record the fact that this path is now being loaded.
 169            hash_map::Entry::Vacant(entry) => {
 170                let (mut tx, rx) = postage::watch::channel();
 171                entry.insert(rx.clone());
 172
 173                let project_path = project_path.clone();
 174                let load_buffer = match worktree.read(cx) {
 175                    Worktree::Local(_) => {
 176                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
 177                    }
 178                    Worktree::Remote(tree) => {
 179                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
 180                    }
 181                };
 182
 183                cx.spawn(move |this, mut cx| async move {
 184                    let load_result = load_buffer.await;
 185                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
 186                        // Record the fact that the buffer is no longer loading.
 187                        this.loading_buffers_by_path.remove(&project_path);
 188                        let buffer = load_result.map_err(Arc::new)?;
 189                        Ok(buffer)
 190                    })?);
 191                    anyhow::Ok(())
 192                })
 193                .detach();
 194                rx
 195            }
 196        };
 197
 198        cx.background_executor().spawn(async move {
 199            Self::wait_for_loading_buffer(loading_watch)
 200                .await
 201                .map_err(|e| e.cloned())
 202        })
 203    }
 204
 205    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
 206        cx.subscribe(worktree, |this, worktree, event, cx| {
 207            if worktree.read(cx).is_local() {
 208                match event {
 209                    worktree::Event::UpdatedEntries(changes) => {
 210                        this.local_worktree_entries_changed(&worktree, changes, cx);
 211                    }
 212                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
 213                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
 214                    }
 215                    _ => {}
 216                }
 217            }
 218        })
 219        .detach();
 220    }
 221
 222    fn local_worktree_entries_changed(
 223        &mut self,
 224        worktree_handle: &Model<Worktree>,
 225        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
 226        cx: &mut ModelContext<Self>,
 227    ) {
 228        let snapshot = worktree_handle.read(cx).snapshot();
 229        for (path, entry_id, _) in changes {
 230            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
 231        }
 232    }
 233
 234    fn local_worktree_git_repos_changed(
 235        &mut self,
 236        worktree_handle: Model<Worktree>,
 237        changed_repos: &UpdatedGitRepositoriesSet,
 238        cx: &mut ModelContext<Self>,
 239    ) {
 240        debug_assert!(worktree_handle.read(cx).is_local());
 241
 242        // Identify the loading buffers whose containing repository that has changed.
 243        let future_buffers = self
 244            .loading_buffers()
 245            .filter_map(|(project_path, receiver)| {
 246                if project_path.worktree_id != worktree_handle.read(cx).id() {
 247                    return None;
 248                }
 249                let path = &project_path.path;
 250                changed_repos
 251                    .iter()
 252                    .find(|(work_dir, _)| path.starts_with(work_dir))?;
 253                let path = path.clone();
 254                Some(async move {
 255                    Self::wait_for_loading_buffer(receiver)
 256                        .await
 257                        .ok()
 258                        .map(|buffer| (buffer, path))
 259                })
 260            })
 261            .collect::<FuturesUnordered<_>>();
 262
 263        // Identify the current buffers whose containing repository has changed.
 264        let current_buffers = self
 265            .buffers()
 266            .filter_map(|buffer| {
 267                let file = File::from_dyn(buffer.read(cx).file())?;
 268                if file.worktree != worktree_handle {
 269                    return None;
 270                }
 271                changed_repos
 272                    .iter()
 273                    .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
 274                Some((buffer, file.path.clone()))
 275            })
 276            .collect::<Vec<_>>();
 277
 278        if future_buffers.len() + current_buffers.len() == 0 {
 279            return;
 280        }
 281
 282        cx.spawn(move |this, mut cx| async move {
 283            // Wait for all of the buffers to load.
 284            let future_buffers = future_buffers.collect::<Vec<_>>().await;
 285
 286            // Reload the diff base for every buffer whose containing git repository has changed.
 287            let snapshot =
 288                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
 289            let diff_bases_by_buffer = cx
 290                .background_executor()
 291                .spawn(async move {
 292                    let mut diff_base_tasks = future_buffers
 293                        .into_iter()
 294                        .flatten()
 295                        .chain(current_buffers)
 296                        .filter_map(|(buffer, path)| {
 297                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
 298                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
 299                            Some(async move {
 300                                let base_text =
 301                                    local_repo_entry.repo().load_index_text(&relative_path);
 302                                Some((buffer, base_text))
 303                            })
 304                        })
 305                        .collect::<FuturesUnordered<_>>();
 306
 307                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
 308                    while let Some(diff_base) = diff_base_tasks.next().await {
 309                        if let Some(diff_base) = diff_base {
 310                            diff_bases.push(diff_base);
 311                        }
 312                    }
 313                    diff_bases
 314                })
 315                .await;
 316
 317            this.update(&mut cx, |this, cx| {
 318                // Assign the new diff bases on all of the buffers.
 319                for (buffer, diff_base) in diff_bases_by_buffer {
 320                    let buffer_id = buffer.update(cx, |buffer, cx| {
 321                        buffer.set_diff_base(diff_base.clone(), cx);
 322                        buffer.remote_id().to_proto()
 323                    });
 324                    if let Some((client, project_id)) = &this.downstream_client {
 325                        client
 326                            .send(proto::UpdateDiffBase {
 327                                project_id: *project_id,
 328                                buffer_id,
 329                                diff_base,
 330                            })
 331                            .log_err();
 332                    }
 333                }
 334            })
 335        })
 336        .detach_and_log_err(cx);
 337    }
 338
 339    fn open_local_buffer_internal(
 340        &mut self,
 341        path: Arc<Path>,
 342        worktree: Model<Worktree>,
 343        cx: &mut ModelContext<Self>,
 344    ) -> Task<Result<Model<Buffer>>> {
 345        let load_buffer = worktree.update(cx, |worktree, cx| {
 346            let load_file = worktree.load_file(path.as_ref(), cx);
 347            let reservation = cx.reserve_model();
 348            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
 349            cx.spawn(move |_, mut cx| async move {
 350                let loaded = load_file.await?;
 351                let text_buffer = cx
 352                    .background_executor()
 353                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
 354                    .await;
 355                cx.insert_model(reservation, |_| {
 356                    Buffer::build(
 357                        text_buffer,
 358                        loaded.diff_base,
 359                        Some(loaded.file),
 360                        Capability::ReadWrite,
 361                    )
 362                })
 363            })
 364        });
 365
 366        cx.spawn(move |this, mut cx| async move {
 367            let buffer = match load_buffer.await {
 368                Ok(buffer) => Ok(buffer),
 369                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
 370                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
 371                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
 372                    Buffer::build(
 373                        text_buffer,
 374                        None,
 375                        Some(Arc::new(File {
 376                            worktree,
 377                            path,
 378                            mtime: None,
 379                            entry_id: None,
 380                            is_local: true,
 381                            is_deleted: false,
 382                            is_private: false,
 383                        })),
 384                        Capability::ReadWrite,
 385                    )
 386                }),
 387                Err(e) => Err(e),
 388            }?;
 389            this.update(&mut cx, |this, cx| {
 390                this.add_buffer(buffer.clone(), cx).log_err();
 391            })?;
 392            Ok(buffer)
 393        })
 394    }
 395
 396    fn open_remote_buffer_internal(
 397        &self,
 398        path: &Arc<Path>,
 399        worktree: &RemoteWorktree,
 400        cx: &ModelContext<Self>,
 401    ) -> Task<Result<Model<Buffer>>> {
 402        let worktree_id = worktree.id().to_proto();
 403        let project_id = worktree.project_id();
 404        let client = worktree.client();
 405        let path_string = path.clone().to_string_lossy().to_string();
 406        cx.spawn(move |this, mut cx| async move {
 407            let response = client
 408                .request(proto::OpenBufferByPath {
 409                    project_id,
 410                    worktree_id,
 411                    path: path_string,
 412                })
 413                .await?;
 414            let buffer_id = BufferId::new(response.buffer_id)?;
 415            this.update(&mut cx, |this, cx| {
 416                this.wait_for_remote_buffer(buffer_id, cx)
 417            })?
 418            .await
 419        })
 420    }
 421
 422    pub fn create_buffer(
 423        &mut self,
 424        remote_client: Option<(AnyProtoClient, u64)>,
 425        cx: &mut ModelContext<Self>,
 426    ) -> Task<Result<Model<Buffer>>> {
 427        if let Some((remote_client, project_id)) = remote_client {
 428            let create = remote_client.request(proto::OpenNewBuffer { project_id });
 429            cx.spawn(|this, mut cx| async move {
 430                let response = create.await?;
 431                let buffer_id = BufferId::new(response.buffer_id)?;
 432
 433                this.update(&mut cx, |this, cx| {
 434                    this.wait_for_remote_buffer(buffer_id, cx)
 435                })?
 436                .await
 437            })
 438        } else {
 439            Task::ready(Ok(self.create_local_buffer("", None, cx)))
 440        }
 441    }
 442
 443    pub fn create_local_buffer(
 444        &mut self,
 445        text: &str,
 446        language: Option<Arc<Language>>,
 447        cx: &mut ModelContext<Self>,
 448    ) -> Model<Buffer> {
 449        let buffer = cx.new_model(|cx| {
 450            Buffer::local(text, cx)
 451                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
 452        });
 453        self.add_buffer(buffer.clone(), cx).log_err();
 454        buffer
 455    }
 456
 457    pub fn save_buffer(
 458        &mut self,
 459        buffer: Model<Buffer>,
 460        cx: &mut ModelContext<Self>,
 461    ) -> Task<Result<()>> {
 462        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 463            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
 464        };
 465        match file.worktree.read(cx) {
 466            Worktree::Local(_) => {
 467                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
 468            }
 469            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
 470        }
 471    }
 472
 473    pub fn save_buffer_as(
 474        &mut self,
 475        buffer: Model<Buffer>,
 476        path: ProjectPath,
 477        cx: &mut ModelContext<Self>,
 478    ) -> Task<Result<()>> {
 479        let Some(worktree) = self
 480            .worktree_store
 481            .read(cx)
 482            .worktree_for_id(path.worktree_id, cx)
 483        else {
 484            return Task::ready(Err(anyhow!("no such worktree")));
 485        };
 486
 487        let old_file = buffer.read(cx).file().cloned();
 488
 489        let task = match worktree.read(cx) {
 490            Worktree::Local(_) => {
 491                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
 492            }
 493            Worktree::Remote(tree) => {
 494                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
 495            }
 496        };
 497        cx.spawn(|this, mut cx| async move {
 498            task.await?;
 499            this.update(&mut cx, |_, cx| {
 500                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
 501            })
 502        })
 503    }
 504
 505    fn save_local_buffer(
 506        &self,
 507        worktree: Model<Worktree>,
 508        buffer_handle: Model<Buffer>,
 509        path: Arc<Path>,
 510        mut has_changed_file: bool,
 511        cx: &mut ModelContext<Self>,
 512    ) -> Task<Result<()>> {
 513        let buffer = buffer_handle.read(cx);
 514        let text = buffer.as_rope().clone();
 515        let line_ending = buffer.line_ending();
 516        let version = buffer.version();
 517        let buffer_id = buffer.remote_id();
 518        if buffer.file().is_some_and(|file| !file.is_created()) {
 519            has_changed_file = true;
 520        }
 521
 522        let save = worktree.update(cx, |worktree, cx| {
 523            worktree.write_file(path.as_ref(), text, line_ending, cx)
 524        });
 525
 526        cx.spawn(move |this, mut cx| async move {
 527            let new_file = save.await?;
 528            let mtime = new_file.mtime;
 529            this.update(&mut cx, |this, cx| {
 530                if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
 531                    let project_id = *project_id;
 532                    if has_changed_file {
 533                        downstream_client
 534                            .send(proto::UpdateBufferFile {
 535                                project_id,
 536                                buffer_id: buffer_id.to_proto(),
 537                                file: Some(language::File::to_proto(&*new_file, cx)),
 538                            })
 539                            .log_err();
 540                    }
 541                    downstream_client
 542                        .send(proto::BufferSaved {
 543                            project_id,
 544                            buffer_id: buffer_id.to_proto(),
 545                            version: serialize_version(&version),
 546                            mtime: mtime.map(|time| time.into()),
 547                        })
 548                        .log_err();
 549                }
 550            })?;
 551            buffer_handle.update(&mut cx, |buffer, cx| {
 552                if has_changed_file {
 553                    buffer.file_updated(new_file, cx);
 554                }
 555                buffer.did_save(version.clone(), mtime, cx);
 556            })
 557        })
 558    }
 559
 560    fn save_remote_buffer(
 561        &self,
 562        buffer_handle: Model<Buffer>,
 563        new_path: Option<proto::ProjectPath>,
 564        tree: &RemoteWorktree,
 565        cx: &ModelContext<Self>,
 566    ) -> Task<Result<()>> {
 567        let buffer = buffer_handle.read(cx);
 568        let buffer_id = buffer.remote_id().into();
 569        let version = buffer.version();
 570        let rpc = tree.client();
 571        let project_id = tree.project_id();
 572        cx.spawn(move |_, mut cx| async move {
 573            let response = rpc
 574                .request(proto::SaveBuffer {
 575                    project_id,
 576                    buffer_id,
 577                    new_path,
 578                    version: serialize_version(&version),
 579                })
 580                .await?;
 581            let version = deserialize_version(&response.version);
 582            let mtime = response.mtime.map(|mtime| mtime.into());
 583
 584            buffer_handle.update(&mut cx, |buffer, cx| {
 585                buffer.did_save(version.clone(), mtime, cx);
 586            })?;
 587
 588            Ok(())
 589        })
 590    }
 591
 592    pub fn blame_buffer(
 593        &self,
 594        buffer: &Model<Buffer>,
 595        version: Option<clock::Global>,
 596        cx: &AppContext,
 597    ) -> Task<Result<Blame>> {
 598        let buffer = buffer.read(cx);
 599        let Some(file) = File::from_dyn(buffer.file()) else {
 600            return Task::ready(Err(anyhow!("buffer has no file")));
 601        };
 602
 603        match file.worktree.clone().read(cx) {
 604            Worktree::Local(worktree) => {
 605                let worktree = worktree.snapshot();
 606                let blame_params = maybe!({
 607                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
 608                        Some(repo_for_path) => repo_for_path,
 609                        None => anyhow::bail!(NoRepositoryError {}),
 610                    };
 611
 612                    let relative_path = repo_entry
 613                        .relativize(&worktree, &file.path)
 614                        .context("failed to relativize buffer path")?;
 615
 616                    let repo = local_repo_entry.repo().clone();
 617
 618                    let content = match version {
 619                        Some(version) => buffer.rope_for_version(&version).clone(),
 620                        None => buffer.as_rope().clone(),
 621                    };
 622
 623                    anyhow::Ok((repo, relative_path, content))
 624                });
 625
 626                cx.background_executor().spawn(async move {
 627                    let (repo, relative_path, content) = blame_params?;
 628                    repo.blame(&relative_path, content)
 629                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
 630                })
 631            }
 632            Worktree::Remote(worktree) => {
 633                let buffer_id = buffer.remote_id();
 634                let version = buffer.version();
 635                let project_id = worktree.project_id();
 636                let client = worktree.client();
 637                cx.spawn(|_| async move {
 638                    let response = client
 639                        .request(proto::BlameBuffer {
 640                            project_id,
 641                            buffer_id: buffer_id.into(),
 642                            version: serialize_version(&version),
 643                        })
 644                        .await?;
 645                    Ok(deserialize_blame_buffer_response(response))
 646                })
 647            }
 648        }
 649    }
 650
 651    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
 652        let remote_id = buffer.read(cx).remote_id();
 653        let is_remote = buffer.read(cx).replica_id() != 0;
 654        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());
 655
 656        let handle = cx.handle().downgrade();
 657        buffer.update(cx, move |_, cx| {
 658            cx.on_release(move |buffer, cx| {
 659                handle
 660                    .update(cx, |_, cx| {
 661                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
 662                    })
 663                    .ok();
 664            })
 665            .detach()
 666        });
 667
 668        match self.opened_buffers.entry(remote_id) {
 669            hash_map::Entry::Vacant(entry) => {
 670                entry.insert(open_buffer);
 671            }
 672            hash_map::Entry::Occupied(mut entry) => {
 673                if let OpenBuffer::Operations(operations) = entry.get_mut() {
 674                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx));
 675                } else if entry.get().upgrade().is_some() {
 676                    if is_remote {
 677                        return Ok(());
 678                    } else {
 679                        debug_panic!("buffer {} was already registered", remote_id);
 680                        Err(anyhow!("buffer {} was already registered", remote_id))?;
 681                    }
 682                }
 683                entry.insert(open_buffer);
 684            }
 685        }
 686
 687        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
 688            for sender in senders {
 689                sender.send(Ok(buffer.clone())).ok();
 690            }
 691        }
 692
 693        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
 694            if file.is_local {
 695                self.local_buffer_ids_by_path.insert(
 696                    ProjectPath {
 697                        worktree_id: file.worktree_id(cx),
 698                        path: file.path.clone(),
 699                    },
 700                    remote_id,
 701                );
 702
 703                if let Some(entry_id) = file.entry_id {
 704                    self.local_buffer_ids_by_entry_id
 705                        .insert(entry_id, remote_id);
 706                }
 707            }
 708        }
 709
 710        cx.subscribe(&buffer, Self::on_buffer_event).detach();
 711        cx.emit(BufferStoreEvent::BufferAdded(buffer));
 712        Ok(())
 713    }
 714
 715    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
 716        self.opened_buffers
 717            .values()
 718            .filter_map(|buffer| buffer.upgrade())
 719    }
 720
 721    pub fn loading_buffers(
 722        &self,
 723    ) -> impl Iterator<
 724        Item = (
 725            &ProjectPath,
 726            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
 727        ),
 728    > {
 729        self.loading_buffers_by_path
 730            .iter()
 731            .map(|(path, rx)| (path, rx.clone()))
 732    }
 733
 734    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
 735        self.buffers().find_map(|buffer| {
 736            let file = File::from_dyn(buffer.read(cx).file())?;
 737            if file.worktree_id(cx) == path.worktree_id && file.path == path.path {
 738                Some(buffer)
 739            } else {
 740                None
 741            }
 742        })
 743    }
 744
 745    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 746        self.opened_buffers
 747            .get(&buffer_id)
 748            .and_then(|buffer| buffer.upgrade())
 749    }
 750
 751    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
 752        self.get(buffer_id)
 753            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
 754    }
 755
 756    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
 757        self.get(buffer_id)
 758            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
 759    }
 760
 761    pub fn wait_for_remote_buffer(
 762        &mut self,
 763        id: BufferId,
 764        cx: &mut AppContext,
 765    ) -> Task<Result<Model<Buffer>>> {
 766        let buffer = self.get(id);
 767        if let Some(buffer) = buffer {
 768            return Task::ready(Ok(buffer));
 769        }
 770        let (tx, rx) = oneshot::channel();
 771        self.remote_buffer_listeners.entry(id).or_default().push(tx);
 772        cx.background_executor().spawn(async move { rx.await? })
 773    }
 774
 775    pub fn buffer_version_info(
 776        &self,
 777        cx: &AppContext,
 778    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
 779        let buffers = self
 780            .buffers()
 781            .map(|buffer| {
 782                let buffer = buffer.read(cx);
 783                proto::BufferVersion {
 784                    id: buffer.remote_id().into(),
 785                    version: language::proto::serialize_version(&buffer.version),
 786                }
 787            })
 788            .collect();
 789        let incomplete_buffer_ids = self
 790            .loading_remote_buffers_by_id
 791            .keys()
 792            .copied()
 793            .collect::<Vec<_>>();
 794        (buffers, incomplete_buffer_ids)
 795    }
 796
 797    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
 798        self.drop_unnecessary_buffers(cx);
 799
 800        for buffer in self.buffers() {
 801            buffer.update(cx, |buffer, cx| {
 802                buffer.set_capability(Capability::ReadOnly, cx)
 803            });
 804        }
 805
 806        // Wake up all futures currently waiting on a buffer to get opened,
 807        // to give them a chance to fail now that we've disconnected.
 808        self.remote_buffer_listeners.clear();
 809    }
 810
 811    pub fn shared(
 812        &mut self,
 813        remote_id: u64,
 814        downstream_client: AnyProtoClient,
 815        _cx: &mut AppContext,
 816    ) {
 817        self.downstream_client = Some((downstream_client, remote_id));
 818    }
 819
 820    pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
 821        self.downstream_client.take();
 822        self.forget_shared_buffers();
 823    }
 824
 825    fn drop_unnecessary_buffers(&mut self, cx: &mut AppContext) {
 826        for open_buffer in self.opened_buffers.values_mut() {
 827            if let Some(buffer) = open_buffer.upgrade() {
 828                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
 829            }
 830        }
 831    }
 832
 833    pub fn discard_incomplete(&mut self) {
 834        self.opened_buffers
 835            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
 836    }
 837
 838    pub fn find_search_candidates(
 839        &mut self,
 840        query: &SearchQuery,
 841        mut limit: usize,
 842        fs: Arc<dyn Fs>,
 843        cx: &mut ModelContext<Self>,
 844    ) -> Receiver<Model<Buffer>> {
 845        let (tx, rx) = smol::channel::unbounded();
 846        let mut open_buffers = HashSet::default();
 847        let mut unnamed_buffers = Vec::new();
 848        for handle in self.buffers() {
 849            let buffer = handle.read(cx);
 850            if let Some(entry_id) = buffer.entry_id(cx) {
 851                open_buffers.insert(entry_id);
 852            } else {
 853                limit = limit.saturating_sub(1);
 854                unnamed_buffers.push(handle)
 855            };
 856        }
 857
 858        const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
 859        let mut project_paths_rx = self
 860            .worktree_store
 861            .update(cx, |worktree_store, cx| {
 862                worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
 863            })
 864            .chunks(MAX_CONCURRENT_BUFFER_OPENS);
 865
 866        cx.spawn(|this, mut cx| async move {
 867            for buffer in unnamed_buffers {
 868                tx.send(buffer).await.ok();
 869            }
 870
 871            while let Some(project_paths) = project_paths_rx.next().await {
 872                let buffers = this.update(&mut cx, |this, cx| {
 873                    project_paths
 874                        .into_iter()
 875                        .map(|project_path| this.open_buffer(project_path, cx))
 876                        .collect::<Vec<_>>()
 877                })?;
 878                for buffer_task in buffers {
 879                    if let Some(buffer) = buffer_task.await.log_err() {
 880                        if tx.send(buffer).await.is_err() {
 881                            return anyhow::Ok(());
 882                        }
 883                    }
 884                }
 885            }
 886            anyhow::Ok(())
 887        })
 888        .detach();
 889        rx
 890    }
 891
 892    fn on_buffer_event(
 893        &mut self,
 894        buffer: Model<Buffer>,
 895        event: &BufferEvent,
 896        cx: &mut ModelContext<Self>,
 897    ) {
 898        match event {
 899            BufferEvent::FileHandleChanged => {
 900                self.buffer_changed_file(buffer, cx);
 901            }
 902            BufferEvent::Reloaded => {
 903                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
 904                    return;
 905                };
 906                let buffer = buffer.read(cx);
 907                downstream_client
 908                    .send(proto::BufferReloaded {
 909                        project_id: *project_id,
 910                        buffer_id: buffer.remote_id().to_proto(),
 911                        version: serialize_version(&buffer.version()),
 912                        mtime: buffer.saved_mtime().map(|t| t.into()),
 913                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
 914                    })
 915                    .log_err();
 916            }
 917            _ => {}
 918        }
 919    }
 920
 921    fn local_worktree_entry_changed(
 922        &mut self,
 923        entry_id: ProjectEntryId,
 924        path: &Arc<Path>,
 925        worktree: &Model<worktree::Worktree>,
 926        snapshot: &worktree::Snapshot,
 927        cx: &mut ModelContext<Self>,
 928    ) -> Option<()> {
 929        let project_path = ProjectPath {
 930            worktree_id: snapshot.id(),
 931            path: path.clone(),
 932        };
 933        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
 934            Some(&buffer_id) => buffer_id,
 935            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
 936        };
 937        let buffer = if let Some(buffer) = self.get(buffer_id) {
 938            buffer
 939        } else {
 940            self.opened_buffers.remove(&buffer_id);
 941            self.local_buffer_ids_by_path.remove(&project_path);
 942            self.local_buffer_ids_by_entry_id.remove(&entry_id);
 943            return None;
 944        };
 945
 946        let events = buffer.update(cx, |buffer, cx| {
 947            let file = buffer.file()?;
 948            let old_file = File::from_dyn(Some(file))?;
 949            if old_file.worktree != *worktree {
 950                return None;
 951            }
 952
 953            let new_file = if let Some(entry) = old_file
 954                .entry_id
 955                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
 956            {
 957                File {
 958                    is_local: true,
 959                    entry_id: Some(entry.id),
 960                    mtime: entry.mtime,
 961                    path: entry.path.clone(),
 962                    worktree: worktree.clone(),
 963                    is_deleted: false,
 964                    is_private: entry.is_private,
 965                }
 966            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
 967                File {
 968                    is_local: true,
 969                    entry_id: Some(entry.id),
 970                    mtime: entry.mtime,
 971                    path: entry.path.clone(),
 972                    worktree: worktree.clone(),
 973                    is_deleted: false,
 974                    is_private: entry.is_private,
 975                }
 976            } else {
 977                File {
 978                    is_local: true,
 979                    entry_id: old_file.entry_id,
 980                    path: old_file.path.clone(),
 981                    mtime: old_file.mtime,
 982                    worktree: worktree.clone(),
 983                    is_deleted: true,
 984                    is_private: old_file.is_private,
 985                }
 986            };
 987
 988            if new_file == *old_file {
 989                return None;
 990            }
 991
 992            let mut events = Vec::new();
 993            if new_file.path != old_file.path {
 994                self.local_buffer_ids_by_path.remove(&ProjectPath {
 995                    path: old_file.path.clone(),
 996                    worktree_id: old_file.worktree_id(cx),
 997                });
 998                self.local_buffer_ids_by_path.insert(
 999                    ProjectPath {
1000                        worktree_id: new_file.worktree_id(cx),
1001                        path: new_file.path.clone(),
1002                    },
1003                    buffer_id,
1004                );
1005                events.push(BufferStoreEvent::BufferChangedFilePath {
1006                    buffer: cx.handle(),
1007                    old_file: buffer.file().cloned(),
1008                });
1009            }
1010
1011            if new_file.entry_id != old_file.entry_id {
1012                if let Some(entry_id) = old_file.entry_id {
1013                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
1014                }
1015                if let Some(entry_id) = new_file.entry_id {
1016                    self.local_buffer_ids_by_entry_id
1017                        .insert(entry_id, buffer_id);
1018                }
1019            }
1020
1021            if let Some((client, project_id)) = &self.downstream_client {
1022                client
1023                    .send(proto::UpdateBufferFile {
1024                        project_id: *project_id,
1025                        buffer_id: buffer_id.to_proto(),
1026                        file: Some(new_file.to_proto(cx)),
1027                    })
1028                    .ok();
1029            }
1030
1031            buffer.file_updated(Arc::new(new_file), cx);
1032            Some(events)
1033        })?;
1034
1035        for event in events {
1036            cx.emit(event);
1037        }
1038
1039        None
1040    }
1041
1042    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
1043        let file = File::from_dyn(buffer.read(cx).file())?;
1044
1045        let remote_id = buffer.read(cx).remote_id();
1046        if let Some(entry_id) = file.entry_id {
1047            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
1048                Some(_) => {
1049                    return None;
1050                }
1051                None => {
1052                    self.local_buffer_ids_by_entry_id
1053                        .insert(entry_id, remote_id);
1054                }
1055            }
1056        };
1057        self.local_buffer_ids_by_path.insert(
1058            ProjectPath {
1059                worktree_id: file.worktree_id(cx),
1060                path: file.path.clone(),
1061            },
1062            remote_id,
1063        );
1064
1065        Some(())
1066    }
1067
1068    pub async fn handle_update_buffer(
1069        this: Model<Self>,
1070        envelope: TypedEnvelope<proto::UpdateBuffer>,
1071        mut cx: AsyncAppContext,
1072    ) -> Result<proto::Ack> {
1073        let payload = envelope.payload.clone();
1074        let buffer_id = BufferId::new(payload.buffer_id)?;
1075        let ops = payload
1076            .operations
1077            .into_iter()
1078            .map(language::proto::deserialize_operation)
1079            .collect::<Result<Vec<_>, _>>()?;
1080        this.update(&mut cx, |this, cx| {
1081            match this.opened_buffers.entry(buffer_id) {
1082                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
1083                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
1084                    OpenBuffer::Buffer(buffer) => {
1085                        if let Some(buffer) = buffer.upgrade() {
1086                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
1087                        }
1088                    }
1089                },
1090                hash_map::Entry::Vacant(e) => {
1091                    e.insert(OpenBuffer::Operations(ops));
1092                }
1093            }
1094            Ok(proto::Ack {})
1095        })?
1096    }
1097
1098    pub fn handle_synchronize_buffers(
1099        &mut self,
1100        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
1101        cx: &mut ModelContext<Self>,
1102        client: Arc<Client>,
1103    ) -> Result<proto::SynchronizeBuffersResponse> {
1104        let project_id = envelope.payload.project_id;
1105        let mut response = proto::SynchronizeBuffersResponse {
1106            buffers: Default::default(),
1107        };
1108        let Some(guest_id) = envelope.original_sender_id else {
1109            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
1110        };
1111
1112        self.shared_buffers.entry(guest_id).or_default().clear();
1113        for buffer in envelope.payload.buffers {
1114            let buffer_id = BufferId::new(buffer.id)?;
1115            let remote_version = language::proto::deserialize_version(&buffer.version);
1116            if let Some(buffer) = self.get(buffer_id) {
1117                self.shared_buffers
1118                    .entry(guest_id)
1119                    .or_default()
1120                    .insert(buffer.clone());
1121
1122                let buffer = buffer.read(cx);
1123                response.buffers.push(proto::BufferVersion {
1124                    id: buffer_id.into(),
1125                    version: language::proto::serialize_version(&buffer.version),
1126                });
1127
1128                let operations = buffer.serialize_ops(Some(remote_version), cx);
1129                let client = client.clone();
1130                if let Some(file) = buffer.file() {
1131                    client
1132                        .send(proto::UpdateBufferFile {
1133                            project_id,
1134                            buffer_id: buffer_id.into(),
1135                            file: Some(file.to_proto(cx)),
1136                        })
1137                        .log_err();
1138                }
1139
1140                client
1141                    .send(proto::UpdateDiffBase {
1142                        project_id,
1143                        buffer_id: buffer_id.into(),
1144                        diff_base: buffer.diff_base().map(ToString::to_string),
1145                    })
1146                    .log_err();
1147
1148                client
1149                    .send(proto::BufferReloaded {
1150                        project_id,
1151                        buffer_id: buffer_id.into(),
1152                        version: language::proto::serialize_version(buffer.saved_version()),
1153                        mtime: buffer.saved_mtime().map(|time| time.into()),
1154                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
1155                            as i32,
1156                    })
1157                    .log_err();
1158
1159                cx.background_executor()
1160                    .spawn(
1161                        async move {
1162                            let operations = operations.await;
1163                            for chunk in split_operations(operations) {
1164                                client
1165                                    .request(proto::UpdateBuffer {
1166                                        project_id,
1167                                        buffer_id: buffer_id.into(),
1168                                        operations: chunk,
1169                                    })
1170                                    .await?;
1171                            }
1172                            anyhow::Ok(())
1173                        }
1174                        .log_err(),
1175                    )
1176                    .detach();
1177            }
1178        }
1179        Ok(response)
1180    }
1181
1182    pub fn handle_create_buffer_for_peer(
1183        &mut self,
1184        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
1185        replica_id: u16,
1186        capability: Capability,
1187        cx: &mut ModelContext<Self>,
1188    ) -> Result<()> {
1189        match envelope
1190            .payload
1191            .variant
1192            .ok_or_else(|| anyhow!("missing variant"))?
1193        {
1194            proto::create_buffer_for_peer::Variant::State(mut state) => {
1195                let buffer_id = BufferId::new(state.id)?;
1196
1197                let buffer_result = maybe!({
1198                    let mut buffer_file = None;
1199                    if let Some(file) = state.file.take() {
1200                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
1201                        let worktree = self
1202                            .worktree_store
1203                            .read(cx)
1204                            .worktree_for_id(worktree_id, cx)
1205                            .ok_or_else(|| {
1206                                anyhow!("no worktree found for id {}", file.worktree_id)
1207                            })?;
1208                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
1209                            as Arc<dyn language::File>);
1210                    }
1211                    Buffer::from_proto(replica_id, capability, state, buffer_file)
1212                });
1213
1214                match buffer_result {
1215                    Ok(buffer) => {
1216                        let buffer = cx.new_model(|_| buffer);
1217                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
1218                    }
1219                    Err(error) => {
1220                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1221                            for listener in listeners {
1222                                listener.send(Err(anyhow!(error.cloned()))).ok();
1223                            }
1224                        }
1225                    }
1226                }
1227            }
1228            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
1229                let buffer_id = BufferId::new(chunk.buffer_id)?;
1230                let buffer = self
1231                    .loading_remote_buffers_by_id
1232                    .get(&buffer_id)
1233                    .cloned()
1234                    .ok_or_else(|| {
1235                        anyhow!(
1236                            "received chunk for buffer {} without initial state",
1237                            chunk.buffer_id
1238                        )
1239                    })?;
1240
1241                let result = maybe!({
1242                    let operations = chunk
1243                        .operations
1244                        .into_iter()
1245                        .map(language::proto::deserialize_operation)
1246                        .collect::<Result<Vec<_>>>()?;
1247                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
1248                    anyhow::Ok(())
1249                });
1250
1251                if let Err(error) = result {
1252                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1253                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
1254                        for listener in listeners {
1255                            listener.send(Err(error.cloned())).ok();
1256                        }
1257                    }
1258                } else if chunk.is_last {
1259                    self.loading_remote_buffers_by_id.remove(&buffer_id);
1260                    // retain buffers sent by peers to avoid races.
1261                    match &mut self.state {
1262                        BufferStoreState::Remote {
1263                            ref mut shared_with_me,
1264                            upstream_client,
1265                            ..
1266                        } => {
1267                            if upstream_client.is_via_collab() {
1268                                shared_with_me.insert(buffer.clone());
1269                            }
1270                        }
1271                        _ => {}
1272                    }
1273                    self.add_buffer(buffer, cx)?;
1274                }
1275            }
1276        }
1277
1278        Ok(())
1279    }
1280
1281    pub async fn handle_update_buffer_file(
1282        this: Model<Self>,
1283        envelope: TypedEnvelope<proto::UpdateBufferFile>,
1284        mut cx: AsyncAppContext,
1285    ) -> Result<()> {
1286        let buffer_id = envelope.payload.buffer_id;
1287        let buffer_id = BufferId::new(buffer_id)?;
1288
1289        this.update(&mut cx, |this, cx| {
1290            let payload = envelope.payload.clone();
1291            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1292                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
1293                let worktree = this
1294                    .worktree_store
1295                    .read(cx)
1296                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
1297                    .ok_or_else(|| anyhow!("no such worktree"))?;
1298                let file = File::from_proto(file, worktree, cx)?;
1299                let old_file = buffer.update(cx, |buffer, cx| {
1300                    let old_file = buffer.file().cloned();
1301                    let new_path = file.path.clone();
1302                    buffer.file_updated(Arc::new(file), cx);
1303                    if old_file
1304                        .as_ref()
1305                        .map_or(true, |old| *old.path() != new_path)
1306                    {
1307                        Some(old_file)
1308                    } else {
1309                        None
1310                    }
1311                });
1312                if let Some(old_file) = old_file {
1313                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
1314                }
1315            }
1316            Ok(())
1317        })?
1318    }
1319
1320    pub async fn handle_update_diff_base(
1321        this: Model<Self>,
1322        envelope: TypedEnvelope<proto::UpdateDiffBase>,
1323        mut cx: AsyncAppContext,
1324    ) -> Result<()> {
1325        this.update(&mut cx, |this, cx| {
1326            let buffer_id = envelope.payload.buffer_id;
1327            let buffer_id = BufferId::new(buffer_id)?;
1328            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1329                buffer.update(cx, |buffer, cx| {
1330                    buffer.set_diff_base(envelope.payload.diff_base, cx)
1331                });
1332            }
1333            Ok(())
1334        })?
1335    }
1336
1337    pub async fn handle_save_buffer(
1338        this: Model<Self>,
1339        envelope: TypedEnvelope<proto::SaveBuffer>,
1340        mut cx: AsyncAppContext,
1341    ) -> Result<proto::BufferSaved> {
1342        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1343        let (buffer, project_id) = this.update(&mut cx, |this, _| {
1344            anyhow::Ok((
1345                this.get_existing(buffer_id)?,
1346                this.downstream_client
1347                    .as_ref()
1348                    .map(|(_, project_id)| *project_id)
1349                    .context("project is not shared")?,
1350            ))
1351        })??;
1352        buffer
1353            .update(&mut cx, |buffer, _| {
1354                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
1355            })?
1356            .await?;
1357        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
1358
1359        if let Some(new_path) = envelope.payload.new_path {
1360            let new_path = ProjectPath::from_proto(new_path);
1361            this.update(&mut cx, |this, cx| {
1362                this.save_buffer_as(buffer.clone(), new_path, cx)
1363            })?
1364            .await?;
1365        } else {
1366            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
1367                .await?;
1368        }
1369
1370        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
1371            project_id,
1372            buffer_id: buffer_id.into(),
1373            version: serialize_version(buffer.saved_version()),
1374            mtime: buffer.saved_mtime().map(|time| time.into()),
1375        })
1376    }
1377
1378    pub async fn handle_close_buffer(
1379        this: Model<Self>,
1380        envelope: TypedEnvelope<proto::CloseBuffer>,
1381        mut cx: AsyncAppContext,
1382    ) -> Result<()> {
1383        let peer_id = envelope.sender_id;
1384        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1385        this.update(&mut cx, |this, _| {
1386            if let Some(buffer) = this.get(buffer_id) {
1387                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
1388                    if shared.remove(&buffer) {
1389                        if shared.is_empty() {
1390                            this.shared_buffers.remove(&peer_id);
1391                        }
1392                        return;
1393                    }
1394                }
1395            };
1396            debug_panic!(
1397                "peer_id {} closed buffer_id {} which was either not open or already closed",
1398                peer_id,
1399                buffer_id
1400            )
1401        })
1402    }
1403
1404    pub async fn handle_buffer_saved(
1405        this: Model<Self>,
1406        envelope: TypedEnvelope<proto::BufferSaved>,
1407        mut cx: AsyncAppContext,
1408    ) -> Result<()> {
1409        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1410        let version = deserialize_version(&envelope.payload.version);
1411        let mtime = envelope.payload.mtime.map(|time| time.into());
1412        this.update(&mut cx, |this, cx| {
1413            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1414                buffer.update(cx, |buffer, cx| {
1415                    buffer.did_save(version, mtime, cx);
1416                });
1417            }
1418        })
1419    }
1420
1421    pub async fn handle_buffer_reloaded(
1422        this: Model<Self>,
1423        envelope: TypedEnvelope<proto::BufferReloaded>,
1424        mut cx: AsyncAppContext,
1425    ) -> Result<()> {
1426        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1427        let version = deserialize_version(&envelope.payload.version);
1428        let mtime = envelope.payload.mtime.map(|time| time.into());
1429        let line_ending = deserialize_line_ending(
1430            proto::LineEnding::from_i32(envelope.payload.line_ending)
1431                .ok_or_else(|| anyhow!("missing line ending"))?,
1432        );
1433        this.update(&mut cx, |this, cx| {
1434            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
1435                buffer.update(cx, |buffer, cx| {
1436                    buffer.did_reload(version, line_ending, mtime, cx);
1437                });
1438            }
1439        })
1440    }
1441
1442    pub async fn handle_blame_buffer(
1443        this: Model<Self>,
1444        envelope: TypedEnvelope<proto::BlameBuffer>,
1445        mut cx: AsyncAppContext,
1446    ) -> Result<proto::BlameBufferResponse> {
1447        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
1448        let version = deserialize_version(&envelope.payload.version);
1449        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
1450        buffer
1451            .update(&mut cx, |buffer, _| {
1452                buffer.wait_for_version(version.clone())
1453            })?
1454            .await?;
1455        let blame = this
1456            .update(&mut cx, |this, cx| {
1457                this.blame_buffer(&buffer, Some(version), cx)
1458            })?
1459            .await?;
1460        Ok(serialize_blame_buffer_response(blame))
1461    }
1462
1463    pub async fn wait_for_loading_buffer(
1464        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
1465    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
1466        loop {
1467            if let Some(result) = receiver.borrow().as_ref() {
1468                match result {
1469                    Ok(buffer) => return Ok(buffer.to_owned()),
1470                    Err(e) => return Err(e.to_owned()),
1471                }
1472            }
1473            receiver.next().await;
1474        }
1475    }
1476
1477    pub fn reload_buffers(
1478        &self,
1479        buffers: HashSet<Model<Buffer>>,
1480        push_to_history: bool,
1481        cx: &mut ModelContext<Self>,
1482    ) -> Task<Result<ProjectTransaction>> {
1483        let mut local_buffers = Vec::new();
1484        let mut remote_buffers = Vec::new();
1485        for buffer_handle in buffers {
1486            let buffer = buffer_handle.read(cx);
1487            if buffer.is_dirty() {
1488                if let Some(file) = File::from_dyn(buffer.file()) {
1489                    if file.is_local() {
1490                        local_buffers.push(buffer_handle);
1491                    } else {
1492                        remote_buffers.push(buffer_handle);
1493                    }
1494                }
1495            }
1496        }
1497
1498        let client = self.upstream_client();
1499
1500        cx.spawn(move |this, mut cx| async move {
1501            let mut project_transaction = ProjectTransaction::default();
1502            if let Some((client, project_id)) = client {
1503                let response = client
1504                    .request(proto::ReloadBuffers {
1505                        project_id,
1506                        buffer_ids: remote_buffers
1507                            .iter()
1508                            .filter_map(|buffer| {
1509                                buffer
1510                                    .update(&mut cx, |buffer, _| buffer.remote_id().into())
1511                                    .ok()
1512                            })
1513                            .collect(),
1514                    })
1515                    .await?
1516                    .transaction
1517                    .ok_or_else(|| anyhow!("missing transaction"))?;
1518                BufferStore::deserialize_project_transaction(
1519                    this,
1520                    response,
1521                    push_to_history,
1522                    cx.clone(),
1523                )
1524                .await?;
1525            }
1526
1527            for buffer in local_buffers {
1528                let transaction = buffer
1529                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
1530                    .await?;
1531                buffer.update(&mut cx, |buffer, cx| {
1532                    if let Some(transaction) = transaction {
1533                        if !push_to_history {
1534                            buffer.forget_transaction(transaction.id);
1535                        }
1536                        project_transaction.0.insert(cx.handle(), transaction);
1537                    }
1538                })?;
1539            }
1540
1541            Ok(project_transaction)
1542        })
1543    }
1544
1545    async fn handle_reload_buffers(
1546        this: Model<Self>,
1547        envelope: TypedEnvelope<proto::ReloadBuffers>,
1548        mut cx: AsyncAppContext,
1549    ) -> Result<proto::ReloadBuffersResponse> {
1550        let sender_id = envelope.original_sender_id().unwrap_or_default();
1551        let reload = this.update(&mut cx, |this, cx| {
1552            let mut buffers = HashSet::default();
1553            for buffer_id in &envelope.payload.buffer_ids {
1554                let buffer_id = BufferId::new(*buffer_id)?;
1555                buffers.insert(this.get_existing(buffer_id)?);
1556            }
1557            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
1558        })??;
1559
1560        let project_transaction = reload.await?;
1561        let project_transaction = this.update(&mut cx, |this, cx| {
1562            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
1563        })?;
1564        Ok(proto::ReloadBuffersResponse {
1565            transaction: Some(project_transaction),
1566        })
1567    }
1568
1569    pub fn create_buffer_for_peer(
1570        &mut self,
1571        buffer: &Model<Buffer>,
1572        peer_id: proto::PeerId,
1573        cx: &mut ModelContext<Self>,
1574    ) -> Task<Result<()>> {
1575        let buffer_id = buffer.read(cx).remote_id();
1576        if !self
1577            .shared_buffers
1578            .entry(peer_id)
1579            .or_default()
1580            .insert(buffer.clone())
1581        {
1582            return Task::ready(Ok(()));
1583        }
1584
1585        let Some((client, project_id)) = self.downstream_client.clone() else {
1586            return Task::ready(Ok(()));
1587        };
1588
1589        cx.spawn(|this, mut cx| async move {
1590            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
1591                return anyhow::Ok(());
1592            };
1593
1594            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
1595            let operations = operations.await;
1596            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
1597
1598            let initial_state = proto::CreateBufferForPeer {
1599                project_id,
1600                peer_id: Some(peer_id),
1601                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
1602            };
1603
1604            if client.send(initial_state).log_err().is_some() {
1605                let client = client.clone();
1606                cx.background_executor()
1607                    .spawn(async move {
1608                        let mut chunks = split_operations(operations).peekable();
1609                        while let Some(chunk) = chunks.next() {
1610                            let is_last = chunks.peek().is_none();
1611                            client.send(proto::CreateBufferForPeer {
1612                                project_id,
1613                                peer_id: Some(peer_id),
1614                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
1615                                    proto::BufferChunk {
1616                                        buffer_id: buffer_id.into(),
1617                                        operations: chunk,
1618                                        is_last,
1619                                    },
1620                                )),
1621                            })?;
1622                        }
1623                        anyhow::Ok(())
1624                    })
1625                    .await
1626                    .log_err();
1627            }
1628            Ok(())
1629        })
1630    }
1631
1632    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
1633        match &self.state {
1634            BufferStoreState::Remote {
1635                upstream_client,
1636                project_id,
1637                ..
1638            } => Some((upstream_client.clone(), *project_id)),
1639            BufferStoreState::Local { .. } => None,
1640        }
1641    }
1642
1643    pub fn forget_shared_buffers(&mut self) {
1644        self.shared_buffers.clear();
1645    }
1646
1647    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
1648        self.shared_buffers.remove(peer_id);
1649    }
1650
1651    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
1652        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
1653            self.shared_buffers.insert(new_peer_id, buffers);
1654        }
1655    }
1656
1657    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
1658        &self.shared_buffers
1659    }
1660
1661    pub fn serialize_project_transaction_for_peer(
1662        &mut self,
1663        project_transaction: ProjectTransaction,
1664        peer_id: proto::PeerId,
1665        cx: &mut ModelContext<Self>,
1666    ) -> proto::ProjectTransaction {
1667        let mut serialized_transaction = proto::ProjectTransaction {
1668            buffer_ids: Default::default(),
1669            transactions: Default::default(),
1670        };
1671        for (buffer, transaction) in project_transaction.0 {
1672            self.create_buffer_for_peer(&buffer, peer_id, cx)
1673                .detach_and_log_err(cx);
1674            serialized_transaction
1675                .buffer_ids
1676                .push(buffer.read(cx).remote_id().into());
1677            serialized_transaction
1678                .transactions
1679                .push(language::proto::serialize_transaction(&transaction));
1680        }
1681        serialized_transaction
1682    }
1683
1684    pub async fn deserialize_project_transaction(
1685        this: WeakModel<Self>,
1686        message: proto::ProjectTransaction,
1687        push_to_history: bool,
1688        mut cx: AsyncAppContext,
1689    ) -> Result<ProjectTransaction> {
1690        let mut project_transaction = ProjectTransaction::default();
1691        for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) {
1692            let buffer_id = BufferId::new(buffer_id)?;
1693            let buffer = this
1694                .update(&mut cx, |this, cx| {
1695                    this.wait_for_remote_buffer(buffer_id, cx)
1696                })?
1697                .await?;
1698            let transaction = language::proto::deserialize_transaction(transaction)?;
1699            project_transaction.0.insert(buffer, transaction);
1700        }
1701
1702        for (buffer, transaction) in &project_transaction.0 {
1703            buffer
1704                .update(&mut cx, |buffer, _| {
1705                    buffer.wait_for_edits(transaction.edit_ids.iter().copied())
1706                })?
1707                .await?;
1708
1709            if push_to_history {
1710                buffer.update(&mut cx, |buffer, _| {
1711                    buffer.push_transaction(transaction.clone(), Instant::now());
1712                })?;
1713            }
1714        }
1715
1716        Ok(project_transaction)
1717    }
1718}
1719
1720impl OpenBuffer {
1721    fn upgrade(&self) -> Option<Model<Buffer>> {
1722        match self {
1723            OpenBuffer::Buffer(handle) => handle.upgrade(),
1724            OpenBuffer::Operations(_) => None,
1725        }
1726    }
1727}
1728
1729fn is_not_found_error(error: &anyhow::Error) -> bool {
1730    error
1731        .root_cause()
1732        .downcast_ref::<io::Error>()
1733        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
1734}
1735
1736fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
1737    let entries = blame
1738        .entries
1739        .into_iter()
1740        .map(|entry| proto::BlameEntry {
1741            sha: entry.sha.as_bytes().into(),
1742            start_line: entry.range.start,
1743            end_line: entry.range.end,
1744            original_line_number: entry.original_line_number,
1745            author: entry.author.clone(),
1746            author_mail: entry.author_mail.clone(),
1747            author_time: entry.author_time,
1748            author_tz: entry.author_tz.clone(),
1749            committer: entry.committer.clone(),
1750            committer_mail: entry.committer_mail.clone(),
1751            committer_time: entry.committer_time,
1752            committer_tz: entry.committer_tz.clone(),
1753            summary: entry.summary.clone(),
1754            previous: entry.previous.clone(),
1755            filename: entry.filename.clone(),
1756        })
1757        .collect::<Vec<_>>();
1758
1759    let messages = blame
1760        .messages
1761        .into_iter()
1762        .map(|(oid, message)| proto::CommitMessage {
1763            oid: oid.as_bytes().into(),
1764            message,
1765        })
1766        .collect::<Vec<_>>();
1767
1768    let permalinks = blame
1769        .permalinks
1770        .into_iter()
1771        .map(|(oid, url)| proto::CommitPermalink {
1772            oid: oid.as_bytes().into(),
1773            permalink: url.to_string(),
1774        })
1775        .collect::<Vec<_>>();
1776
1777    proto::BlameBufferResponse {
1778        entries,
1779        messages,
1780        permalinks,
1781        remote_url: blame.remote_url,
1782    }
1783}
1784
1785fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
1786    let entries = response
1787        .entries
1788        .into_iter()
1789        .filter_map(|entry| {
1790            Some(git::blame::BlameEntry {
1791                sha: git::Oid::from_bytes(&entry.sha).ok()?,
1792                range: entry.start_line..entry.end_line,
1793                original_line_number: entry.original_line_number,
1794                committer: entry.committer,
1795                committer_time: entry.committer_time,
1796                committer_tz: entry.committer_tz,
1797                committer_mail: entry.committer_mail,
1798                author: entry.author,
1799                author_mail: entry.author_mail,
1800                author_time: entry.author_time,
1801                author_tz: entry.author_tz,
1802                summary: entry.summary,
1803                previous: entry.previous,
1804                filename: entry.filename,
1805            })
1806        })
1807        .collect::<Vec<_>>();
1808
1809    let messages = response
1810        .messages
1811        .into_iter()
1812        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
1813        .collect::<HashMap<_, _>>();
1814
1815    let permalinks = response
1816        .permalinks
1817        .into_iter()
1818        .filter_map(|permalink| {
1819            Some((
1820                git::Oid::from_bytes(&permalink.oid).ok()?,
1821                Url::from_str(&permalink.permalink).ok()?,
1822            ))
1823        })
1824        .collect::<HashMap<_, _>>();
1825
1826    Blame {
1827        entries,
1828        permalinks,
1829        messages,
1830        remote_url: response.remote_url,
1831    }
1832}