lib.rs

   1pub mod fs;
   2mod ignore;
   3
   4use self::ignore::IgnoreStack;
   5use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
   6use anyhow::{anyhow, Result};
   7use buffer::{self, Buffer, History, LanguageRegistry, Operation, Rope};
   8use clock::ReplicaId;
   9pub use fs::*;
  10use futures::{Stream, StreamExt};
  11use fuzzy::CharBag;
  12use gpui::{
  13    executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
  14    Task, UpgradeModelHandle, WeakModelHandle,
  15};
  16use lazy_static::lazy_static;
  17use parking_lot::Mutex;
  18use postage::{
  19    prelude::{Sink as _, Stream as _},
  20    watch,
  21};
  22use rpc_client::{self as rpc, proto, PeerId, TypedEnvelope};
  23use serde::Deserialize;
  24use smol::channel::{self, Sender};
  25use std::{
  26    any::Any,
  27    cmp::{self, Ordering},
  28    collections::HashMap,
  29    convert::{TryFrom, TryInto},
  30    ffi::{OsStr, OsString},
  31    fmt,
  32    future::Future,
  33    ops::Deref,
  34    path::{Path, PathBuf},
  35    sync::{
  36        atomic::{AtomicUsize, Ordering::SeqCst},
  37        Arc,
  38    },
  39    time::{Duration, SystemTime},
  40};
  41use sum_tree::Bias;
  42use sum_tree::{self, Edit, SeekTarget, SumTree};
  43use util::TryFutureExt;
  44
lazy_static! {
    /// The `.gitignore` file name as an `OsStr`, built once on first use
    /// (`OsStr::new` is not a `const fn`, hence the lazy static).
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
  48
/// Status values sent over the scan-state channel created in `LocalWorktree::new`
/// and mirrored into a watch channel whose initial value is `Scanning`.
#[derive(Clone, Debug)]
enum ScanState {
    // NOTE(review): presumably "no scan in progress" — confirm against the scanner.
    Idle,
    Scanning,
    /// Carries an error behind an `Arc` so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
  55
/// A worktree model, either backed by local state (`LocalWorktree`) or joined
/// over RPC (`RemoteWorktree`). Dereferences to the current `Snapshot`.
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
  60
/// Events emitted by a `Worktree` model.
pub enum Event {
    /// Emitted when an `UnshareWorktree` message is handled.
    Closed,
}
  64
  65impl Entity for Worktree {
  66    type Event = Event;
  67
  68    fn release(&mut self, cx: &mut MutableAppContext) {
  69        match self {
  70            Self::Local(tree) => {
  71                if let Some(worktree_id) = *tree.remote_id.borrow() {
  72                    let rpc = tree.rpc.clone();
  73                    cx.spawn(|_| async move {
  74                        if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
  75                            log::error!("error closing worktree: {}", err);
  76                        }
  77                    })
  78                    .detach();
  79                }
  80            }
  81            Self::Remote(tree) => {
  82                let rpc = tree.rpc.clone();
  83                let worktree_id = tree.remote_id;
  84                cx.spawn(|_| async move {
  85                    if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
  86                        log::error!("error closing worktree: {}", err);
  87                    }
  88                })
  89                .detach();
  90            }
  91        }
  92    }
  93}
  94
  95impl Worktree {
  96    pub async fn open_local(
  97        rpc: Arc<rpc::Client>,
  98        path: impl Into<Arc<Path>>,
  99        fs: Arc<dyn Fs>,
 100        languages: Arc<LanguageRegistry>,
 101        cx: &mut AsyncAppContext,
 102    ) -> Result<ModelHandle<Self>> {
 103        let (tree, scan_states_tx) =
 104            LocalWorktree::new(rpc, path, fs.clone(), languages, cx).await?;
 105        tree.update(cx, |tree, cx| {
 106            let tree = tree.as_local_mut().unwrap();
 107            let abs_path = tree.snapshot.abs_path.clone();
 108            let background_snapshot = tree.background_snapshot.clone();
 109            let background = cx.background().clone();
 110            tree._background_scanner_task = Some(cx.background().spawn(async move {
 111                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
 112                let scanner =
 113                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
 114                scanner.run(events).await;
 115            }));
 116        });
 117        Ok(tree)
 118    }
 119
 120    pub async fn open_remote(
 121        rpc: Arc<rpc::Client>,
 122        id: u64,
 123        languages: Arc<LanguageRegistry>,
 124        cx: &mut AsyncAppContext,
 125    ) -> Result<ModelHandle<Self>> {
 126        let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
 127        Worktree::remote(response, rpc, languages, cx).await
 128    }
 129
 130    async fn remote(
 131        join_response: proto::JoinWorktreeResponse,
 132        rpc: Arc<rpc::Client>,
 133        languages: Arc<LanguageRegistry>,
 134        cx: &mut AsyncAppContext,
 135    ) -> Result<ModelHandle<Self>> {
 136        let worktree = join_response
 137            .worktree
 138            .ok_or_else(|| anyhow!("empty worktree"))?;
 139
 140        let remote_id = worktree.id;
 141        let replica_id = join_response.replica_id as ReplicaId;
 142        let peers = join_response.peers;
 143        let root_char_bag: CharBag = worktree
 144            .root_name
 145            .chars()
 146            .map(|c| c.to_ascii_lowercase())
 147            .collect();
 148        let root_name = worktree.root_name.clone();
 149        let (entries_by_path, entries_by_id) = cx
 150            .background()
 151            .spawn(async move {
 152                let mut entries_by_path_edits = Vec::new();
 153                let mut entries_by_id_edits = Vec::new();
 154                for entry in worktree.entries {
 155                    match Entry::try_from((&root_char_bag, entry)) {
 156                        Ok(entry) => {
 157                            entries_by_id_edits.push(Edit::Insert(PathEntry {
 158                                id: entry.id,
 159                                path: entry.path.clone(),
 160                                is_ignored: entry.is_ignored,
 161                                scan_id: 0,
 162                            }));
 163                            entries_by_path_edits.push(Edit::Insert(entry));
 164                        }
 165                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
 166                    }
 167                }
 168
 169                let mut entries_by_path = SumTree::new();
 170                let mut entries_by_id = SumTree::new();
 171                entries_by_path.edit(entries_by_path_edits, &());
 172                entries_by_id.edit(entries_by_id_edits, &());
 173                (entries_by_path, entries_by_id)
 174            })
 175            .await;
 176
 177        let worktree = cx.update(|cx| {
 178            cx.add_model(|cx: &mut ModelContext<Worktree>| {
 179                let snapshot = Snapshot {
 180                    id: cx.model_id(),
 181                    scan_id: 0,
 182                    abs_path: Path::new("").into(),
 183                    root_name,
 184                    root_char_bag,
 185                    ignores: Default::default(),
 186                    entries_by_path,
 187                    entries_by_id,
 188                    removed_entry_ids: Default::default(),
 189                    next_entry_id: Default::default(),
 190                };
 191
 192                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
 193                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
 194
 195                cx.background()
 196                    .spawn(async move {
 197                        while let Some(update) = updates_rx.recv().await {
 198                            let mut snapshot = snapshot_tx.borrow().clone();
 199                            if let Err(error) = snapshot.apply_update(update) {
 200                                log::error!("error applying worktree update: {}", error);
 201                            }
 202                            *snapshot_tx.borrow_mut() = snapshot;
 203                        }
 204                    })
 205                    .detach();
 206
 207                {
 208                    let mut snapshot_rx = snapshot_rx.clone();
 209                    cx.spawn_weak(|this, mut cx| async move {
 210                        while let Some(_) = snapshot_rx.recv().await {
 211                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
 212                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
 213                            } else {
 214                                break;
 215                            }
 216                        }
 217                    })
 218                    .detach();
 219                }
 220
 221                let _subscriptions = vec![
 222                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_add_peer),
 223                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer),
 224                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update),
 225                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
 226                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
 227                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
 228                ];
 229
 230                Worktree::Remote(RemoteWorktree {
 231                    remote_id,
 232                    replica_id,
 233                    snapshot,
 234                    snapshot_rx,
 235                    updates_tx,
 236                    rpc: rpc.clone(),
 237                    open_buffers: Default::default(),
 238                    peers: peers
 239                        .into_iter()
 240                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
 241                        .collect(),
 242                    queued_operations: Default::default(),
 243                    languages,
 244                    _subscriptions,
 245                })
 246            })
 247        });
 248
 249        Ok(worktree)
 250    }
 251
 252    pub fn as_local(&self) -> Option<&LocalWorktree> {
 253        if let Worktree::Local(worktree) = self {
 254            Some(worktree)
 255        } else {
 256            None
 257        }
 258    }
 259
 260    pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
 261        if let Worktree::Local(worktree) = self {
 262            Some(worktree)
 263        } else {
 264            None
 265        }
 266    }
 267
 268    pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
 269        if let Worktree::Remote(worktree) = self {
 270            Some(worktree)
 271        } else {
 272            None
 273        }
 274    }
 275
 276    pub fn snapshot(&self) -> Snapshot {
 277        match self {
 278            Worktree::Local(worktree) => worktree.snapshot(),
 279            Worktree::Remote(worktree) => worktree.snapshot(),
 280        }
 281    }
 282
 283    pub fn replica_id(&self) -> ReplicaId {
 284        match self {
 285            Worktree::Local(_) => 0,
 286            Worktree::Remote(worktree) => worktree.replica_id,
 287        }
 288    }
 289
 290    pub fn languages(&self) -> &Arc<LanguageRegistry> {
 291        match self {
 292            Worktree::Local(worktree) => &worktree.languages,
 293            Worktree::Remote(worktree) => &worktree.languages,
 294        }
 295    }
 296
 297    pub fn handle_add_peer(
 298        &mut self,
 299        envelope: TypedEnvelope<proto::AddPeer>,
 300        _: Arc<rpc::Client>,
 301        cx: &mut ModelContext<Self>,
 302    ) -> Result<()> {
 303        match self {
 304            Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
 305            Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
 306        }
 307    }
 308
 309    pub fn handle_remove_peer(
 310        &mut self,
 311        envelope: TypedEnvelope<proto::RemovePeer>,
 312        _: Arc<rpc::Client>,
 313        cx: &mut ModelContext<Self>,
 314    ) -> Result<()> {
 315        match self {
 316            Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
 317            Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
 318        }
 319    }
 320
 321    pub fn handle_update(
 322        &mut self,
 323        envelope: TypedEnvelope<proto::UpdateWorktree>,
 324        _: Arc<rpc::Client>,
 325        cx: &mut ModelContext<Self>,
 326    ) -> anyhow::Result<()> {
 327        self.as_remote_mut()
 328            .unwrap()
 329            .update_from_remote(envelope, cx)
 330    }
 331
 332    pub fn handle_open_buffer(
 333        &mut self,
 334        envelope: TypedEnvelope<proto::OpenBuffer>,
 335        rpc: Arc<rpc::Client>,
 336        cx: &mut ModelContext<Self>,
 337    ) -> anyhow::Result<()> {
 338        let receipt = envelope.receipt();
 339
 340        let response = self
 341            .as_local_mut()
 342            .unwrap()
 343            .open_remote_buffer(envelope, cx);
 344
 345        cx.background()
 346            .spawn(
 347                async move {
 348                    rpc.respond(receipt, response.await?).await?;
 349                    Ok(())
 350                }
 351                .log_err(),
 352            )
 353            .detach();
 354
 355        Ok(())
 356    }
 357
 358    pub fn handle_close_buffer(
 359        &mut self,
 360        envelope: TypedEnvelope<proto::CloseBuffer>,
 361        _: Arc<rpc::Client>,
 362        cx: &mut ModelContext<Self>,
 363    ) -> anyhow::Result<()> {
 364        self.as_local_mut()
 365            .unwrap()
 366            .close_remote_buffer(envelope, cx)
 367    }
 368
 369    pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
 370        match self {
 371            Worktree::Local(worktree) => &worktree.peers,
 372            Worktree::Remote(worktree) => &worktree.peers,
 373        }
 374    }
 375
 376    pub fn open_buffer(
 377        &mut self,
 378        path: impl AsRef<Path>,
 379        cx: &mut ModelContext<Self>,
 380    ) -> Task<Result<ModelHandle<Buffer>>> {
 381        match self {
 382            Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
 383            Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
 384        }
 385    }
 386
 387    #[cfg(feature = "test-support")]
 388    pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
 389        let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
 390            Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
 391            Worktree::Remote(worktree) => {
 392                Box::new(worktree.open_buffers.values().filter_map(|buf| {
 393                    if let RemoteBuffer::Loaded(buf) = buf {
 394                        Some(buf)
 395                    } else {
 396                        None
 397                    }
 398                }))
 399            }
 400        };
 401
 402        let path = path.as_ref();
 403        open_buffers
 404            .find(|buffer| {
 405                if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
 406                    file.path().as_ref() == path
 407                } else {
 408                    false
 409                }
 410            })
 411            .is_some()
 412    }
 413
 414    pub fn handle_update_buffer(
 415        &mut self,
 416        envelope: TypedEnvelope<proto::UpdateBuffer>,
 417        _: Arc<rpc::Client>,
 418        cx: &mut ModelContext<Self>,
 419    ) -> Result<()> {
 420        let payload = envelope.payload.clone();
 421        let buffer_id = payload.buffer_id as usize;
 422        let ops = payload
 423            .operations
 424            .into_iter()
 425            .map(|op| op.try_into())
 426            .collect::<anyhow::Result<Vec<_>>>()?;
 427
 428        match self {
 429            Worktree::Local(worktree) => {
 430                let buffer = worktree
 431                    .open_buffers
 432                    .get(&buffer_id)
 433                    .and_then(|buf| buf.upgrade(cx))
 434                    .ok_or_else(|| {
 435                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
 436                    })?;
 437                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
 438            }
 439            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
 440                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
 441                Some(RemoteBuffer::Loaded(buffer)) => {
 442                    if let Some(buffer) = buffer.upgrade(cx) {
 443                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
 444                    } else {
 445                        worktree
 446                            .open_buffers
 447                            .insert(buffer_id, RemoteBuffer::Operations(ops));
 448                    }
 449                }
 450                None => {
 451                    worktree
 452                        .open_buffers
 453                        .insert(buffer_id, RemoteBuffer::Operations(ops));
 454                }
 455            },
 456        }
 457
 458        Ok(())
 459    }
 460
 461    pub fn handle_save_buffer(
 462        &mut self,
 463        envelope: TypedEnvelope<proto::SaveBuffer>,
 464        rpc: Arc<rpc::Client>,
 465        cx: &mut ModelContext<Self>,
 466    ) -> Result<()> {
 467        let sender_id = envelope.original_sender_id()?;
 468        let buffer = self
 469            .as_local()
 470            .unwrap()
 471            .shared_buffers
 472            .get(&sender_id)
 473            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
 474            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
 475
 476        let receipt = envelope.receipt();
 477        let worktree_id = envelope.payload.worktree_id;
 478        let buffer_id = envelope.payload.buffer_id;
 479        let save = cx.spawn(|_, mut cx| async move {
 480            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
 481        });
 482
 483        cx.background()
 484            .spawn(
 485                async move {
 486                    let (version, mtime) = save.await?;
 487
 488                    rpc.respond(
 489                        receipt,
 490                        proto::BufferSaved {
 491                            worktree_id,
 492                            buffer_id,
 493                            version: (&version).into(),
 494                            mtime: Some(mtime.into()),
 495                        },
 496                    )
 497                    .await?;
 498
 499                    Ok(())
 500                }
 501                .log_err(),
 502            )
 503            .detach();
 504
 505        Ok(())
 506    }
 507
 508    pub fn handle_buffer_saved(
 509        &mut self,
 510        envelope: TypedEnvelope<proto::BufferSaved>,
 511        _: Arc<rpc::Client>,
 512        cx: &mut ModelContext<Self>,
 513    ) -> Result<()> {
 514        let payload = envelope.payload.clone();
 515        let worktree = self.as_remote_mut().unwrap();
 516        if let Some(buffer) = worktree
 517            .open_buffers
 518            .get(&(payload.buffer_id as usize))
 519            .and_then(|buf| buf.upgrade(cx))
 520        {
 521            buffer.update(cx, |buffer, cx| {
 522                let version = payload.version.try_into()?;
 523                let mtime = payload
 524                    .mtime
 525                    .ok_or_else(|| anyhow!("missing mtime"))?
 526                    .into();
 527                buffer.did_save(version, mtime, None, cx);
 528                Result::<_, anyhow::Error>::Ok(())
 529            })?;
 530        }
 531        Ok(())
 532    }
 533
 534    pub fn handle_unshare(
 535        &mut self,
 536        _: TypedEnvelope<proto::UnshareWorktree>,
 537        _: Arc<rpc::Client>,
 538        cx: &mut ModelContext<Self>,
 539    ) -> Result<()> {
 540        cx.emit(Event::Closed);
 541        Ok(())
 542    }
 543
 544    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
 545        match self {
 546            Self::Local(worktree) => {
 547                let is_fake_fs = worktree.fs.is_fake();
 548                worktree.snapshot = worktree.background_snapshot.lock().clone();
 549                if worktree.is_scanning() {
 550                    if worktree.poll_task.is_none() {
 551                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
 552                            if is_fake_fs {
 553                                smol::future::yield_now().await;
 554                            } else {
 555                                smol::Timer::after(Duration::from_millis(100)).await;
 556                            }
 557                            this.update(&mut cx, |this, cx| {
 558                                this.as_local_mut().unwrap().poll_task = None;
 559                                this.poll_snapshot(cx);
 560                            })
 561                        }));
 562                    }
 563                } else {
 564                    worktree.poll_task.take();
 565                    self.update_open_buffers(cx);
 566                }
 567            }
 568            Self::Remote(worktree) => {
 569                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
 570                self.update_open_buffers(cx);
 571            }
 572        };
 573
 574        cx.notify();
 575    }
 576
 577    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
 578        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
 579            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
 580            Self::Remote(worktree) => {
 581                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
 582                    if let RemoteBuffer::Loaded(buf) = buf {
 583                        Some((id, buf))
 584                    } else {
 585                        None
 586                    }
 587                }))
 588            }
 589        };
 590
 591        let mut buffers_to_delete = Vec::new();
 592        for (buffer_id, buffer) in open_buffers {
 593            if let Some(buffer) = buffer.upgrade(cx) {
 594                buffer.update(cx, |buffer, cx| {
 595                    let buffer_is_clean = !buffer.is_dirty();
 596
 597                    if let Some(file) = buffer.file_mut() {
 598                        let mut file_changed = false;
 599
 600                        if let Some(entry) = file
 601                            .entry_id()
 602                            .and_then(|entry_id| self.entry_for_id(entry_id))
 603                        {
 604                            if entry.path != *file.path() {
 605                                file.set_path(entry.path.clone());
 606                                file_changed = true;
 607                            }
 608
 609                            if entry.mtime != file.mtime() {
 610                                file.set_mtime(entry.mtime);
 611                                file_changed = true;
 612                                if let Some(worktree) = self.as_local() {
 613                                    if buffer_is_clean {
 614                                        let abs_path = worktree.absolutize(file.path().as_ref());
 615                                        refresh_buffer(abs_path, &worktree.fs, cx);
 616                                    }
 617                                }
 618                            }
 619                        } else if let Some(entry) = self.entry_for_path(file.path().as_ref()) {
 620                            file.set_entry_id(Some(entry.id));
 621                            file.set_mtime(entry.mtime);
 622                            if let Some(worktree) = self.as_local() {
 623                                if buffer_is_clean {
 624                                    let abs_path = worktree.absolutize(file.path().as_ref());
 625                                    refresh_buffer(abs_path, &worktree.fs, cx);
 626                                }
 627                            }
 628                            file_changed = true;
 629                        } else if !file.is_deleted() {
 630                            if buffer_is_clean {
 631                                cx.emit(buffer::Event::Dirtied);
 632                            }
 633                            file.set_entry_id(None);
 634                            file_changed = true;
 635                        }
 636
 637                        if file_changed {
 638                            cx.emit(buffer::Event::FileHandleChanged);
 639                        }
 640                    }
 641                });
 642            } else {
 643                buffers_to_delete.push(*buffer_id);
 644            }
 645        }
 646
 647        for buffer_id in buffers_to_delete {
 648            match self {
 649                Self::Local(worktree) => {
 650                    worktree.open_buffers.remove(&buffer_id);
 651                }
 652                Self::Remote(worktree) => {
 653                    worktree.open_buffers.remove(&buffer_id);
 654                }
 655            }
 656        }
 657    }
 658}
 659
 660impl Deref for Worktree {
 661    type Target = Snapshot;
 662
 663    fn deref(&self) -> &Self::Target {
 664        match self {
 665            Worktree::Local(worktree) => &worktree.snapshot,
 666            Worktree::Remote(worktree) => &worktree.snapshot,
 667        }
 668    }
 669}
 670
/// State of a worktree backed by a local file system.
pub struct LocalWorktree {
    /// Foreground copy of the snapshot; refreshed from `background_snapshot` in
    /// `Worktree::poll_snapshot`.
    snapshot: Snapshot,
    /// Settings parsed from `.zed.toml` in the worktree root, or the default.
    config: WorktreeConfig,
    /// Snapshot shared with the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    /// Receives the most recent scan state forwarded from the scan-state channel.
    last_scan_state_rx: watch::Receiver<ScanState>,
    /// The background scanner task; set by `Worktree::open_local`.
    _background_scanner_task: Option<Task<()>>,
    /// Task that sends `OpenWorktree` when the client connects and publishes the
    /// resulting id (or `None` otherwise) to `remote_id`.
    _maintain_remote_id_task: Task<Option<()>>,
    /// Follow-up poll scheduled by `poll_snapshot` while a scan is in progress.
    poll_task: Option<Task<()>>,
    /// The worktree id assigned by the server, if any.
    remote_id: watch::Receiver<Option<u64>>,
    // NOTE(review): presumably `Some` while the worktree is shared — confirm
    // against the share/unshare code, which is not visible here.
    share: Option<ShareState>,
    /// Weak handles to open buffers, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    /// Strong handles to buffers looked up per peer when handling `SaveBuffer`,
    /// keyed by peer and then by buffer id.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    /// Replica id of each known peer.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    queued_operations: Vec<(u64, Operation)>,
    rpc: Arc<rpc::Client>,
    fs: Arc<dyn Fs>,
}
 689
/// Per-worktree settings deserialized from the `.zed.toml` file in the worktree
/// root. The default value is used when the file is missing or fails to parse.
#[derive(Default, Deserialize)]
struct WorktreeConfig {
    /// Sent as `collaborator_logins` in the `OpenWorktree` request.
    collaborators: Vec<String>,
}
 694
 695impl LocalWorktree {
 696    async fn new(
 697        rpc: Arc<rpc::Client>,
 698        path: impl Into<Arc<Path>>,
 699        fs: Arc<dyn Fs>,
 700        languages: Arc<LanguageRegistry>,
 701        cx: &mut AsyncAppContext,
 702    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
 703        let abs_path = path.into();
 704        let path: Arc<Path> = Arc::from(Path::new(""));
 705        let next_entry_id = AtomicUsize::new(0);
 706
 707        // After determining whether the root entry is a file or a directory, populate the
 708        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
 709        let root_name = abs_path
 710            .file_name()
 711            .map_or(String::new(), |f| f.to_string_lossy().to_string());
 712        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
 713        let metadata = fs.metadata(&abs_path).await?;
 714
 715        let mut config = WorktreeConfig::default();
 716        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
 717            if let Ok(parsed) = toml::from_str(&zed_toml) {
 718                config = parsed;
 719            }
 720        }
 721
 722        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
 723        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
 724        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
 725            let mut snapshot = Snapshot {
 726                id: cx.model_id(),
 727                scan_id: 0,
 728                abs_path,
 729                root_name: root_name.clone(),
 730                root_char_bag,
 731                ignores: Default::default(),
 732                entries_by_path: Default::default(),
 733                entries_by_id: Default::default(),
 734                removed_entry_ids: Default::default(),
 735                next_entry_id: Arc::new(next_entry_id),
 736            };
 737            if let Some(metadata) = metadata {
 738                snapshot.insert_entry(
 739                    Entry::new(
 740                        path.into(),
 741                        &metadata,
 742                        &snapshot.next_entry_id,
 743                        snapshot.root_char_bag,
 744                    ),
 745                    fs.as_ref(),
 746                );
 747            }
 748
 749            let (mut remote_id_tx, remote_id_rx) = watch::channel();
 750            let _maintain_remote_id_task = cx.spawn_weak({
 751                let rpc = rpc.clone();
 752                move |this, cx| {
 753                    async move {
 754                        let mut status = rpc.status();
 755                        while let Some(status) = status.recv().await {
 756                            if let Some(this) = this.upgrade(&cx) {
 757                                let remote_id = if let rpc::Status::Connected { .. } = status {
 758                                    let collaborator_logins = this.read_with(&cx, |this, _| {
 759                                        this.as_local().unwrap().config.collaborators.clone()
 760                                    });
 761                                    let response = rpc
 762                                        .request(proto::OpenWorktree {
 763                                            root_name: root_name.clone(),
 764                                            collaborator_logins,
 765                                        })
 766                                        .await?;
 767
 768                                    Some(response.worktree_id)
 769                                } else {
 770                                    None
 771                                };
 772                                if remote_id_tx.send(remote_id).await.is_err() {
 773                                    break;
 774                                }
 775                            }
 776                        }
 777                        Ok(())
 778                    }
 779                    .log_err()
 780                }
 781            });
 782
 783            let tree = Self {
 784                snapshot: snapshot.clone(),
 785                config,
 786                remote_id: remote_id_rx,
 787                background_snapshot: Arc::new(Mutex::new(snapshot)),
 788                last_scan_state_rx,
 789                _background_scanner_task: None,
 790                _maintain_remote_id_task,
 791                share: None,
 792                poll_task: None,
 793                open_buffers: Default::default(),
 794                shared_buffers: Default::default(),
 795                queued_operations: Default::default(),
 796                peers: Default::default(),
 797                languages,
 798                rpc,
 799                fs,
 800            };
 801
 802            cx.spawn_weak(|this, mut cx| async move {
 803                while let Ok(scan_state) = scan_states_rx.recv().await {
 804                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
 805                        let to_send = handle.update(&mut cx, |this, cx| {
 806                            last_scan_state_tx.blocking_send(scan_state).ok();
 807                            this.poll_snapshot(cx);
 808                            let tree = this.as_local_mut().unwrap();
 809                            if !tree.is_scanning() {
 810                                if let Some(share) = tree.share.as_ref() {
 811                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
 812                                }
 813                            }
 814                            None
 815                        });
 816
 817                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
 818                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
 819                                log::error!("error submitting snapshot to send {}", err);
 820                            }
 821                        }
 822                    } else {
 823                        break;
 824                    }
 825                }
 826            })
 827            .detach();
 828
 829            Worktree::Local(tree)
 830        });
 831
 832        Ok((tree, scan_states_tx))
 833    }
 834
 835    pub fn open_buffer(
 836        &mut self,
 837        path: &Path,
 838        cx: &mut ModelContext<Worktree>,
 839    ) -> Task<Result<ModelHandle<Buffer>>> {
 840        let handle = cx.handle();
 841
 842        // If there is already a buffer for the given path, then return it.
 843        let mut existing_buffer = None;
 844        self.open_buffers.retain(|_buffer_id, buffer| {
 845            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
 846                if let Some(file) = buffer.read(cx.as_ref()).file() {
 847                    if file.worktree_id() == handle.id() && file.path().as_ref() == path {
 848                        existing_buffer = Some(buffer);
 849                    }
 850                }
 851                true
 852            } else {
 853                false
 854            }
 855        });
 856
 857        let path = Arc::from(path);
 858        cx.spawn(|this, mut cx| async move {
 859            if let Some(existing_buffer) = existing_buffer {
 860                Ok(existing_buffer)
 861            } else {
 862                let (file, contents) = this
 863                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
 864                    .await?;
 865                let language = this.read_with(&cx, |this, cx| {
 866                    use buffer::File;
 867
 868                    this.languages()
 869                        .select_language(file.full_path(cx))
 870                        .cloned()
 871                });
 872                let buffer = cx.add_model(|cx| {
 873                    Buffer::from_history(
 874                        0,
 875                        History::new(contents.into()),
 876                        Some(Box::new(file)),
 877                        language,
 878                        cx,
 879                    )
 880                });
 881                this.update(&mut cx, |this, _| {
 882                    let this = this
 883                        .as_local_mut()
 884                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
 885                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
 886                    Ok(buffer)
 887                })
 888            }
 889        })
 890    }
 891
 892    pub fn open_remote_buffer(
 893        &mut self,
 894        envelope: TypedEnvelope<proto::OpenBuffer>,
 895        cx: &mut ModelContext<Worktree>,
 896    ) -> Task<Result<proto::OpenBufferResponse>> {
 897        let peer_id = envelope.original_sender_id();
 898        let path = Path::new(&envelope.payload.path);
 899
 900        let buffer = self.open_buffer(path, cx);
 901
 902        cx.spawn(|this, mut cx| async move {
 903            let buffer = buffer.await?;
 904            this.update(&mut cx, |this, cx| {
 905                this.as_local_mut()
 906                    .unwrap()
 907                    .shared_buffers
 908                    .entry(peer_id?)
 909                    .or_default()
 910                    .insert(buffer.id() as u64, buffer.clone());
 911
 912                Ok(proto::OpenBufferResponse {
 913                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
 914                })
 915            })
 916        })
 917    }
 918
 919    pub fn close_remote_buffer(
 920        &mut self,
 921        envelope: TypedEnvelope<proto::CloseBuffer>,
 922        cx: &mut ModelContext<Worktree>,
 923    ) -> Result<()> {
 924        if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
 925            shared_buffers.remove(&envelope.payload.buffer_id);
 926            cx.notify();
 927        }
 928
 929        Ok(())
 930    }
 931
 932    pub fn add_peer(
 933        &mut self,
 934        envelope: TypedEnvelope<proto::AddPeer>,
 935        cx: &mut ModelContext<Worktree>,
 936    ) -> Result<()> {
 937        let peer = envelope
 938            .payload
 939            .peer
 940            .as_ref()
 941            .ok_or_else(|| anyhow!("empty peer"))?;
 942        self.peers
 943            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
 944        cx.notify();
 945
 946        Ok(())
 947    }
 948
 949    pub fn remove_peer(
 950        &mut self,
 951        envelope: TypedEnvelope<proto::RemovePeer>,
 952        cx: &mut ModelContext<Worktree>,
 953    ) -> Result<()> {
 954        let peer_id = PeerId(envelope.payload.peer_id);
 955        let replica_id = self
 956            .peers
 957            .remove(&peer_id)
 958            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
 959        self.shared_buffers.remove(&peer_id);
 960        for (_, buffer) in &self.open_buffers {
 961            if let Some(buffer) = buffer.upgrade(cx) {
 962                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
 963            }
 964        }
 965        cx.notify();
 966
 967        Ok(())
 968    }
 969
 970    pub fn scan_complete(&self) -> impl Future<Output = ()> {
 971        let mut scan_state_rx = self.last_scan_state_rx.clone();
 972        async move {
 973            let mut scan_state = Some(scan_state_rx.borrow().clone());
 974            while let Some(ScanState::Scanning) = scan_state {
 975                scan_state = scan_state_rx.recv().await;
 976            }
 977        }
 978    }
 979
 980    pub fn remote_id(&self) -> Option<u64> {
 981        *self.remote_id.borrow()
 982    }
 983
 984    pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
 985        let mut remote_id = self.remote_id.clone();
 986        async move {
 987            while let Some(remote_id) = remote_id.recv().await {
 988                if remote_id.is_some() {
 989                    return remote_id;
 990                }
 991            }
 992            None
 993        }
 994    }
 995
 996    fn is_scanning(&self) -> bool {
 997        if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
 998            true
 999        } else {
1000            false
1001        }
1002    }
1003
1004    pub fn snapshot(&self) -> Snapshot {
1005        self.snapshot.clone()
1006    }
1007
1008    pub fn abs_path(&self) -> &Path {
1009        self.snapshot.abs_path.as_ref()
1010    }
1011
1012    pub fn contains_abs_path(&self, path: &Path) -> bool {
1013        path.starts_with(&self.snapshot.abs_path)
1014    }
1015
1016    fn absolutize(&self, path: &Path) -> PathBuf {
1017        if path.file_name().is_some() {
1018            self.snapshot.abs_path.join(path)
1019        } else {
1020            self.snapshot.abs_path.to_path_buf()
1021        }
1022    }
1023
1024    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
1025        let handle = cx.handle();
1026        let path = Arc::from(path);
1027        let abs_path = self.absolutize(&path);
1028        let background_snapshot = self.background_snapshot.clone();
1029        let fs = self.fs.clone();
1030        cx.spawn(|this, mut cx| async move {
1031            let text = fs.load(&abs_path).await?;
1032            // Eagerly populate the snapshot with an updated entry for the loaded file
1033            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
1034            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1035            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
1036        })
1037    }
1038
1039    pub fn save_buffer_as(
1040        &self,
1041        buffer: ModelHandle<Buffer>,
1042        path: impl Into<Arc<Path>>,
1043        text: Rope,
1044        cx: &mut ModelContext<Worktree>,
1045    ) -> Task<Result<File>> {
1046        let save = self.save(path, text, cx);
1047        cx.spawn(|this, mut cx| async move {
1048            let entry = save.await?;
1049            this.update(&mut cx, |this, cx| {
1050                this.as_local_mut()
1051                    .unwrap()
1052                    .open_buffers
1053                    .insert(buffer.id(), buffer.downgrade());
1054                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
1055            })
1056        })
1057    }
1058
1059    fn save(
1060        &self,
1061        path: impl Into<Arc<Path>>,
1062        text: Rope,
1063        cx: &mut ModelContext<Worktree>,
1064    ) -> Task<Result<Entry>> {
1065        let path = path.into();
1066        let abs_path = self.absolutize(&path);
1067        let background_snapshot = self.background_snapshot.clone();
1068        let fs = self.fs.clone();
1069        let save = cx.background().spawn(async move {
1070            fs.save(&abs_path, &text).await?;
1071            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1072        });
1073
1074        cx.spawn(|this, mut cx| async move {
1075            let entry = save.await?;
1076            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1077            Ok(entry)
1078        })
1079    }
1080
1081    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
1082        let snapshot = self.snapshot();
1083        let share_request = self.share_request(cx);
1084        let rpc = self.rpc.clone();
1085        cx.spawn(|this, mut cx| async move {
1086            let share_request = if let Some(request) = share_request.await {
1087                request
1088            } else {
1089                return Err(anyhow!("failed to open worktree on the server"));
1090            };
1091
1092            let remote_id = share_request.worktree.as_ref().unwrap().id;
1093            let share_response = rpc.request(share_request).await?;
1094
1095            log::info!("sharing worktree {:?}", share_response);
1096            let (snapshots_to_send_tx, snapshots_to_send_rx) =
1097                smol::channel::unbounded::<Snapshot>();
1098
1099            cx.background()
1100                .spawn({
1101                    let rpc = rpc.clone();
1102                    async move {
1103                        let mut prev_snapshot = snapshot;
1104                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
1105                            let message = snapshot.build_update(&prev_snapshot, remote_id, false);
1106                            match rpc.send(message).await {
1107                                Ok(()) => prev_snapshot = snapshot,
1108                                Err(err) => log::error!("error sending snapshot diff {}", err),
1109                            }
1110                        }
1111                    }
1112                })
1113                .detach();
1114
1115            this.update(&mut cx, |worktree, cx| {
1116                let _subscriptions = vec![
1117                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer),
1118                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer),
1119                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
1120                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
1121                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
1122                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
1123                ];
1124
1125                let worktree = worktree.as_local_mut().unwrap();
1126                worktree.share = Some(ShareState {
1127                    snapshots_tx: snapshots_to_send_tx,
1128                    _subscriptions,
1129                });
1130            });
1131
1132            Ok(remote_id)
1133        })
1134    }
1135
1136    pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
1137        self.share.take();
1138        let rpc = self.rpc.clone();
1139        let remote_id = self.remote_id();
1140        cx.foreground()
1141            .spawn(
1142                async move {
1143                    if let Some(worktree_id) = remote_id {
1144                        rpc.send(proto::UnshareWorktree { worktree_id }).await?;
1145                    }
1146                    Ok(())
1147                }
1148                .log_err(),
1149            )
1150            .detach()
1151    }
1152
1153    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
1154        let remote_id = self.next_remote_id();
1155        let snapshot = self.snapshot();
1156        let root_name = self.root_name.clone();
1157        cx.background().spawn(async move {
1158            remote_id.await.map(|id| {
1159                let entries = snapshot
1160                    .entries_by_path
1161                    .cursor::<()>()
1162                    .filter(|e| !e.is_ignored)
1163                    .map(Into::into)
1164                    .collect();
1165                proto::ShareWorktree {
1166                    worktree: Some(proto::Worktree {
1167                        id,
1168                        root_name,
1169                        entries,
1170                    }),
1171                }
1172            })
1173        })
1174    }
1175}
1176
1177fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
1178    let contents = smol::block_on(fs.load(&abs_path))?;
1179    let parent = abs_path.parent().unwrap_or(Path::new("/"));
1180    let mut builder = GitignoreBuilder::new(parent);
1181    for line in contents.lines() {
1182        builder.add_line(Some(abs_path.into()), line)?;
1183    }
1184    Ok(builder.build()?)
1185}
1186
1187pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
1188    let fs = fs.clone();
1189    cx.spawn(|buffer, mut cx| async move {
1190        let new_text = fs.load(&abs_path).await;
1191        match new_text {
1192            Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
1193            Ok(new_text) => {
1194                buffer
1195                    .update(&mut cx, |buffer, cx| {
1196                        buffer.set_text_from_disk(new_text.into(), cx)
1197                    })
1198                    .await;
1199            }
1200        }
1201    })
1202    .detach()
1203}
1204
1205impl Deref for LocalWorktree {
1206    type Target = Snapshot;
1207
1208    fn deref(&self) -> &Self::Target {
1209        &self.snapshot
1210    }
1211}
1212
1213impl fmt::Debug for LocalWorktree {
1214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1215        self.snapshot.fmt(f)
1216    }
1217}
1218
1219struct ShareState {
1220    snapshots_tx: Sender<Snapshot>,
1221    _subscriptions: Vec<rpc::Subscription>,
1222}
1223
1224pub struct RemoteWorktree {
1225    remote_id: u64,
1226    snapshot: Snapshot,
1227    snapshot_rx: watch::Receiver<Snapshot>,
1228    rpc: Arc<rpc::Client>,
1229    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
1230    replica_id: ReplicaId,
1231    open_buffers: HashMap<usize, RemoteBuffer>,
1232    peers: HashMap<PeerId, ReplicaId>,
1233    languages: Arc<LanguageRegistry>,
1234    queued_operations: Vec<(u64, Operation)>,
1235    _subscriptions: Vec<rpc::Subscription>,
1236}
1237
1238impl RemoteWorktree {
1239    pub fn open_buffer(
1240        &mut self,
1241        path: &Path,
1242        cx: &mut ModelContext<Worktree>,
1243    ) -> Task<Result<ModelHandle<Buffer>>> {
1244        let mut existing_buffer = None;
1245        self.open_buffers.retain(|_buffer_id, buffer| {
1246            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1247                if let Some(file) = buffer.read(cx.as_ref()).file() {
1248                    if file.worktree_id() == cx.model_id() && file.path().as_ref() == path {
1249                        existing_buffer = Some(buffer);
1250                    }
1251                }
1252                true
1253            } else {
1254                false
1255            }
1256        });
1257
1258        let rpc = self.rpc.clone();
1259        let replica_id = self.replica_id;
1260        let remote_worktree_id = self.remote_id;
1261        let path = path.to_string_lossy().to_string();
1262        cx.spawn_weak(|this, mut cx| async move {
1263            if let Some(existing_buffer) = existing_buffer {
1264                Ok(existing_buffer)
1265            } else {
1266                let entry = this
1267                    .upgrade(&cx)
1268                    .ok_or_else(|| anyhow!("worktree was closed"))?
1269                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1270                    .ok_or_else(|| anyhow!("file does not exist"))?;
1271                let response = rpc
1272                    .request(proto::OpenBuffer {
1273                        worktree_id: remote_worktree_id as u64,
1274                        path,
1275                    })
1276                    .await?;
1277
1278                let this = this
1279                    .upgrade(&cx)
1280                    .ok_or_else(|| anyhow!("worktree was closed"))?;
1281                let file = File::new(entry.id, this.clone(), entry.path, entry.mtime);
1282                let language = this.read_with(&cx, |this, cx| {
1283                    use buffer::File;
1284
1285                    this.languages()
1286                        .select_language(file.full_path(cx))
1287                        .cloned()
1288                });
1289                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1290                let buffer_id = remote_buffer.id as usize;
1291                let buffer = cx.add_model(|cx| {
1292                    Buffer::from_proto(
1293                        replica_id,
1294                        remote_buffer,
1295                        Some(Box::new(file)),
1296                        language,
1297                        cx,
1298                    )
1299                    .unwrap()
1300                });
1301                this.update(&mut cx, |this, cx| {
1302                    let this = this.as_remote_mut().unwrap();
1303                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
1304                        .open_buffers
1305                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
1306                    {
1307                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
1308                    }
1309                    Result::<_, anyhow::Error>::Ok(())
1310                })?;
1311                Ok(buffer)
1312            }
1313        })
1314    }
1315
1316    pub fn remote_id(&self) -> u64 {
1317        self.remote_id
1318    }
1319
1320    pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1321        for (_, buffer) in self.open_buffers.drain() {
1322            if let RemoteBuffer::Loaded(buffer) = buffer {
1323                if let Some(buffer) = buffer.upgrade(cx) {
1324                    buffer.update(cx, |buffer, cx| buffer.close(cx))
1325                }
1326            }
1327        }
1328    }
1329
1330    fn snapshot(&self) -> Snapshot {
1331        self.snapshot.clone()
1332    }
1333
1334    fn update_from_remote(
1335        &mut self,
1336        envelope: TypedEnvelope<proto::UpdateWorktree>,
1337        cx: &mut ModelContext<Worktree>,
1338    ) -> Result<()> {
1339        let mut tx = self.updates_tx.clone();
1340        let payload = envelope.payload.clone();
1341        cx.background()
1342            .spawn(async move {
1343                tx.send(payload).await.expect("receiver runs to completion");
1344            })
1345            .detach();
1346
1347        Ok(())
1348    }
1349
1350    pub fn add_peer(
1351        &mut self,
1352        envelope: TypedEnvelope<proto::AddPeer>,
1353        cx: &mut ModelContext<Worktree>,
1354    ) -> Result<()> {
1355        let peer = envelope
1356            .payload
1357            .peer
1358            .as_ref()
1359            .ok_or_else(|| anyhow!("empty peer"))?;
1360        self.peers
1361            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1362        cx.notify();
1363        Ok(())
1364    }
1365
1366    pub fn remove_peer(
1367        &mut self,
1368        envelope: TypedEnvelope<proto::RemovePeer>,
1369        cx: &mut ModelContext<Worktree>,
1370    ) -> Result<()> {
1371        let peer_id = PeerId(envelope.payload.peer_id);
1372        let replica_id = self
1373            .peers
1374            .remove(&peer_id)
1375            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1376        for (_, buffer) in &self.open_buffers {
1377            if let Some(buffer) = buffer.upgrade(cx) {
1378                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1379            }
1380        }
1381        cx.notify();
1382        Ok(())
1383    }
1384}
1385
1386enum RemoteBuffer {
1387    Operations(Vec<Operation>),
1388    Loaded(WeakModelHandle<Buffer>),
1389}
1390
1391impl RemoteBuffer {
1392    fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1393        match self {
1394            Self::Operations(_) => None,
1395            Self::Loaded(buffer) => buffer.upgrade(cx),
1396        }
1397    }
1398}
1399
1400#[derive(Clone)]
1401pub struct Snapshot {
1402    id: usize,
1403    scan_id: usize,
1404    abs_path: Arc<Path>,
1405    root_name: String,
1406    root_char_bag: CharBag,
1407    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
1408    entries_by_path: SumTree<Entry>,
1409    entries_by_id: SumTree<PathEntry>,
1410    removed_entry_ids: HashMap<u64, usize>,
1411    next_entry_id: Arc<AtomicUsize>,
1412}
1413
1414impl Snapshot {
1415    pub fn id(&self) -> usize {
1416        self.id
1417    }
1418
1419    pub fn build_update(
1420        &self,
1421        other: &Self,
1422        worktree_id: u64,
1423        include_ignored: bool,
1424    ) -> proto::UpdateWorktree {
1425        let mut updated_entries = Vec::new();
1426        let mut removed_entries = Vec::new();
1427        let mut self_entries = self
1428            .entries_by_id
1429            .cursor::<()>()
1430            .filter(|e| include_ignored || !e.is_ignored)
1431            .peekable();
1432        let mut other_entries = other
1433            .entries_by_id
1434            .cursor::<()>()
1435            .filter(|e| include_ignored || !e.is_ignored)
1436            .peekable();
1437        loop {
1438            match (self_entries.peek(), other_entries.peek()) {
1439                (Some(self_entry), Some(other_entry)) => {
1440                    match Ord::cmp(&self_entry.id, &other_entry.id) {
1441                        Ordering::Less => {
1442                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
1443                            updated_entries.push(entry);
1444                            self_entries.next();
1445                        }
1446                        Ordering::Equal => {
1447                            if self_entry.scan_id != other_entry.scan_id {
1448                                let entry = self.entry_for_id(self_entry.id).unwrap().into();
1449                                updated_entries.push(entry);
1450                            }
1451
1452                            self_entries.next();
1453                            other_entries.next();
1454                        }
1455                        Ordering::Greater => {
1456                            removed_entries.push(other_entry.id as u64);
1457                            other_entries.next();
1458                        }
1459                    }
1460                }
1461                (Some(self_entry), None) => {
1462                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
1463                    updated_entries.push(entry);
1464                    self_entries.next();
1465                }
1466                (None, Some(other_entry)) => {
1467                    removed_entries.push(other_entry.id as u64);
1468                    other_entries.next();
1469                }
1470                (None, None) => break,
1471            }
1472        }
1473
1474        proto::UpdateWorktree {
1475            updated_entries,
1476            removed_entries,
1477            worktree_id,
1478        }
1479    }
1480
1481    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
1482        self.scan_id += 1;
1483        let scan_id = self.scan_id;
1484
1485        let mut entries_by_path_edits = Vec::new();
1486        let mut entries_by_id_edits = Vec::new();
1487        for entry_id in update.removed_entries {
1488            let entry_id = entry_id as usize;
1489            let entry = self
1490                .entry_for_id(entry_id)
1491                .ok_or_else(|| anyhow!("unknown entry"))?;
1492            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
1493            entries_by_id_edits.push(Edit::Remove(entry.id));
1494        }
1495
1496        for entry in update.updated_entries {
1497            let entry = Entry::try_from((&self.root_char_bag, entry))?;
1498            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
1499                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
1500            }
1501            entries_by_id_edits.push(Edit::Insert(PathEntry {
1502                id: entry.id,
1503                path: entry.path.clone(),
1504                is_ignored: entry.is_ignored,
1505                scan_id,
1506            }));
1507            entries_by_path_edits.push(Edit::Insert(entry));
1508        }
1509
1510        self.entries_by_path.edit(entries_by_path_edits, &());
1511        self.entries_by_id.edit(entries_by_id_edits, &());
1512
1513        Ok(())
1514    }
1515
1516    pub fn file_count(&self) -> usize {
1517        self.entries_by_path.summary().file_count
1518    }
1519
1520    pub fn visible_file_count(&self) -> usize {
1521        self.entries_by_path.summary().visible_file_count
1522    }
1523
1524    fn traverse_from_offset(
1525        &self,
1526        include_dirs: bool,
1527        include_ignored: bool,
1528        start_offset: usize,
1529    ) -> Traversal {
1530        let mut cursor = self.entries_by_path.cursor();
1531        cursor.seek(
1532            &TraversalTarget::Count {
1533                count: start_offset,
1534                include_dirs,
1535                include_ignored,
1536            },
1537            Bias::Right,
1538            &(),
1539        );
1540        Traversal {
1541            cursor,
1542            include_dirs,
1543            include_ignored,
1544        }
1545    }
1546
1547    fn traverse_from_path(
1548        &self,
1549        include_dirs: bool,
1550        include_ignored: bool,
1551        path: &Path,
1552    ) -> Traversal {
1553        let mut cursor = self.entries_by_path.cursor();
1554        cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
1555        Traversal {
1556            cursor,
1557            include_dirs,
1558            include_ignored,
1559        }
1560    }
1561
1562    pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
1563        self.traverse_from_offset(false, include_ignored, start)
1564    }
1565
1566    pub fn entries(&self, include_ignored: bool) -> Traversal {
1567        self.traverse_from_offset(true, include_ignored, 0)
1568    }
1569
1570    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1571        let empty_path = Path::new("");
1572        self.entries_by_path
1573            .cursor::<()>()
1574            .filter(move |entry| entry.path.as_ref() != empty_path)
1575            .map(|entry| &entry.path)
1576    }
1577
1578    fn child_entries<'a>(&'a self, parent_path: &'a Path) -> ChildEntriesIter<'a> {
1579        let mut cursor = self.entries_by_path.cursor();
1580        cursor.seek(&TraversalTarget::Path(parent_path), Bias::Right, &());
1581        let traversal = Traversal {
1582            cursor,
1583            include_dirs: true,
1584            include_ignored: true,
1585        };
1586        ChildEntriesIter {
1587            traversal,
1588            parent_path,
1589        }
1590    }
1591
1592    pub fn root_entry(&self) -> Option<&Entry> {
1593        self.entry_for_path("")
1594    }
1595
1596    pub fn root_name(&self) -> &str {
1597        &self.root_name
1598    }
1599
1600    pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1601        let path = path.as_ref();
1602        self.traverse_from_path(true, true, path)
1603            .entry()
1604            .and_then(|entry| {
1605                if entry.path.as_ref() == path {
1606                    Some(entry)
1607                } else {
1608                    None
1609                }
1610            })
1611    }
1612
1613    pub fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1614        let entry = self.entries_by_id.get(&id, &())?;
1615        self.entry_for_path(&entry.path)
1616    }
1617
1618    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1619        self.entry_for_path(path.as_ref()).map(|e| e.inode)
1620    }
1621
1622    fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
1623        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
1624            let abs_path = self.abs_path.join(&entry.path);
1625            match build_gitignore(&abs_path, fs) {
1626                Ok(ignore) => {
1627                    let ignore_dir_path = entry.path.parent().unwrap();
1628                    self.ignores
1629                        .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
1630                }
1631                Err(error) => {
1632                    log::error!(
1633                        "error loading .gitignore file {:?} - {:?}",
1634                        &entry.path,
1635                        error
1636                    );
1637                }
1638            }
1639        }
1640
1641        self.reuse_entry_id(&mut entry);
1642        self.entries_by_path.insert_or_replace(entry.clone(), &());
1643        self.entries_by_id.insert_or_replace(
1644            PathEntry {
1645                id: entry.id,
1646                path: entry.path.clone(),
1647                is_ignored: entry.is_ignored,
1648                scan_id: self.scan_id,
1649            },
1650            &(),
1651        );
1652        entry
1653    }
1654
1655    fn populate_dir(
1656        &mut self,
1657        parent_path: Arc<Path>,
1658        entries: impl IntoIterator<Item = Entry>,
1659        ignore: Option<Arc<Gitignore>>,
1660    ) {
1661        let mut parent_entry = self
1662            .entries_by_path
1663            .get(&PathKey(parent_path.clone()), &())
1664            .unwrap()
1665            .clone();
1666        if let Some(ignore) = ignore {
1667            self.ignores.insert(parent_path, (ignore, self.scan_id));
1668        }
1669        if matches!(parent_entry.kind, EntryKind::PendingDir) {
1670            parent_entry.kind = EntryKind::Dir;
1671        } else {
1672            unreachable!();
1673        }
1674
1675        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
1676        let mut entries_by_id_edits = Vec::new();
1677
1678        for mut entry in entries {
1679            self.reuse_entry_id(&mut entry);
1680            entries_by_id_edits.push(Edit::Insert(PathEntry {
1681                id: entry.id,
1682                path: entry.path.clone(),
1683                is_ignored: entry.is_ignored,
1684                scan_id: self.scan_id,
1685            }));
1686            entries_by_path_edits.push(Edit::Insert(entry));
1687        }
1688
1689        self.entries_by_path.edit(entries_by_path_edits, &());
1690        self.entries_by_id.edit(entries_by_id_edits, &());
1691    }
1692
1693    fn reuse_entry_id(&mut self, entry: &mut Entry) {
1694        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1695            entry.id = removed_entry_id;
1696        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1697            entry.id = existing_entry.id;
1698        }
1699    }
1700
1701    fn remove_path(&mut self, path: &Path) {
1702        let mut new_entries;
1703        let removed_entries;
1704        {
1705            let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
1706            new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
1707            removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
1708            new_entries.push_tree(cursor.suffix(&()), &());
1709        }
1710        self.entries_by_path = new_entries;
1711
1712        let mut entries_by_id_edits = Vec::new();
1713        for entry in removed_entries.cursor::<()>() {
1714            let removed_entry_id = self
1715                .removed_entry_ids
1716                .entry(entry.inode)
1717                .or_insert(entry.id);
1718            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
1719            entries_by_id_edits.push(Edit::Remove(entry.id));
1720        }
1721        self.entries_by_id.edit(entries_by_id_edits, &());
1722
1723        if path.file_name() == Some(&GITIGNORE) {
1724            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
1725                *scan_id = self.scan_id;
1726            }
1727        }
1728    }
1729
1730    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
1731        let mut new_ignores = Vec::new();
1732        for ancestor in path.ancestors().skip(1) {
1733            if let Some((ignore, _)) = self.ignores.get(ancestor) {
1734                new_ignores.push((ancestor, Some(ignore.clone())));
1735            } else {
1736                new_ignores.push((ancestor, None));
1737            }
1738        }
1739
1740        let mut ignore_stack = IgnoreStack::none();
1741        for (parent_path, ignore) in new_ignores.into_iter().rev() {
1742            if ignore_stack.is_path_ignored(&parent_path, true) {
1743                ignore_stack = IgnoreStack::all();
1744                break;
1745            } else if let Some(ignore) = ignore {
1746                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
1747            }
1748        }
1749
1750        if ignore_stack.is_path_ignored(path, is_dir) {
1751            ignore_stack = IgnoreStack::all();
1752        }
1753
1754        ignore_stack
1755    }
1756}
1757
1758impl fmt::Debug for Snapshot {
1759    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1760        for entry in self.entries_by_path.cursor::<()>() {
1761            for _ in entry.path.ancestors().skip(1) {
1762                write!(f, " ")?;
1763            }
1764            writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1765        }
1766        Ok(())
1767    }
1768}
1769
1770#[derive(Clone, PartialEq)]
1771pub struct File {
1772    entry_id: Option<usize>,
1773    worktree: ModelHandle<Worktree>,
1774    pub path: Arc<Path>,
1775    pub mtime: SystemTime,
1776}
1777
1778impl File {
1779    pub fn new(
1780        entry_id: usize,
1781        worktree: ModelHandle<Worktree>,
1782        path: Arc<Path>,
1783        mtime: SystemTime,
1784    ) -> Self {
1785        Self {
1786            entry_id: Some(entry_id),
1787            worktree,
1788            path,
1789            mtime,
1790        }
1791    }
1792}
1793
1794impl buffer::File for File {
1795    fn worktree_id(&self) -> usize {
1796        self.worktree.id()
1797    }
1798
1799    fn entry_id(&self) -> Option<usize> {
1800        self.entry_id
1801    }
1802
1803    fn set_entry_id(&mut self, entry_id: Option<usize>) {
1804        self.entry_id = entry_id;
1805    }
1806
1807    fn mtime(&self) -> SystemTime {
1808        self.mtime
1809    }
1810
1811    fn set_mtime(&mut self, mtime: SystemTime) {
1812        self.mtime = mtime;
1813    }
1814
1815    fn path(&self) -> &Arc<Path> {
1816        &self.path
1817    }
1818
1819    fn set_path(&mut self, path: Arc<Path>) {
1820        self.path = path;
1821    }
1822
1823    fn full_path(&self, cx: &AppContext) -> PathBuf {
1824        let worktree = self.worktree.read(cx);
1825        let mut full_path = PathBuf::new();
1826        full_path.push(worktree.root_name());
1827        full_path.push(&self.path);
1828        full_path
1829    }
1830
1831    /// Returns the last component of this handle's absolute path. If this handle refers to the root
1832    /// of its worktree, then this method will return the name of the worktree itself.
1833    fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
1834        self.path
1835            .file_name()
1836            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
1837            .map(Into::into)
1838    }
1839
1840    fn is_deleted(&self) -> bool {
1841        self.entry_id.is_none()
1842    }
1843
1844    fn save(
1845        &self,
1846        buffer_id: u64,
1847        text: Rope,
1848        version: clock::Global,
1849        cx: &mut MutableAppContext,
1850    ) -> Task<Result<(clock::Global, SystemTime)>> {
1851        self.worktree.update(cx, |worktree, cx| match worktree {
1852            Worktree::Local(worktree) => {
1853                let rpc = worktree.rpc.clone();
1854                let worktree_id = *worktree.remote_id.borrow();
1855                let save = worktree.save(self.path.clone(), text, cx);
1856                cx.background().spawn(async move {
1857                    let entry = save.await?;
1858                    if let Some(worktree_id) = worktree_id {
1859                        rpc.send(proto::BufferSaved {
1860                            worktree_id,
1861                            buffer_id,
1862                            version: (&version).into(),
1863                            mtime: Some(entry.mtime.into()),
1864                        })
1865                        .await?;
1866                    }
1867                    Ok((version, entry.mtime))
1868                })
1869            }
1870            Worktree::Remote(worktree) => {
1871                let rpc = worktree.rpc.clone();
1872                let worktree_id = worktree.remote_id;
1873                cx.foreground().spawn(async move {
1874                    let response = rpc
1875                        .request(proto::SaveBuffer {
1876                            worktree_id,
1877                            buffer_id,
1878                        })
1879                        .await?;
1880                    let version = response.version.try_into()?;
1881                    let mtime = response
1882                        .mtime
1883                        .ok_or_else(|| anyhow!("missing mtime"))?
1884                        .into();
1885                    Ok((version, mtime))
1886                })
1887            }
1888        })
1889    }
1890
1891    fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
1892        self.worktree.update(cx, |worktree, cx| {
1893            if let Some((rpc, remote_id)) = match worktree {
1894                Worktree::Local(worktree) => worktree
1895                    .remote_id
1896                    .borrow()
1897                    .map(|id| (worktree.rpc.clone(), id)),
1898                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
1899            } {
1900                cx.spawn(|worktree, mut cx| async move {
1901                    if let Err(error) = rpc
1902                        .request(proto::UpdateBuffer {
1903                            worktree_id: remote_id,
1904                            buffer_id,
1905                            operations: vec![(&operation).into()],
1906                        })
1907                        .await
1908                    {
1909                        worktree.update(&mut cx, |worktree, _| {
1910                            log::error!("error sending buffer operation: {}", error);
1911                            match worktree {
1912                                Worktree::Local(t) => &mut t.queued_operations,
1913                                Worktree::Remote(t) => &mut t.queued_operations,
1914                            }
1915                            .push((buffer_id, operation));
1916                        });
1917                    }
1918                })
1919                .detach();
1920            }
1921        });
1922    }
1923
1924    fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
1925        self.worktree.update(cx, |worktree, cx| {
1926            if let Worktree::Remote(worktree) = worktree {
1927                let worktree_id = worktree.remote_id;
1928                let rpc = worktree.rpc.clone();
1929                cx.background()
1930                    .spawn(async move {
1931                        if let Err(error) = rpc
1932                            .send(proto::CloseBuffer {
1933                                worktree_id,
1934                                buffer_id,
1935                            })
1936                            .await
1937                        {
1938                            log::error!("error closing remote buffer: {}", error);
1939                        }
1940                    })
1941                    .detach();
1942            }
1943        });
1944    }
1945
1946    fn boxed_clone(&self) -> Box<dyn buffer::File> {
1947        Box::new(self.clone())
1948    }
1949
1950    fn as_any(&self) -> &dyn Any {
1951        self
1952    }
1953}
1954
1955#[derive(Clone, Debug)]
1956pub struct Entry {
1957    pub id: usize,
1958    pub kind: EntryKind,
1959    pub path: Arc<Path>,
1960    pub inode: u64,
1961    pub mtime: SystemTime,
1962    pub is_symlink: bool,
1963    pub is_ignored: bool,
1964}
1965
1966#[derive(Clone, Debug)]
1967pub enum EntryKind {
1968    PendingDir,
1969    Dir,
1970    File(CharBag),
1971}
1972
1973impl Entry {
1974    fn new(
1975        path: Arc<Path>,
1976        metadata: &fs::Metadata,
1977        next_entry_id: &AtomicUsize,
1978        root_char_bag: CharBag,
1979    ) -> Self {
1980        Self {
1981            id: next_entry_id.fetch_add(1, SeqCst),
1982            kind: if metadata.is_dir {
1983                EntryKind::PendingDir
1984            } else {
1985                EntryKind::File(char_bag_for_path(root_char_bag, &path))
1986            },
1987            path,
1988            inode: metadata.inode,
1989            mtime: metadata.mtime,
1990            is_symlink: metadata.is_symlink,
1991            is_ignored: false,
1992        }
1993    }
1994
1995    pub fn is_dir(&self) -> bool {
1996        matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1997    }
1998
1999    pub fn is_file(&self) -> bool {
2000        matches!(self.kind, EntryKind::File(_))
2001    }
2002}
2003
2004impl sum_tree::Item for Entry {
2005    type Summary = EntrySummary;
2006
2007    fn summary(&self) -> Self::Summary {
2008        let visible_count = if self.is_ignored { 0 } else { 1 };
2009        let file_count;
2010        let visible_file_count;
2011        if self.is_file() {
2012            file_count = 1;
2013            visible_file_count = visible_count;
2014        } else {
2015            file_count = 0;
2016            visible_file_count = 0;
2017        }
2018
2019        EntrySummary {
2020            max_path: self.path.clone(),
2021            count: 1,
2022            visible_count,
2023            file_count,
2024            visible_file_count,
2025        }
2026    }
2027}
2028
2029impl sum_tree::KeyedItem for Entry {
2030    type Key = PathKey;
2031
2032    fn key(&self) -> Self::Key {
2033        PathKey(self.path.clone())
2034    }
2035}
2036
/// Aggregate information about a run of entries.
#[derive(Debug, Clone)]
pub struct EntrySummary {
    /// Path of the last entry in the summarized run.
    max_path: Arc<Path>,
    /// Number of entries.
    count: usize,
    /// Number of entries that are not ignored.
    visible_count: usize,
    /// Number of entries that are files.
    file_count: usize,
    /// Number of entries that are files and not ignored.
    visible_file_count: usize,
}
2045
2046impl Default for EntrySummary {
2047    fn default() -> Self {
2048        Self {
2049            max_path: Arc::from(Path::new("")),
2050            count: 0,
2051            visible_count: 0,
2052            file_count: 0,
2053            visible_file_count: 0,
2054        }
2055    }
2056}
2057
2058impl sum_tree::Summary for EntrySummary {
2059    type Context = ();
2060
2061    fn add_summary(&mut self, rhs: &Self, _: &()) {
2062        self.max_path = rhs.max_path.clone();
2063        self.visible_count += rhs.visible_count;
2064        self.file_count += rhs.file_count;
2065        self.visible_file_count += rhs.visible_file_count;
2066    }
2067}
2068
/// The record stored in `entries_by_id`: maps an entry id back to the entry's path.
#[derive(Debug, Clone)]
struct PathEntry {
    /// Id of the entry; this is the record's key.
    id: usize,
    /// Path under which the full entry is stored in `entries_by_path`.
    path: Arc<Path>,
    /// Copy of the entry's ignored flag.
    is_ignored: bool,
    /// Value of the snapshot's `scan_id` when this record was last written.
    scan_id: usize,
}
2076
2077impl sum_tree::Item for PathEntry {
2078    type Summary = PathEntrySummary;
2079
2080    fn summary(&self) -> Self::Summary {
2081        PathEntrySummary { max_id: self.id }
2082    }
2083}
2084
2085impl sum_tree::KeyedItem for PathEntry {
2086    type Key = usize;
2087
2088    fn key(&self) -> Self::Key {
2089        self.id
2090    }
2091}
2092
/// Summary of a run of `PathEntry` records.
#[derive(Debug, Default, Clone)]
struct PathEntrySummary {
    /// Id of the last record in the summarized run.
    max_id: usize,
}
2097
2098impl sum_tree::Summary for PathEntrySummary {
2099    type Context = ();
2100
2101    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
2102        self.max_id = summary.max_id;
2103    }
2104}
2105
2106impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
2107    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
2108        *self = summary.max_id;
2109    }
2110}
2111
/// Key of an `Entry` in `entries_by_path`: the entry's path, compared and ordered as a path.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct PathKey(Arc<Path>);
2114
2115impl Default for PathKey {
2116    fn default() -> Self {
2117        Self(Path::new("").into())
2118    }
2119}
2120
2121impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
2122    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2123        self.0 = summary.max_path.clone();
2124    }
2125}
2126
/// Scans a worktree's directory tree and keeps the shared snapshot up to date as file-system
/// events arrive, reporting its state through `notify`.
struct BackgroundScanner {
    /// File-system abstraction used to list directories, read metadata and canonicalize paths.
    fs: Arc<dyn Fs>,
    /// The snapshot that scan results are written into.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel on which `ScanState` transitions are sent.
    notify: Sender<ScanState>,
    /// Background executor on which the scan workers run.
    executor: Arc<executor::Background>,
}
2133
2134impl BackgroundScanner {
2135    fn new(
2136        snapshot: Arc<Mutex<Snapshot>>,
2137        notify: Sender<ScanState>,
2138        fs: Arc<dyn Fs>,
2139        executor: Arc<executor::Background>,
2140    ) -> Self {
2141        Self {
2142            fs,
2143            snapshot,
2144            notify,
2145            executor,
2146        }
2147    }
2148
2149    fn abs_path(&self) -> Arc<Path> {
2150        self.snapshot.lock().abs_path.clone()
2151    }
2152
2153    fn snapshot(&self) -> Snapshot {
2154        self.snapshot.lock().clone()
2155    }
2156
2157    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
2158        if self.notify.send(ScanState::Scanning).await.is_err() {
2159            return;
2160        }
2161
2162        if let Err(err) = self.scan_dirs().await {
2163            if self
2164                .notify
2165                .send(ScanState::Err(Arc::new(err)))
2166                .await
2167                .is_err()
2168            {
2169                return;
2170            }
2171        }
2172
2173        if self.notify.send(ScanState::Idle).await.is_err() {
2174            return;
2175        }
2176
2177        futures::pin_mut!(events_rx);
2178        while let Some(events) = events_rx.next().await {
2179            if self.notify.send(ScanState::Scanning).await.is_err() {
2180                break;
2181            }
2182
2183            if !self.process_events(events).await {
2184                break;
2185            }
2186
2187            if self.notify.send(ScanState::Idle).await.is_err() {
2188                break;
2189            }
2190        }
2191    }
2192
2193    async fn scan_dirs(&mut self) -> Result<()> {
2194        let root_char_bag;
2195        let next_entry_id;
2196        let is_dir;
2197        {
2198            let snapshot = self.snapshot.lock();
2199            root_char_bag = snapshot.root_char_bag;
2200            next_entry_id = snapshot.next_entry_id.clone();
2201            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
2202        };
2203
2204        if is_dir {
2205            let path: Arc<Path> = Arc::from(Path::new(""));
2206            let abs_path = self.abs_path();
2207            let (tx, rx) = channel::unbounded();
2208            tx.send(ScanJob {
2209                abs_path: abs_path.to_path_buf(),
2210                path,
2211                ignore_stack: IgnoreStack::none(),
2212                scan_queue: tx.clone(),
2213            })
2214            .await
2215            .unwrap();
2216            drop(tx);
2217
2218            self.executor
2219                .scoped(|scope| {
2220                    for _ in 0..self.executor.num_cpus() {
2221                        scope.spawn(async {
2222                            while let Ok(job) = rx.recv().await {
2223                                if let Err(err) = self
2224                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2225                                    .await
2226                                {
2227                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
2228                                }
2229                            }
2230                        });
2231                    }
2232                })
2233                .await;
2234        }
2235
2236        Ok(())
2237    }
2238
2239    async fn scan_dir(
2240        &self,
2241        root_char_bag: CharBag,
2242        next_entry_id: Arc<AtomicUsize>,
2243        job: &ScanJob,
2244    ) -> Result<()> {
2245        let mut new_entries: Vec<Entry> = Vec::new();
2246        let mut new_jobs: Vec<ScanJob> = Vec::new();
2247        let mut ignore_stack = job.ignore_stack.clone();
2248        let mut new_ignore = None;
2249
2250        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
2251        while let Some(child_abs_path) = child_paths.next().await {
2252            let child_abs_path = match child_abs_path {
2253                Ok(child_abs_path) => child_abs_path,
2254                Err(error) => {
2255                    log::error!("error processing entry {:?}", error);
2256                    continue;
2257                }
2258            };
2259            let child_name = child_abs_path.file_name().unwrap();
2260            let child_path: Arc<Path> = job.path.join(child_name).into();
2261            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
2262                Some(metadata) => metadata,
2263                None => continue,
2264            };
2265
2266            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
2267            if child_name == *GITIGNORE {
2268                match build_gitignore(&child_abs_path, self.fs.as_ref()) {
2269                    Ok(ignore) => {
2270                        let ignore = Arc::new(ignore);
2271                        ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2272                        new_ignore = Some(ignore);
2273                    }
2274                    Err(error) => {
2275                        log::error!(
2276                            "error loading .gitignore file {:?} - {:?}",
2277                            child_name,
2278                            error
2279                        );
2280                    }
2281                }
2282
2283                // Update ignore status of any child entries we've already processed to reflect the
2284                // ignore file in the current directory. Because `.gitignore` starts with a `.`,
2285                // there should rarely be too numerous. Update the ignore stack associated with any
2286                // new jobs as well.
2287                let mut new_jobs = new_jobs.iter_mut();
2288                for entry in &mut new_entries {
2289                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2290                    if entry.is_dir() {
2291                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
2292                            IgnoreStack::all()
2293                        } else {
2294                            ignore_stack.clone()
2295                        };
2296                    }
2297                }
2298            }
2299
2300            let mut child_entry = Entry::new(
2301                child_path.clone(),
2302                &child_metadata,
2303                &next_entry_id,
2304                root_char_bag,
2305            );
2306
2307            if child_metadata.is_dir {
2308                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
2309                child_entry.is_ignored = is_ignored;
2310                new_entries.push(child_entry);
2311                new_jobs.push(ScanJob {
2312                    abs_path: child_abs_path,
2313                    path: child_path,
2314                    ignore_stack: if is_ignored {
2315                        IgnoreStack::all()
2316                    } else {
2317                        ignore_stack.clone()
2318                    },
2319                    scan_queue: job.scan_queue.clone(),
2320                });
2321            } else {
2322                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
2323                new_entries.push(child_entry);
2324            };
2325        }
2326
2327        self.snapshot
2328            .lock()
2329            .populate_dir(job.path.clone(), new_entries, new_ignore);
2330        for new_job in new_jobs {
2331            job.scan_queue.send(new_job).await.unwrap();
2332        }
2333
2334        Ok(())
2335    }
2336
2337    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
2338        let mut snapshot = self.snapshot();
2339        snapshot.scan_id += 1;
2340
2341        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
2342            abs_path
2343        } else {
2344            return false;
2345        };
2346        let root_char_bag = snapshot.root_char_bag;
2347        let next_entry_id = snapshot.next_entry_id.clone();
2348
2349        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
2350        events.dedup_by(|a, b| a.path.starts_with(&b.path));
2351
2352        for event in &events {
2353            match event.path.strip_prefix(&root_abs_path) {
2354                Ok(path) => snapshot.remove_path(&path),
2355                Err(_) => {
2356                    log::error!(
2357                        "unexpected event {:?} for root path {:?}",
2358                        event.path,
2359                        root_abs_path
2360                    );
2361                    continue;
2362                }
2363            }
2364        }
2365
2366        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
2367        for event in events {
2368            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
2369                Ok(path) => Arc::from(path.to_path_buf()),
2370                Err(_) => {
2371                    log::error!(
2372                        "unexpected event {:?} for root path {:?}",
2373                        event.path,
2374                        root_abs_path
2375                    );
2376                    continue;
2377                }
2378            };
2379
2380            match self.fs.metadata(&event.path).await {
2381                Ok(Some(metadata)) => {
2382                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
2383                    let mut fs_entry = Entry::new(
2384                        path.clone(),
2385                        &metadata,
2386                        snapshot.next_entry_id.as_ref(),
2387                        snapshot.root_char_bag,
2388                    );
2389                    fs_entry.is_ignored = ignore_stack.is_all();
2390                    snapshot.insert_entry(fs_entry, self.fs.as_ref());
2391                    if metadata.is_dir {
2392                        scan_queue_tx
2393                            .send(ScanJob {
2394                                abs_path: event.path,
2395                                path,
2396                                ignore_stack,
2397                                scan_queue: scan_queue_tx.clone(),
2398                            })
2399                            .await
2400                            .unwrap();
2401                    }
2402                }
2403                Ok(None) => {}
2404                Err(err) => {
2405                    // TODO - create a special 'error' entry in the entries tree to mark this
2406                    log::error!("error reading file on event {:?}", err);
2407                }
2408            }
2409        }
2410
2411        *self.snapshot.lock() = snapshot;
2412
2413        // Scan any directories that were created as part of this event batch.
2414        drop(scan_queue_tx);
2415        self.executor
2416            .scoped(|scope| {
2417                for _ in 0..self.executor.num_cpus() {
2418                    scope.spawn(async {
2419                        while let Ok(job) = scan_queue_rx.recv().await {
2420                            if let Err(err) = self
2421                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2422                                .await
2423                            {
2424                                log::error!("error scanning {:?}: {}", job.abs_path, err);
2425                            }
2426                        }
2427                    });
2428                }
2429            })
2430            .await;
2431
2432        // Attempt to detect renames only over a single batch of file-system events.
2433        self.snapshot.lock().removed_entry_ids.clear();
2434
2435        self.update_ignore_statuses().await;
2436        true
2437    }
2438
2439    async fn update_ignore_statuses(&self) {
2440        let mut snapshot = self.snapshot();
2441
2442        let mut ignores_to_update = Vec::new();
2443        let mut ignores_to_delete = Vec::new();
2444        for (parent_path, (_, scan_id)) in &snapshot.ignores {
2445            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
2446                ignores_to_update.push(parent_path.clone());
2447            }
2448
2449            let ignore_path = parent_path.join(&*GITIGNORE);
2450            if snapshot.entry_for_path(ignore_path).is_none() {
2451                ignores_to_delete.push(parent_path.clone());
2452            }
2453        }
2454
2455        for parent_path in ignores_to_delete {
2456            snapshot.ignores.remove(&parent_path);
2457            self.snapshot.lock().ignores.remove(&parent_path);
2458        }
2459
2460        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
2461        ignores_to_update.sort_unstable();
2462        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
2463        while let Some(parent_path) = ignores_to_update.next() {
2464            while ignores_to_update
2465                .peek()
2466                .map_or(false, |p| p.starts_with(&parent_path))
2467            {
2468                ignores_to_update.next().unwrap();
2469            }
2470
2471            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
2472            ignore_queue_tx
2473                .send(UpdateIgnoreStatusJob {
2474                    path: parent_path,
2475                    ignore_stack,
2476                    ignore_queue: ignore_queue_tx.clone(),
2477                })
2478                .await
2479                .unwrap();
2480        }
2481        drop(ignore_queue_tx);
2482
2483        self.executor
2484            .scoped(|scope| {
2485                for _ in 0..self.executor.num_cpus() {
2486                    scope.spawn(async {
2487                        while let Ok(job) = ignore_queue_rx.recv().await {
2488                            self.update_ignore_status(job, &snapshot).await;
2489                        }
2490                    });
2491                }
2492            })
2493            .await;
2494    }
2495
2496    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
2497        let mut ignore_stack = job.ignore_stack;
2498        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
2499            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2500        }
2501
2502        let mut entries_by_id_edits = Vec::new();
2503        let mut entries_by_path_edits = Vec::new();
2504        for mut entry in snapshot.child_entries(&job.path).cloned() {
2505            let was_ignored = entry.is_ignored;
2506            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2507            if entry.is_dir() {
2508                let child_ignore_stack = if entry.is_ignored {
2509                    IgnoreStack::all()
2510                } else {
2511                    ignore_stack.clone()
2512                };
2513                job.ignore_queue
2514                    .send(UpdateIgnoreStatusJob {
2515                        path: entry.path.clone(),
2516                        ignore_stack: child_ignore_stack,
2517                        ignore_queue: job.ignore_queue.clone(),
2518                    })
2519                    .await
2520                    .unwrap();
2521            }
2522
2523            if entry.is_ignored != was_ignored {
2524                let mut path_entry = snapshot.entries_by_id.get(&entry.id, &()).unwrap().clone();
2525                path_entry.scan_id = snapshot.scan_id;
2526                path_entry.is_ignored = entry.is_ignored;
2527                entries_by_id_edits.push(Edit::Insert(path_entry));
2528                entries_by_path_edits.push(Edit::Insert(entry));
2529            }
2530        }
2531
2532        let mut snapshot = self.snapshot.lock();
2533        snapshot.entries_by_path.edit(entries_by_path_edits, &());
2534        snapshot.entries_by_id.edit(entries_by_id_edits, &());
2535    }
2536}
2537
2538async fn refresh_entry(
2539    fs: &dyn Fs,
2540    snapshot: &Mutex<Snapshot>,
2541    path: Arc<Path>,
2542    abs_path: &Path,
2543) -> Result<Entry> {
2544    let root_char_bag;
2545    let next_entry_id;
2546    {
2547        let snapshot = snapshot.lock();
2548        root_char_bag = snapshot.root_char_bag;
2549        next_entry_id = snapshot.next_entry_id.clone();
2550    }
2551    let entry = Entry::new(
2552        path,
2553        &fs.metadata(abs_path)
2554            .await?
2555            .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2556        &next_entry_id,
2557        root_char_bag,
2558    );
2559    Ok(snapshot.lock().insert_entry(entry, fs))
2560}
2561
2562fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2563    let mut result = root_char_bag;
2564    result.extend(
2565        path.to_string_lossy()
2566            .chars()
2567            .map(|c| c.to_ascii_lowercase()),
2568    );
2569    result
2570}
2571
/// A unit of scanning work for a single path.
///
/// NOTE(review): the consumer of this type is not visible in this part of the
/// file; the field notes below are inferred from the field types and names
/// and should be confirmed against the scanner.
struct ScanJob {
    // Absolute location of the path on disk.
    abs_path: PathBuf,
    // Presumably the same location expressed relative to the worktree root.
    path: Arc<Path>,
    // Ignore rules in effect for this path.
    ignore_stack: Arc<IgnoreStack>,
    // Sender for jobs of the same type, presumably used to enqueue
    // subdirectories discovered while handling this job.
    scan_queue: Sender<ScanJob>,
}
2578
/// A request to recompute the ignored flag of the children of one directory.
///
/// Consumed by `update_ignore_status`, which enqueues one further job per
/// child directory on `ignore_queue`.
struct UpdateIgnoreStatusJob {
    // Directory whose child entries are re-evaluated.
    path: Arc<Path>,
    // Ignore rules inherited from the ancestors of `path`; the directory's own
    // rules, if any, are appended by the consumer.
    ignore_stack: Arc<IgnoreStack>,
    // Queue onto which jobs for child directories are pushed.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2584
/// Extension trait for handles to a worktree.
///
/// Its only method is compiled under `#[cfg(test)]`, so in non-test builds
/// the trait has no methods.
pub trait WorktreeHandle {
    /// Returns a boxed, non-`Send` future bound to the lifetime of `cx`.
    ///
    /// The implementation for `ModelHandle<Worktree>` resolves once pending
    /// file-system events for the worktree have been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2592
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        // Sentinel file created and then removed directly under the worktree root.
        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create the sentinel and wait until the worktree has an entry for it.
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            // Remove the sentinel and wait until its entry is gone again.
            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            // Finally wait for the local worktree to report that scanning is complete.
            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2625
/// Running totals describing how far a traversal has advanced.
///
/// Four counters are kept side by side so that a position can be expressed
/// under any combination of "include directories" and "include ignored
/// entries"; see [`TraversalProgress::count`].
#[derive(Clone, Debug)]
struct TraversalProgress<'a> {
    // Greatest path covered so far.
    max_path: &'a Path,
    // All entries.
    count: usize,
    // Entries that are not ignored.
    visible_count: usize,
    // File entries, ignored or not.
    file_count: usize,
    // File entries that are not ignored.
    visible_file_count: usize,
}

impl<'a> TraversalProgress<'a> {
    /// Picks the counter matching the requested filter combination.
    fn count(&self, include_dirs: bool, include_ignored: bool) -> usize {
        if include_ignored {
            if include_dirs {
                self.count
            } else {
                self.file_count
            }
        } else if include_dirs {
            self.visible_count
        } else {
            self.visible_file_count
        }
    }
}
2645
2646impl<'a> sum_tree::Dimension<'a, EntrySummary> for TraversalProgress<'a> {
2647    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2648        self.max_path = summary.max_path.as_ref();
2649        self.count += summary.count;
2650        self.visible_count += summary.visible_count;
2651        self.file_count += summary.file_count;
2652        self.visible_file_count += summary.visible_file_count;
2653    }
2654}
2655
2656impl<'a> Default for TraversalProgress<'a> {
2657    fn default() -> Self {
2658        Self {
2659            max_path: Path::new(""),
2660            count: 0,
2661            visible_count: 0,
2662            file_count: 0,
2663            visible_file_count: 0,
2664        }
2665    }
2666}
2667
/// A cursor-backed walk over worktree entries that can optionally skip
/// directories and/or ignored entries.
///
/// Also usable as an `Iterator` yielding `&Entry`.
pub struct Traversal<'a> {
    // Position within the entry tree, tracked as a `TraversalProgress`.
    cursor: sum_tree::Cursor<'a, Entry, TraversalProgress<'a>>,
    // When false, ignored entries are not counted or stopped on.
    include_ignored: bool,
    // When false, directory entries are not counted or stopped on.
    include_dirs: bool,
}
2673
2674impl<'a> Traversal<'a> {
2675    pub fn advance(&mut self) -> bool {
2676        self.advance_to_offset(self.offset() + 1)
2677    }
2678
2679    pub fn advance_to_offset(&mut self, offset: usize) -> bool {
2680        self.cursor.seek_forward(
2681            &TraversalTarget::Count {
2682                count: offset,
2683                include_dirs: self.include_dirs,
2684                include_ignored: self.include_ignored,
2685            },
2686            Bias::Right,
2687            &(),
2688        )
2689    }
2690
2691    pub fn advance_to_sibling(&mut self) -> bool {
2692        while let Some(entry) = self.cursor.item() {
2693            self.cursor.seek_forward(
2694                &TraversalTarget::PathSuccessor(&entry.path),
2695                Bias::Left,
2696                &(),
2697            );
2698            if let Some(entry) = self.cursor.item() {
2699                if (self.include_dirs || !entry.is_dir())
2700                    && (self.include_ignored || !entry.is_ignored)
2701                {
2702                    return true;
2703                }
2704            }
2705        }
2706        false
2707    }
2708
2709    pub fn entry(&self) -> Option<&'a Entry> {
2710        self.cursor.item()
2711    }
2712
2713    pub fn offset(&self) -> usize {
2714        self.cursor
2715            .start()
2716            .count(self.include_dirs, self.include_ignored)
2717    }
2718}
2719
2720impl<'a> Iterator for Traversal<'a> {
2721    type Item = &'a Entry;
2722
2723    fn next(&mut self) -> Option<Self::Item> {
2724        if let Some(item) = self.entry() {
2725            self.advance();
2726            Some(item)
2727        } else {
2728            None
2729        }
2730    }
2731}
2732
/// Seek targets understood by a cursor positioned with `TraversalProgress`.
#[derive(Debug)]
enum TraversalTarget<'a> {
    // Seek by ordering the given path against the cursor's `max_path`.
    Path(&'a Path),
    // Seek to the first position whose `max_path` no longer starts with the
    // given path, i.e. past the path and everything beneath it.
    PathSuccessor(&'a Path),
    // Seek to a numeric offset, counted with the given filters applied.
    Count {
        count: usize,
        include_ignored: bool,
        include_dirs: bool,
    },
}
2743
2744impl<'a, 'b> SeekTarget<'a, EntrySummary, TraversalProgress<'a>> for TraversalTarget<'b> {
2745    fn cmp(&self, cursor_location: &TraversalProgress<'a>, _: &()) -> Ordering {
2746        match self {
2747            TraversalTarget::Path(path) => path.cmp(&cursor_location.max_path),
2748            TraversalTarget::PathSuccessor(path) => {
2749                if !cursor_location.max_path.starts_with(path) {
2750                    Ordering::Equal
2751                } else {
2752                    Ordering::Greater
2753                }
2754            }
2755            TraversalTarget::Count {
2756                count,
2757                include_dirs,
2758                include_ignored,
2759            } => Ord::cmp(
2760                count,
2761                &cursor_location.count(*include_dirs, *include_ignored),
2762            ),
2763        }
2764    }
2765}
2766
/// Iterator over entries located under `parent_path`, hopping from one entry
/// to its next sibling rather than descending into it.
struct ChildEntriesIter<'a> {
    // Iteration stops at the first entry whose path does not start with this.
    parent_path: &'a Path,
    // Underlying traversal supplying the entries.
    traversal: Traversal<'a>,
}
2771
2772impl<'a> Iterator for ChildEntriesIter<'a> {
2773    type Item = &'a Entry;
2774
2775    fn next(&mut self) -> Option<Self::Item> {
2776        if let Some(item) = self.traversal.entry() {
2777            if item.path.starts_with(&self.parent_path) {
2778                self.traversal.advance_to_sibling();
2779                return Some(item);
2780            }
2781        }
2782        None
2783    }
2784}
2785
2786impl<'a> From<&'a Entry> for proto::Entry {
2787    fn from(entry: &'a Entry) -> Self {
2788        Self {
2789            id: entry.id as u64,
2790            is_dir: entry.is_dir(),
2791            path: entry.path.to_string_lossy().to_string(),
2792            inode: entry.inode,
2793            mtime: Some(entry.mtime.into()),
2794            is_symlink: entry.is_symlink,
2795            is_ignored: entry.is_ignored,
2796        }
2797    }
2798}
2799
2800impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2801    type Error = anyhow::Error;
2802
2803    fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2804        if let Some(mtime) = entry.mtime {
2805            let kind = if entry.is_dir {
2806                EntryKind::Dir
2807            } else {
2808                let mut char_bag = root_char_bag.clone();
2809                char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2810                EntryKind::File(char_bag)
2811            };
2812            let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2813            Ok(Entry {
2814                id: entry.id as usize,
2815                kind,
2816                path: path.clone(),
2817                inode: entry.inode,
2818                mtime: mtime.into(),
2819                is_symlink: entry.is_symlink,
2820                is_ignored: entry.is_ignored,
2821            })
2822        } else {
2823            Err(anyhow!(
2824                "missing mtime in remote worktree entry {:?}",
2825                entry.path
2826            ))
2827        }
2828    }
2829}
2830
2831#[cfg(test)]
2832mod tests {
2833    use super::*;
2834    use crate::fs::FakeFs;
2835    use anyhow::Result;
2836    use fs::RealFs;
2837    use rand::prelude::*;
2838    use rpc_client::test::FakeServer;
2839    use serde_json::json;
2840    use std::{cell::RefCell, rc::Rc};
2841    use std::{
2842        env,
2843        fmt::Write,
2844        time::{SystemTime, UNIX_EPOCH},
2845    };
2846    use util::test::temp_tree;
2847
2848    #[gpui::test]
2849    async fn test_traversal(cx: gpui::TestAppContext) {
2850        let fs = FakeFs::new();
2851        fs.insert_tree(
2852            "/root",
2853            json!({
2854               ".gitignore": "a/b\n",
2855               "a": {
2856                   "b": "",
2857                   "c": "",
2858               }
2859            }),
2860        )
2861        .await;
2862
2863        let tree = Worktree::open_local(
2864            rpc::Client::new(),
2865            Arc::from(Path::new("/root")),
2866            Arc::new(fs),
2867            Default::default(),
2868            &mut cx.to_async(),
2869        )
2870        .await
2871        .unwrap();
2872        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2873            .await;
2874
2875        tree.read_with(&cx, |tree, _| {
2876            assert_eq!(
2877                tree.entries(false)
2878                    .map(|entry| entry.path.as_ref())
2879                    .collect::<Vec<_>>(),
2880                vec![
2881                    Path::new(""),
2882                    Path::new(".gitignore"),
2883                    Path::new("a"),
2884                    Path::new("a/c"),
2885                ]
2886            );
2887        })
2888    }
2889
2890    #[gpui::test]
2891    async fn test_save_file(mut cx: gpui::TestAppContext) {
2892        let dir = temp_tree(json!({
2893            "file1": "the old contents",
2894        }));
2895        let tree = Worktree::open_local(
2896            rpc::Client::new(),
2897            dir.path(),
2898            Arc::new(RealFs),
2899            Default::default(),
2900            &mut cx.to_async(),
2901        )
2902        .await
2903        .unwrap();
2904        let buffer = tree
2905            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
2906            .await
2907            .unwrap();
2908        let save = buffer.update(&mut cx, |buffer, cx| {
2909            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
2910            buffer.save(cx).unwrap()
2911        });
2912        save.await.unwrap();
2913
2914        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
2915        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
2916    }
2917
2918    #[gpui::test]
2919    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
2920        let dir = temp_tree(json!({
2921            "file1": "the old contents",
2922        }));
2923        let file_path = dir.path().join("file1");
2924
2925        let tree = Worktree::open_local(
2926            rpc::Client::new(),
2927            file_path.clone(),
2928            Arc::new(RealFs),
2929            Default::default(),
2930            &mut cx.to_async(),
2931        )
2932        .await
2933        .unwrap();
2934        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2935            .await;
2936        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
2937
2938        let buffer = tree
2939            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
2940            .await
2941            .unwrap();
2942        let save = buffer.update(&mut cx, |buffer, cx| {
2943            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
2944            buffer.save(cx).unwrap()
2945        });
2946        save.await.unwrap();
2947
2948        let new_text = std::fs::read_to_string(file_path).unwrap();
2949        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
2950    }
2951
2952    #[gpui::test]
2953    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
2954        let dir = temp_tree(json!({
2955            "a": {
2956                "file1": "",
2957                "file2": "",
2958                "file3": "",
2959            },
2960            "b": {
2961                "c": {
2962                    "file4": "",
2963                    "file5": "",
2964                }
2965            }
2966        }));
2967
2968        let user_id = 5;
2969        let mut client = rpc::Client::new();
2970        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
2971        let tree = Worktree::open_local(
2972            client,
2973            dir.path(),
2974            Arc::new(RealFs),
2975            Default::default(),
2976            &mut cx.to_async(),
2977        )
2978        .await
2979        .unwrap();
2980
2981        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
2982            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
2983            async move { buffer.await.unwrap() }
2984        };
2985        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
2986            tree.read_with(cx, |tree, _| {
2987                tree.entry_for_path(path)
2988                    .expect(&format!("no entry for path {}", path))
2989                    .id
2990            })
2991        };
2992
2993        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
2994        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
2995        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
2996        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
2997
2998        let file2_id = id_for_path("a/file2", &cx);
2999        let file3_id = id_for_path("a/file3", &cx);
3000        let file4_id = id_for_path("b/c/file4", &cx);
3001
3002        // Wait for the initial scan.
3003        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3004            .await;
3005
3006        // Create a remote copy of this worktree.
3007        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
3008        let worktree_id = 1;
3009        let share_request = tree.update(&mut cx, |tree, cx| {
3010            tree.as_local().unwrap().share_request(cx)
3011        });
3012        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3013        server
3014            .respond(
3015                open_worktree.receipt(),
3016                proto::OpenWorktreeResponse { worktree_id: 1 },
3017            )
3018            .await;
3019
3020        let remote = Worktree::remote(
3021            proto::JoinWorktreeResponse {
3022                worktree: share_request.await.unwrap().worktree,
3023                replica_id: 1,
3024                peers: Vec::new(),
3025            },
3026            rpc::Client::new(),
3027            Default::default(),
3028            &mut cx.to_async(),
3029        )
3030        .await
3031        .unwrap();
3032
3033        cx.read(|cx| {
3034            assert!(!buffer2.read(cx).is_dirty());
3035            assert!(!buffer3.read(cx).is_dirty());
3036            assert!(!buffer4.read(cx).is_dirty());
3037            assert!(!buffer5.read(cx).is_dirty());
3038        });
3039
3040        // Rename and delete files and directories.
3041        tree.flush_fs_events(&cx).await;
3042        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
3043        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
3044        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
3045        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
3046        tree.flush_fs_events(&cx).await;
3047
3048        let expected_paths = vec![
3049            "a",
3050            "a/file1",
3051            "a/file2.new",
3052            "b",
3053            "d",
3054            "d/file3",
3055            "d/file4",
3056        ];
3057
3058        cx.read(|app| {
3059            assert_eq!(
3060                tree.read(app)
3061                    .paths()
3062                    .map(|p| p.to_str().unwrap())
3063                    .collect::<Vec<_>>(),
3064                expected_paths
3065            );
3066
3067            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
3068            assert_eq!(id_for_path("d/file3", &cx), file3_id);
3069            assert_eq!(id_for_path("d/file4", &cx), file4_id);
3070
3071            assert_eq!(
3072                buffer2.read(app).file().unwrap().path().as_ref(),
3073                Path::new("a/file2.new")
3074            );
3075            assert_eq!(
3076                buffer3.read(app).file().unwrap().path().as_ref(),
3077                Path::new("d/file3")
3078            );
3079            assert_eq!(
3080                buffer4.read(app).file().unwrap().path().as_ref(),
3081                Path::new("d/file4")
3082            );
3083            assert_eq!(
3084                buffer5.read(app).file().unwrap().path().as_ref(),
3085                Path::new("b/c/file5")
3086            );
3087
3088            assert!(!buffer2.read(app).file().unwrap().is_deleted());
3089            assert!(!buffer3.read(app).file().unwrap().is_deleted());
3090            assert!(!buffer4.read(app).file().unwrap().is_deleted());
3091            assert!(buffer5.read(app).file().unwrap().is_deleted());
3092        });
3093
3094        // Update the remote worktree. Check that it becomes consistent with the
3095        // local worktree.
3096        remote.update(&mut cx, |remote, cx| {
3097            let update_message =
3098                tree.read(cx)
3099                    .snapshot()
3100                    .build_update(&initial_snapshot, worktree_id, true);
3101            remote
3102                .as_remote_mut()
3103                .unwrap()
3104                .snapshot
3105                .apply_update(update_message)
3106                .unwrap();
3107
3108            assert_eq!(
3109                remote
3110                    .paths()
3111                    .map(|p| p.to_str().unwrap())
3112                    .collect::<Vec<_>>(),
3113                expected_paths
3114            );
3115        });
3116    }
3117
3118    #[gpui::test]
3119    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
3120        let dir = temp_tree(json!({
3121            ".git": {},
3122            ".gitignore": "ignored-dir\n",
3123            "tracked-dir": {
3124                "tracked-file1": "tracked contents",
3125            },
3126            "ignored-dir": {
3127                "ignored-file1": "ignored contents",
3128            }
3129        }));
3130
3131        let tree = Worktree::open_local(
3132            rpc::Client::new(),
3133            dir.path(),
3134            Arc::new(RealFs),
3135            Default::default(),
3136            &mut cx.to_async(),
3137        )
3138        .await
3139        .unwrap();
3140        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3141            .await;
3142        tree.flush_fs_events(&cx).await;
3143        cx.read(|cx| {
3144            let tree = tree.read(cx);
3145            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
3146            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
3147            assert_eq!(tracked.is_ignored, false);
3148            assert_eq!(ignored.is_ignored, true);
3149        });
3150
3151        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
3152        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
3153        tree.flush_fs_events(&cx).await;
3154        cx.read(|cx| {
3155            let tree = tree.read(cx);
3156            let dot_git = tree.entry_for_path(".git").unwrap();
3157            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
3158            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
3159            assert_eq!(tracked.is_ignored, false);
3160            assert_eq!(ignored.is_ignored, true);
3161            assert_eq!(dot_git.is_ignored, true);
3162        });
3163    }
3164
    /// Opening a local worktree with a connected client must send an
    /// `OpenWorktree` request carrying the root name and the collaborators
    /// from `.zed.toml`; the id from the response becomes the worktree's
    /// remote id, and dropping the worktree sends `CloseWorktree`.
    #[gpui::test]
    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
        let user_id = 100;
        let mut client = rpc::Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;

        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/path",
            json!({
                "to": {
                    "the-dir": {
                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
                        "a.txt": "a-contents",
                    },
                },
            }),
        )
        .await;

        let worktree = Worktree::open_local(
            client.clone(),
            "/path/to/the-dir".as_ref(),
            fs,
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Scope the async context so it is dropped right after connecting.
        {
            let cx = cx.to_async();
            client.authenticate_and_connect(&cx).await.unwrap();
        }

        // The server sees the open request with the expected payload.
        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        assert_eq!(
            open_worktree.payload,
            proto::OpenWorktree {
                root_name: "the-dir".to_string(),
                collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
            }
        );

        // Answer with an id and check that the worktree adopts it.
        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 5 },
            )
            .await;
        let remote_id = worktree
            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
            .await;
        assert_eq!(remote_id, Some(5));

        // Releasing the worktree notifies the server.
        cx.update(move |_| drop(worktree));
        server.receive::<proto::CloseWorktree>().await.unwrap();
    }
3223
3224    #[gpui::test]
3225    async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
3226        use std::fs;
3227
3228        let dir = temp_tree(json!({
3229            "file1": "abc",
3230            "file2": "def",
3231            "file3": "ghi",
3232        }));
3233        let tree = Worktree::open_local(
3234            rpc::Client::new(),
3235            dir.path(),
3236            Arc::new(RealFs),
3237            Default::default(),
3238            &mut cx.to_async(),
3239        )
3240        .await
3241        .unwrap();
3242        tree.flush_fs_events(&cx).await;
3243        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3244            .await;
3245
3246        let buffer1 = tree
3247            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3248            .await
3249            .unwrap();
3250        let events = Rc::new(RefCell::new(Vec::new()));
3251
3252        // initially, the buffer isn't dirty.
3253        buffer1.update(&mut cx, |buffer, cx| {
3254            cx.subscribe(&buffer1, {
3255                let events = events.clone();
3256                move |_, _, event, _| events.borrow_mut().push(event.clone())
3257            })
3258            .detach();
3259
3260            assert!(!buffer.is_dirty());
3261            assert!(events.borrow().is_empty());
3262
3263            buffer.edit(vec![1..2], "", cx);
3264        });
3265
3266        // after the first edit, the buffer is dirty, and emits a dirtied event.
3267        buffer1.update(&mut cx, |buffer, cx| {
3268            assert!(buffer.text() == "ac");
3269            assert!(buffer.is_dirty());
3270            assert_eq!(
3271                *events.borrow(),
3272                &[buffer::Event::Edited, buffer::Event::Dirtied]
3273            );
3274            events.borrow_mut().clear();
3275            buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
3276        });
3277
3278        // after saving, the buffer is not dirty, and emits a saved event.
3279        buffer1.update(&mut cx, |buffer, cx| {
3280            assert!(!buffer.is_dirty());
3281            assert_eq!(*events.borrow(), &[buffer::Event::Saved]);
3282            events.borrow_mut().clear();
3283
3284            buffer.edit(vec![1..1], "B", cx);
3285            buffer.edit(vec![2..2], "D", cx);
3286        });
3287
3288        // after editing again, the buffer is dirty, and emits another dirty event.
3289        buffer1.update(&mut cx, |buffer, cx| {
3290            assert!(buffer.text() == "aBDc");
3291            assert!(buffer.is_dirty());
3292            assert_eq!(
3293                *events.borrow(),
3294                &[
3295                    buffer::Event::Edited,
3296                    buffer::Event::Dirtied,
3297                    buffer::Event::Edited
3298                ],
3299            );
3300            events.borrow_mut().clear();
3301
3302            // TODO - currently, after restoring the buffer to its
3303            // previously-saved state, the is still considered dirty.
3304            buffer.edit(vec![1..3], "", cx);
3305            assert!(buffer.text() == "ac");
3306            assert!(buffer.is_dirty());
3307        });
3308
3309        assert_eq!(*events.borrow(), &[buffer::Event::Edited]);
3310
3311        // When a file is deleted, the buffer is considered dirty.
3312        let events = Rc::new(RefCell::new(Vec::new()));
3313        let buffer2 = tree
3314            .update(&mut cx, |tree, cx| tree.open_buffer("file2", cx))
3315            .await
3316            .unwrap();
3317        buffer2.update(&mut cx, |_, cx| {
3318            cx.subscribe(&buffer2, {
3319                let events = events.clone();
3320                move |_, _, event, _| events.borrow_mut().push(event.clone())
3321            })
3322            .detach();
3323        });
3324
3325        fs::remove_file(dir.path().join("file2")).unwrap();
3326        buffer2.condition(&cx, |b, _| b.is_dirty()).await;
3327        assert_eq!(
3328            *events.borrow(),
3329            &[buffer::Event::Dirtied, buffer::Event::FileHandleChanged]
3330        );
3331
3332        // When a file is already dirty when deleted, we don't emit a Dirtied event.
3333        let events = Rc::new(RefCell::new(Vec::new()));
3334        let buffer3 = tree
3335            .update(&mut cx, |tree, cx| tree.open_buffer("file3", cx))
3336            .await
3337            .unwrap();
3338        buffer3.update(&mut cx, |_, cx| {
3339            cx.subscribe(&buffer3, {
3340                let events = events.clone();
3341                move |_, _, event, _| events.borrow_mut().push(event.clone())
3342            })
3343            .detach();
3344        });
3345
3346        tree.flush_fs_events(&cx).await;
3347        buffer3.update(&mut cx, |buffer, cx| {
3348            buffer.edit(Some(0..0), "x", cx);
3349        });
3350        events.borrow_mut().clear();
3351        fs::remove_file(dir.path().join("file3")).unwrap();
3352        buffer3
3353            .condition(&cx, |_, _| !events.borrow().is_empty())
3354            .await;
3355        assert_eq!(*events.borrow(), &[buffer::Event::FileHandleChanged]);
3356        cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
3357    }
3358
3359    #[gpui::test]
3360    async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
3361        use buffer::{Point, Selection, SelectionGoal, ToPoint};
3362        use std::fs;
3363
3364        let initial_contents = "aaa\nbbbbb\nc\n";
3365        let dir = temp_tree(json!({ "the-file": initial_contents }));
3366        let tree = Worktree::open_local(
3367            rpc::Client::new(),
3368            dir.path(),
3369            Arc::new(RealFs),
3370            Default::default(),
3371            &mut cx.to_async(),
3372        )
3373        .await
3374        .unwrap();
3375        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3376            .await;
3377
3378        let abs_path = dir.path().join("the-file");
3379        let buffer = tree
3380            .update(&mut cx, |tree, cx| {
3381                tree.open_buffer(Path::new("the-file"), cx)
3382            })
3383            .await
3384            .unwrap();
3385
3386        // Add a cursor at the start of each row.
3387        let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
3388            assert!(!buffer.is_dirty());
3389            buffer.add_selection_set(
3390                (0..3)
3391                    .map(|row| {
3392                        let anchor = buffer.anchor_at(Point::new(row, 0), Bias::Right);
3393                        Selection {
3394                            id: row as usize,
3395                            start: anchor.clone(),
3396                            end: anchor,
3397                            reversed: false,
3398                            goal: SelectionGoal::None,
3399                        }
3400                    })
3401                    .collect::<Vec<_>>(),
3402                cx,
3403            )
3404        });
3405
3406        // Change the file on disk, adding two new lines of text, and removing
3407        // one line.
3408        buffer.read_with(&cx, |buffer, _| {
3409            assert!(!buffer.is_dirty());
3410            assert!(!buffer.has_conflict());
3411        });
3412        let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
3413        fs::write(&abs_path, new_contents).unwrap();
3414
3415        // Because the buffer was not modified, it is reloaded from disk. Its
3416        // contents are edited according to the diff between the old and new
3417        // file contents.
3418        buffer
3419            .condition(&cx, |buffer, _| buffer.text() != initial_contents)
3420            .await;
3421
3422        buffer.update(&mut cx, |buffer, _| {
3423            assert_eq!(buffer.text(), new_contents);
3424            assert!(!buffer.is_dirty());
3425            assert!(!buffer.has_conflict());
3426
3427            let set = buffer.selection_set(selection_set_id).unwrap();
3428            let cursor_positions = set
3429                .selections
3430                .iter()
3431                .map(|selection| {
3432                    assert_eq!(selection.start, selection.end);
3433                    selection.start.to_point(&*buffer)
3434                })
3435                .collect::<Vec<_>>();
3436            assert_eq!(
3437                cursor_positions,
3438                &[Point::new(1, 0), Point::new(3, 0), Point::new(4, 0),]
3439            );
3440        });
3441
3442        // Modify the buffer
3443        buffer.update(&mut cx, |buffer, cx| {
3444            buffer.edit(vec![0..0], " ", cx);
3445            assert!(buffer.is_dirty());
3446        });
3447
3448        // Change the file on disk again, adding blank lines to the beginning.
3449        fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();
3450
        // Because the buffer is modified, it doesn't reload from disk, but is
3452        // marked as having a conflict.
3453        buffer
3454            .condition(&cx, |buffer, _| buffer.has_conflict())
3455            .await;
3456    }
3457
3458    #[gpui::test(iterations = 100)]
3459    fn test_random(mut rng: StdRng) {
3460        let operations = env::var("OPERATIONS")
3461            .map(|o| o.parse().unwrap())
3462            .unwrap_or(40);
3463        let initial_entries = env::var("INITIAL_ENTRIES")
3464            .map(|o| o.parse().unwrap())
3465            .unwrap_or(20);
3466
3467        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
3468        for _ in 0..initial_entries {
3469            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
3470        }
3471        log::info!("Generated initial tree");
3472
3473        let (notify_tx, _notify_rx) = smol::channel::unbounded();
3474        let fs = Arc::new(RealFs);
3475        let next_entry_id = Arc::new(AtomicUsize::new(0));
3476        let mut initial_snapshot = Snapshot {
3477            id: 0,
3478            scan_id: 0,
3479            abs_path: root_dir.path().into(),
3480            entries_by_path: Default::default(),
3481            entries_by_id: Default::default(),
3482            removed_entry_ids: Default::default(),
3483            ignores: Default::default(),
3484            root_name: Default::default(),
3485            root_char_bag: Default::default(),
3486            next_entry_id: next_entry_id.clone(),
3487        };
3488        initial_snapshot.insert_entry(
3489            Entry::new(
3490                Path::new("").into(),
3491                &smol::block_on(fs.metadata(root_dir.path()))
3492                    .unwrap()
3493                    .unwrap(),
3494                &next_entry_id,
3495                Default::default(),
3496            ),
3497            fs.as_ref(),
3498        );
3499        let mut scanner = BackgroundScanner::new(
3500            Arc::new(Mutex::new(initial_snapshot.clone())),
3501            notify_tx,
3502            fs.clone(),
3503            Arc::new(gpui::executor::Background::new()),
3504        );
3505        smol::block_on(scanner.scan_dirs()).unwrap();
3506        scanner.snapshot().check_invariants();
3507
3508        let mut events = Vec::new();
3509        let mut snapshots = Vec::new();
3510        let mut mutations_len = operations;
3511        while mutations_len > 1 {
3512            if !events.is_empty() && rng.gen_bool(0.4) {
3513                let len = rng.gen_range(0..=events.len());
3514                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
3515                log::info!("Delivering events: {:#?}", to_deliver);
3516                smol::block_on(scanner.process_events(to_deliver));
3517                scanner.snapshot().check_invariants();
3518            } else {
3519                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
3520                mutations_len -= 1;
3521            }
3522
3523            if rng.gen_bool(0.2) {
3524                snapshots.push(scanner.snapshot());
3525            }
3526        }
3527        log::info!("Quiescing: {:#?}", events);
3528        smol::block_on(scanner.process_events(events));
3529        scanner.snapshot().check_invariants();
3530
3531        let (notify_tx, _notify_rx) = smol::channel::unbounded();
3532        let mut new_scanner = BackgroundScanner::new(
3533            Arc::new(Mutex::new(initial_snapshot)),
3534            notify_tx,
3535            scanner.fs.clone(),
3536            scanner.executor.clone(),
3537        );
3538        smol::block_on(new_scanner.scan_dirs()).unwrap();
3539        assert_eq!(
3540            scanner.snapshot().to_vec(true),
3541            new_scanner.snapshot().to_vec(true)
3542        );
3543
3544        for mut prev_snapshot in snapshots {
3545            let include_ignored = rng.gen::<bool>();
3546            if !include_ignored {
3547                let mut entries_by_path_edits = Vec::new();
3548                let mut entries_by_id_edits = Vec::new();
3549                for entry in prev_snapshot
3550                    .entries_by_id
3551                    .cursor::<()>()
3552                    .filter(|e| e.is_ignored)
3553                {
3554                    entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
3555                    entries_by_id_edits.push(Edit::Remove(entry.id));
3556                }
3557
3558                prev_snapshot
3559                    .entries_by_path
3560                    .edit(entries_by_path_edits, &());
3561                prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
3562            }
3563
3564            let update = scanner
3565                .snapshot()
3566                .build_update(&prev_snapshot, 0, include_ignored);
3567            prev_snapshot.apply_update(update).unwrap();
3568            assert_eq!(
3569                prev_snapshot.to_vec(true),
3570                scanner.snapshot().to_vec(include_ignored)
3571            );
3572        }
3573    }
3574
3575    fn randomly_mutate_tree(
3576        root_path: &Path,
3577        insertion_probability: f64,
3578        rng: &mut impl Rng,
3579    ) -> Result<Vec<fsevent::Event>> {
3580        let root_path = root_path.canonicalize().unwrap();
3581        let (dirs, files) = read_dir_recursive(root_path.clone());
3582
3583        let mut events = Vec::new();
3584        let mut record_event = |path: PathBuf| {
3585            events.push(fsevent::Event {
3586                event_id: SystemTime::now()
3587                    .duration_since(UNIX_EPOCH)
3588                    .unwrap()
3589                    .as_secs(),
3590                flags: fsevent::StreamFlags::empty(),
3591                path,
3592            });
3593        };
3594
3595        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
3596            let path = dirs.choose(rng).unwrap();
3597            let new_path = path.join(gen_name(rng));
3598
3599            if rng.gen() {
3600                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
3601                std::fs::create_dir(&new_path)?;
3602            } else {
3603                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
3604                std::fs::write(&new_path, "")?;
3605            }
3606            record_event(new_path);
3607        } else if rng.gen_bool(0.05) {
3608            let ignore_dir_path = dirs.choose(rng).unwrap();
3609            let ignore_path = ignore_dir_path.join(&*GITIGNORE);
3610
3611            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
3612            let files_to_ignore = {
3613                let len = rng.gen_range(0..=subfiles.len());
3614                subfiles.choose_multiple(rng, len)
3615            };
3616            let dirs_to_ignore = {
3617                let len = rng.gen_range(0..subdirs.len());
3618                subdirs.choose_multiple(rng, len)
3619            };
3620
3621            let mut ignore_contents = String::new();
3622            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
3623                write!(
3624                    ignore_contents,
3625                    "{}\n",
3626                    path_to_ignore
3627                        .strip_prefix(&ignore_dir_path)?
3628                        .to_str()
3629                        .unwrap()
3630                )
3631                .unwrap();
3632            }
3633            log::info!(
3634                "Creating {:?} with contents:\n{}",
3635                ignore_path.strip_prefix(&root_path)?,
3636                ignore_contents
3637            );
3638            std::fs::write(&ignore_path, ignore_contents).unwrap();
3639            record_event(ignore_path);
3640        } else {
3641            let old_path = {
3642                let file_path = files.choose(rng);
3643                let dir_path = dirs[1..].choose(rng);
3644                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
3645            };
3646
3647            let is_rename = rng.gen();
3648            if is_rename {
3649                let new_path_parent = dirs
3650                    .iter()
3651                    .filter(|d| !d.starts_with(old_path))
3652                    .choose(rng)
3653                    .unwrap();
3654
3655                let overwrite_existing_dir =
3656                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
3657                let new_path = if overwrite_existing_dir {
3658                    std::fs::remove_dir_all(&new_path_parent).ok();
3659                    new_path_parent.to_path_buf()
3660                } else {
3661                    new_path_parent.join(gen_name(rng))
3662                };
3663
3664                log::info!(
3665                    "Renaming {:?} to {}{:?}",
3666                    old_path.strip_prefix(&root_path)?,
3667                    if overwrite_existing_dir {
3668                        "overwrite "
3669                    } else {
3670                        ""
3671                    },
3672                    new_path.strip_prefix(&root_path)?
3673                );
3674                std::fs::rename(&old_path, &new_path)?;
3675                record_event(old_path.clone());
3676                record_event(new_path);
3677            } else if old_path.is_dir() {
3678                let (dirs, files) = read_dir_recursive(old_path.clone());
3679
3680                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
3681                std::fs::remove_dir_all(&old_path).unwrap();
3682                for file in files {
3683                    record_event(file);
3684                }
3685                for dir in dirs {
3686                    record_event(dir);
3687                }
3688            } else {
3689                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
3690                std::fs::remove_file(old_path).unwrap();
3691                record_event(old_path.clone());
3692            }
3693        }
3694
3695        Ok(events)
3696    }
3697
3698    fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3699        let child_entries = std::fs::read_dir(&path).unwrap();
3700        let mut dirs = vec![path];
3701        let mut files = Vec::new();
3702        for child_entry in child_entries {
3703            let child_path = child_entry.unwrap().path();
3704            if child_path.is_dir() {
3705                let (child_dirs, child_files) = read_dir_recursive(child_path);
3706                dirs.extend(child_dirs);
3707                files.extend(child_files);
3708            } else {
3709                files.push(child_path);
3710            }
3711        }
3712        (dirs, files)
3713    }
3714
3715    fn gen_name(rng: &mut impl Rng) -> String {
3716        (0..6)
3717            .map(|_| rng.sample(rand::distributions::Alphanumeric))
3718            .map(char::from)
3719            .collect()
3720    }
3721
3722    impl Snapshot {
3723        fn check_invariants(&self) {
3724            let mut files = self.files(true, 0);
3725            let mut visible_files = self.files(false, 0);
3726            for entry in self.entries_by_path.cursor::<()>() {
3727                if entry.is_file() {
3728                    assert_eq!(files.next().unwrap().inode, entry.inode);
3729                    if !entry.is_ignored {
3730                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
3731                    }
3732                }
3733            }
3734            assert!(files.next().is_none());
3735            assert!(visible_files.next().is_none());
3736
3737            let mut bfs_paths = Vec::new();
3738            let mut stack = vec![Path::new("")];
3739            while let Some(path) = stack.pop() {
3740                bfs_paths.push(path);
3741                let ix = stack.len();
3742                for child_entry in self.child_entries(path) {
3743                    stack.insert(ix, &child_entry.path);
3744                }
3745            }
3746
3747            let dfs_paths = self
3748                .entries_by_path
3749                .cursor::<()>()
3750                .map(|e| e.path.as_ref())
3751                .collect::<Vec<_>>();
3752            assert_eq!(bfs_paths, dfs_paths);
3753
3754            for (ignore_parent_path, _) in &self.ignores {
3755                assert!(self.entry_for_path(ignore_parent_path).is_some());
3756                assert!(self
3757                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
3758                    .is_some());
3759            }
3760        }
3761
3762        fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
3763            let mut paths = Vec::new();
3764            for entry in self.entries_by_path.cursor::<()>() {
3765                if include_ignored || !entry.is_ignored {
3766                    paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
3767                }
3768            }
3769            paths.sort_by(|a, b| a.0.cmp(&b.0));
3770            paths
3771        }
3772    }
3773}