worktree.rs

   1mod ignore;
   2
   3use self::ignore::IgnoreStack;
   4use crate::{
   5    editor::{self, Buffer, History, Operation, Rope},
   6    fs::{self, Fs},
   7    fuzzy,
   8    fuzzy::CharBag,
   9    language::LanguageRegistry,
  10    rpc::{self, proto, Status},
  11    time::{self, ReplicaId},
  12    util::{Bias, TryFutureExt},
  13};
  14use ::ignore::gitignore::Gitignore;
  15use anyhow::{anyhow, Result};
  16use futures::{Stream, StreamExt};
  17pub use fuzzy::{match_paths, PathMatch};
  18use gpui::{
  19    executor,
  20    sum_tree::{self, Cursor, Edit, SumTree},
  21    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task,
  22    UpgradeModelHandle, WeakModelHandle,
  23};
  24use lazy_static::lazy_static;
  25use parking_lot::Mutex;
  26use postage::{
  27    prelude::{Sink as _, Stream as _},
  28    watch,
  29};
  30use serde::Deserialize;
  31use smol::channel::{self, Sender};
  32use std::{
  33    cmp::{self, Ordering},
  34    collections::HashMap,
  35    convert::{TryFrom, TryInto},
  36    ffi::{OsStr, OsString},
  37    fmt,
  38    future::Future,
  39    ops::Deref,
  40    path::{Path, PathBuf},
  41    sync::{
  42        atomic::{AtomicUsize, Ordering::SeqCst},
  43        Arc,
  44    },
  45    time::{Duration, SystemTime},
  46};
  47use zrpc::{PeerId, TypedEnvelope};
  48
  49lazy_static! {
  50    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
  51}
  52
  53#[derive(Clone, Debug)]
  54enum ScanState {
  55    Idle,
  56    Scanning,
  57    Err(Arc<anyhow::Error>),
  58}
  59
  60pub enum Worktree {
  61    Local(LocalWorktree),
  62    Remote(RemoteWorktree),
  63}
  64
  65impl Entity for Worktree {
  66    type Event = ();
  67
  68    fn release(&mut self, cx: &mut MutableAppContext) {
  69        let rpc = match self {
  70            Self::Local(tree) => tree
  71                .remote_id
  72                .borrow()
  73                .map(|remote_id| (tree.rpc.clone(), remote_id)),
  74            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
  75        };
  76
  77        if let Some((rpc, worktree_id)) = rpc {
  78            cx.spawn(|_| async move {
  79                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
  80                    log::error!("error closing worktree {}: {}", worktree_id, err);
  81                }
  82            })
  83            .detach();
  84        }
  85    }
  86}
  87
  88impl Worktree {
  89    pub async fn open_local(
  90        rpc: Arc<rpc::Client>,
  91        path: impl Into<Arc<Path>>,
  92        fs: Arc<dyn Fs>,
  93        languages: Arc<LanguageRegistry>,
  94        cx: &mut AsyncAppContext,
  95    ) -> Result<ModelHandle<Self>> {
  96        let (tree, scan_states_tx) =
  97            LocalWorktree::new(rpc, path, fs.clone(), languages, cx).await?;
  98        tree.update(cx, |tree, cx| {
  99            let tree = tree.as_local_mut().unwrap();
 100            let abs_path = tree.snapshot.abs_path.clone();
 101            let background_snapshot = tree.background_snapshot.clone();
 102            let background = cx.background().clone();
 103            tree._background_scanner_task = Some(cx.background().spawn(async move {
 104                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
 105                let scanner =
 106                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
 107                scanner.run(events).await;
 108            }));
 109        });
 110        Ok(tree)
 111    }
 112
 113    pub async fn open_remote(
 114        rpc: Arc<rpc::Client>,
 115        id: u64,
 116        languages: Arc<LanguageRegistry>,
 117        cx: &mut AsyncAppContext,
 118    ) -> Result<ModelHandle<Self>> {
 119        let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
 120        Worktree::remote(response, rpc, languages, cx).await
 121    }
 122
 123    async fn remote(
 124        join_response: proto::JoinWorktreeResponse,
 125        rpc: Arc<rpc::Client>,
 126        languages: Arc<LanguageRegistry>,
 127        cx: &mut AsyncAppContext,
 128    ) -> Result<ModelHandle<Self>> {
 129        let worktree = join_response
 130            .worktree
 131            .ok_or_else(|| anyhow!("empty worktree"))?;
 132
 133        let remote_id = worktree.id;
 134        let replica_id = join_response.replica_id as ReplicaId;
 135        let peers = join_response.peers;
 136        let root_char_bag: CharBag = worktree
 137            .root_name
 138            .chars()
 139            .map(|c| c.to_ascii_lowercase())
 140            .collect();
 141        let root_name = worktree.root_name.clone();
 142        let (entries_by_path, entries_by_id) = cx
 143            .background()
 144            .spawn(async move {
 145                let mut entries_by_path_edits = Vec::new();
 146                let mut entries_by_id_edits = Vec::new();
 147                for entry in worktree.entries {
 148                    match Entry::try_from((&root_char_bag, entry)) {
 149                        Ok(entry) => {
 150                            entries_by_id_edits.push(Edit::Insert(PathEntry {
 151                                id: entry.id,
 152                                path: entry.path.clone(),
 153                                scan_id: 0,
 154                            }));
 155                            entries_by_path_edits.push(Edit::Insert(entry));
 156                        }
 157                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
 158                    }
 159                }
 160
 161                let mut entries_by_path = SumTree::new();
 162                let mut entries_by_id = SumTree::new();
 163                entries_by_path.edit(entries_by_path_edits, &());
 164                entries_by_id.edit(entries_by_id_edits, &());
 165                (entries_by_path, entries_by_id)
 166            })
 167            .await;
 168
 169        let worktree = cx.update(|cx| {
 170            cx.add_model(|cx: &mut ModelContext<Worktree>| {
 171                let snapshot = Snapshot {
 172                    id: cx.model_id(),
 173                    scan_id: 0,
 174                    abs_path: Path::new("").into(),
 175                    root_name,
 176                    root_char_bag,
 177                    ignores: Default::default(),
 178                    entries_by_path,
 179                    entries_by_id,
 180                    removed_entry_ids: Default::default(),
 181                    next_entry_id: Default::default(),
 182                };
 183
 184                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
 185                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
 186
 187                cx.background()
 188                    .spawn(async move {
 189                        while let Some(update) = updates_rx.recv().await {
 190                            let mut snapshot = snapshot_tx.borrow().clone();
 191                            if let Err(error) = snapshot.apply_update(update) {
 192                                log::error!("error applying worktree update: {}", error);
 193                            }
 194                            *snapshot_tx.borrow_mut() = snapshot;
 195                        }
 196                    })
 197                    .detach();
 198
 199                {
 200                    let mut snapshot_rx = snapshot_rx.clone();
 201                    cx.spawn_weak(|this, mut cx| async move {
 202                        while let Some(_) = snapshot_rx.recv().await {
 203                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
 204                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
 205                            } else {
 206                                break;
 207                            }
 208                        }
 209                    })
 210                    .detach();
 211                }
 212
 213                let _subscriptions = vec![
 214                    rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
 215                    rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
 216                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
 217                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
 218                    rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
 219                ];
 220
 221                Worktree::Remote(RemoteWorktree {
 222                    remote_id,
 223                    replica_id,
 224                    snapshot,
 225                    snapshot_rx,
 226                    updates_tx,
 227                    rpc: rpc.clone(),
 228                    open_buffers: Default::default(),
 229                    peers: peers
 230                        .into_iter()
 231                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
 232                        .collect(),
 233                    queued_operations: Default::default(),
 234                    languages,
 235                    _subscriptions,
 236                })
 237            })
 238        });
 239
 240        Ok(worktree)
 241    }
 242
 243    pub fn as_local(&self) -> Option<&LocalWorktree> {
 244        if let Worktree::Local(worktree) = self {
 245            Some(worktree)
 246        } else {
 247            None
 248        }
 249    }
 250
 251    pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
 252        if let Worktree::Local(worktree) = self {
 253            Some(worktree)
 254        } else {
 255            None
 256        }
 257    }
 258
 259    pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
 260        if let Worktree::Remote(worktree) = self {
 261            Some(worktree)
 262        } else {
 263            None
 264        }
 265    }
 266
 267    pub fn snapshot(&self) -> Snapshot {
 268        match self {
 269            Worktree::Local(worktree) => worktree.snapshot(),
 270            Worktree::Remote(worktree) => worktree.snapshot(),
 271        }
 272    }
 273
 274    pub fn replica_id(&self) -> ReplicaId {
 275        match self {
 276            Worktree::Local(_) => 0,
 277            Worktree::Remote(worktree) => worktree.replica_id,
 278        }
 279    }
 280
 281    pub fn handle_add_peer(
 282        &mut self,
 283        envelope: TypedEnvelope<proto::AddPeer>,
 284        _: Arc<rpc::Client>,
 285        cx: &mut ModelContext<Self>,
 286    ) -> Result<()> {
 287        match self {
 288            Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
 289            Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
 290        }
 291    }
 292
 293    pub fn handle_remove_peer(
 294        &mut self,
 295        envelope: TypedEnvelope<proto::RemovePeer>,
 296        _: Arc<rpc::Client>,
 297        cx: &mut ModelContext<Self>,
 298    ) -> Result<()> {
 299        match self {
 300            Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
 301            Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
 302        }
 303    }
 304
 305    pub fn handle_update(
 306        &mut self,
 307        envelope: TypedEnvelope<proto::UpdateWorktree>,
 308        _: Arc<rpc::Client>,
 309        cx: &mut ModelContext<Self>,
 310    ) -> anyhow::Result<()> {
 311        self.as_remote_mut()
 312            .unwrap()
 313            .update_from_remote(envelope, cx)
 314    }
 315
 316    pub fn handle_open_buffer(
 317        &mut self,
 318        envelope: TypedEnvelope<proto::OpenBuffer>,
 319        rpc: Arc<rpc::Client>,
 320        cx: &mut ModelContext<Self>,
 321    ) -> anyhow::Result<()> {
 322        let receipt = envelope.receipt();
 323
 324        let response = self
 325            .as_local_mut()
 326            .unwrap()
 327            .open_remote_buffer(envelope, cx);
 328
 329        cx.background()
 330            .spawn(
 331                async move {
 332                    rpc.respond(receipt, response.await?).await?;
 333                    Ok(())
 334                }
 335                .log_err(),
 336            )
 337            .detach();
 338
 339        Ok(())
 340    }
 341
 342    pub fn handle_close_buffer(
 343        &mut self,
 344        envelope: TypedEnvelope<proto::CloseBuffer>,
 345        _: Arc<rpc::Client>,
 346        cx: &mut ModelContext<Self>,
 347    ) -> anyhow::Result<()> {
 348        self.as_local_mut()
 349            .unwrap()
 350            .close_remote_buffer(envelope, cx)
 351    }
 352
 353    pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
 354        match self {
 355            Worktree::Local(worktree) => &worktree.peers,
 356            Worktree::Remote(worktree) => &worktree.peers,
 357        }
 358    }
 359
 360    pub fn open_buffer(
 361        &mut self,
 362        path: impl AsRef<Path>,
 363        cx: &mut ModelContext<Self>,
 364    ) -> Task<Result<ModelHandle<Buffer>>> {
 365        match self {
 366            Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
 367            Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
 368        }
 369    }
 370
 371    #[cfg(feature = "test-support")]
 372    pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
 373        let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
 374            Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
 375            Worktree::Remote(worktree) => {
 376                Box::new(worktree.open_buffers.values().filter_map(|buf| {
 377                    if let RemoteBuffer::Loaded(buf) = buf {
 378                        Some(buf)
 379                    } else {
 380                        None
 381                    }
 382                }))
 383            }
 384        };
 385
 386        let path = path.as_ref();
 387        open_buffers
 388            .find(|buffer| {
 389                if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
 390                    file.path.as_ref() == path
 391                } else {
 392                    false
 393                }
 394            })
 395            .is_some()
 396    }
 397
 398    pub fn handle_update_buffer(
 399        &mut self,
 400        envelope: TypedEnvelope<proto::UpdateBuffer>,
 401        _: Arc<rpc::Client>,
 402        cx: &mut ModelContext<Self>,
 403    ) -> Result<()> {
 404        let payload = envelope.payload.clone();
 405        let buffer_id = payload.buffer_id as usize;
 406        let ops = payload
 407            .operations
 408            .into_iter()
 409            .map(|op| op.try_into())
 410            .collect::<anyhow::Result<Vec<_>>>()?;
 411
 412        match self {
 413            Worktree::Local(worktree) => {
 414                let buffer = worktree
 415                    .open_buffers
 416                    .get(&buffer_id)
 417                    .and_then(|buf| buf.upgrade(cx))
 418                    .ok_or_else(|| {
 419                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
 420                    })?;
 421                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
 422            }
 423            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
 424                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
 425                Some(RemoteBuffer::Loaded(buffer)) => {
 426                    if let Some(buffer) = buffer.upgrade(cx) {
 427                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
 428                    } else {
 429                        worktree
 430                            .open_buffers
 431                            .insert(buffer_id, RemoteBuffer::Operations(ops));
 432                    }
 433                }
 434                None => {
 435                    worktree
 436                        .open_buffers
 437                        .insert(buffer_id, RemoteBuffer::Operations(ops));
 438                }
 439            },
 440        }
 441
 442        Ok(())
 443    }
 444
 445    pub fn handle_save_buffer(
 446        &mut self,
 447        envelope: TypedEnvelope<proto::SaveBuffer>,
 448        rpc: Arc<rpc::Client>,
 449        cx: &mut ModelContext<Self>,
 450    ) -> Result<()> {
 451        let sender_id = envelope.original_sender_id()?;
 452        let buffer = self
 453            .as_local()
 454            .unwrap()
 455            .shared_buffers
 456            .get(&sender_id)
 457            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
 458            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
 459
 460        let receipt = envelope.receipt();
 461        let worktree_id = envelope.payload.worktree_id;
 462        let buffer_id = envelope.payload.buffer_id;
 463        let save = cx.spawn(|_, mut cx| async move {
 464            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
 465        });
 466
 467        cx.background()
 468            .spawn(
 469                async move {
 470                    let (version, mtime) = save.await?;
 471
 472                    rpc.respond(
 473                        receipt,
 474                        proto::BufferSaved {
 475                            worktree_id,
 476                            buffer_id,
 477                            version: (&version).into(),
 478                            mtime: Some(mtime.into()),
 479                        },
 480                    )
 481                    .await?;
 482
 483                    Ok(())
 484                }
 485                .log_err(),
 486            )
 487            .detach();
 488
 489        Ok(())
 490    }
 491
 492    pub fn handle_buffer_saved(
 493        &mut self,
 494        envelope: TypedEnvelope<proto::BufferSaved>,
 495        _: Arc<rpc::Client>,
 496        cx: &mut ModelContext<Self>,
 497    ) -> Result<()> {
 498        let payload = envelope.payload.clone();
 499        let worktree = self.as_remote_mut().unwrap();
 500        if let Some(buffer) = worktree
 501            .open_buffers
 502            .get(&(payload.buffer_id as usize))
 503            .and_then(|buf| buf.upgrade(cx))
 504        {
 505            buffer.update(cx, |buffer, cx| {
 506                let version = payload.version.try_into()?;
 507                let mtime = payload
 508                    .mtime
 509                    .ok_or_else(|| anyhow!("missing mtime"))?
 510                    .into();
 511                buffer.did_save(version, mtime, cx);
 512                Result::<_, anyhow::Error>::Ok(())
 513            })?;
 514        }
 515        Ok(())
 516    }
 517
 518    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
 519        match self {
 520            Self::Local(worktree) => {
 521                let is_fake_fs = worktree.fs.is_fake();
 522                worktree.snapshot = worktree.background_snapshot.lock().clone();
 523                if worktree.is_scanning() {
 524                    if worktree.poll_task.is_none() {
 525                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
 526                            if is_fake_fs {
 527                                smol::future::yield_now().await;
 528                            } else {
 529                                smol::Timer::after(Duration::from_millis(100)).await;
 530                            }
 531                            this.update(&mut cx, |this, cx| {
 532                                this.as_local_mut().unwrap().poll_task = None;
 533                                this.poll_snapshot(cx);
 534                            })
 535                        }));
 536                    }
 537                } else {
 538                    worktree.poll_task.take();
 539                    self.update_open_buffers(cx);
 540                }
 541            }
 542            Self::Remote(worktree) => {
 543                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
 544                self.update_open_buffers(cx);
 545            }
 546        };
 547
 548        cx.notify();
 549    }
 550
 551    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
 552        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
 553            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
 554            Self::Remote(worktree) => {
 555                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
 556                    if let RemoteBuffer::Loaded(buf) = buf {
 557                        Some((id, buf))
 558                    } else {
 559                        None
 560                    }
 561                }))
 562            }
 563        };
 564
 565        let mut buffers_to_delete = Vec::new();
 566        for (buffer_id, buffer) in open_buffers {
 567            if let Some(buffer) = buffer.upgrade(cx) {
 568                buffer.update(cx, |buffer, cx| {
 569                    let buffer_is_clean = !buffer.is_dirty();
 570
 571                    if let Some(file) = buffer.file_mut() {
 572                        let mut file_changed = false;
 573
 574                        if let Some(entry) = file
 575                            .entry_id
 576                            .and_then(|entry_id| self.entry_for_id(entry_id))
 577                        {
 578                            if entry.path != file.path {
 579                                file.path = entry.path.clone();
 580                                file_changed = true;
 581                            }
 582
 583                            if entry.mtime != file.mtime {
 584                                file.mtime = entry.mtime;
 585                                file_changed = true;
 586                                if let Some(worktree) = self.as_local() {
 587                                    if buffer_is_clean {
 588                                        let abs_path = worktree.absolutize(&file.path);
 589                                        refresh_buffer(abs_path, &worktree.fs, cx);
 590                                    }
 591                                }
 592                            }
 593                        } else if let Some(entry) = self.entry_for_path(&file.path) {
 594                            file.entry_id = Some(entry.id);
 595                            file.mtime = entry.mtime;
 596                            if let Some(worktree) = self.as_local() {
 597                                if buffer_is_clean {
 598                                    let abs_path = worktree.absolutize(&file.path);
 599                                    refresh_buffer(abs_path, &worktree.fs, cx);
 600                                }
 601                            }
 602                            file_changed = true;
 603                        } else if !file.is_deleted() {
 604                            if buffer_is_clean {
 605                                cx.emit(editor::buffer::Event::Dirtied);
 606                            }
 607                            file.entry_id = None;
 608                            file_changed = true;
 609                        }
 610
 611                        if file_changed {
 612                            cx.emit(editor::buffer::Event::FileHandleChanged);
 613                        }
 614                    }
 615                });
 616            } else {
 617                buffers_to_delete.push(*buffer_id);
 618            }
 619        }
 620
 621        for buffer_id in buffers_to_delete {
 622            match self {
 623                Self::Local(worktree) => {
 624                    worktree.open_buffers.remove(&buffer_id);
 625                }
 626                Self::Remote(worktree) => {
 627                    worktree.open_buffers.remove(&buffer_id);
 628                }
 629            }
 630        }
 631    }
 632}
 633
 634impl Deref for Worktree {
 635    type Target = Snapshot;
 636
 637    fn deref(&self) -> &Self::Target {
 638        match self {
 639            Worktree::Local(worktree) => &worktree.snapshot,
 640            Worktree::Remote(worktree) => &worktree.snapshot,
 641        }
 642    }
 643}
 644
 645pub struct LocalWorktree {
 646    snapshot: Snapshot,
 647    config: WorktreeConfig,
 648    background_snapshot: Arc<Mutex<Snapshot>>,
 649    last_scan_state_rx: watch::Receiver<ScanState>,
 650    _background_scanner_task: Option<Task<()>>,
 651    _maintain_remote_id_task: Task<Option<()>>,
 652    poll_task: Option<Task<()>>,
 653    remote_id: watch::Receiver<Option<u64>>,
 654    share: Option<ShareState>,
 655    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
 656    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
 657    peers: HashMap<PeerId, ReplicaId>,
 658    languages: Arc<LanguageRegistry>,
 659    queued_operations: Vec<(u64, Operation)>,
 660    rpc: Arc<rpc::Client>,
 661    fs: Arc<dyn Fs>,
 662}
 663
 664#[derive(Default, Deserialize)]
 665struct WorktreeConfig {
 666    collaborators: Vec<String>,
 667}
 668
 669impl LocalWorktree {
 670    async fn new(
 671        rpc: Arc<rpc::Client>,
 672        path: impl Into<Arc<Path>>,
 673        fs: Arc<dyn Fs>,
 674        languages: Arc<LanguageRegistry>,
 675        cx: &mut AsyncAppContext,
 676    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
 677        let abs_path = path.into();
 678        let path: Arc<Path> = Arc::from(Path::new(""));
 679        let next_entry_id = AtomicUsize::new(0);
 680
 681        // After determining whether the root entry is a file or a directory, populate the
 682        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
 683        let root_name = abs_path
 684            .file_name()
 685            .map_or(String::new(), |f| f.to_string_lossy().to_string());
 686        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
 687        let metadata = fs.metadata(&abs_path).await?;
 688
 689        let mut config = WorktreeConfig::default();
 690        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
 691            if let Ok(parsed) = toml::from_str(&zed_toml) {
 692                config = parsed;
 693            }
 694        }
 695
 696        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
 697        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
 698        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
 699            let mut snapshot = Snapshot {
 700                id: cx.model_id(),
 701                scan_id: 0,
 702                abs_path,
 703                root_name: root_name.clone(),
 704                root_char_bag,
 705                ignores: Default::default(),
 706                entries_by_path: Default::default(),
 707                entries_by_id: Default::default(),
 708                removed_entry_ids: Default::default(),
 709                next_entry_id: Arc::new(next_entry_id),
 710            };
 711            if let Some(metadata) = metadata {
 712                snapshot.insert_entry(Entry::new(
 713                    path.into(),
 714                    &metadata,
 715                    &snapshot.next_entry_id,
 716                    snapshot.root_char_bag,
 717                ));
 718            }
 719
 720            let (mut remote_id_tx, remote_id_rx) = watch::channel();
 721            let _maintain_remote_id_task = cx.spawn_weak({
 722                let rpc = rpc.clone();
 723                move |this, cx| {
 724                    async move {
 725                        let mut status = rpc.status();
 726                        while let Some(status) = status.recv().await {
 727                            if let Some(this) = this.upgrade(&cx) {
 728                                let remote_id = if let Status::Connected { .. } = status {
 729                                    let collaborator_logins = this.read_with(&cx, |this, _| {
 730                                        this.as_local().unwrap().config.collaborators.clone()
 731                                    });
 732                                    let response = rpc
 733                                        .request(proto::OpenWorktree {
 734                                            root_name: root_name.clone(),
 735                                            collaborator_logins,
 736                                        })
 737                                        .await?;
 738
 739                                    Some(response.worktree_id)
 740                                } else {
 741                                    None
 742                                };
 743                                if remote_id_tx.send(remote_id).await.is_err() {
 744                                    break;
 745                                }
 746                            }
 747                        }
 748                        Ok(())
 749                    }
 750                    .log_err()
 751                }
 752            });
 753
 754            let tree = Self {
 755                snapshot: snapshot.clone(),
 756                config,
 757                remote_id: remote_id_rx,
 758                background_snapshot: Arc::new(Mutex::new(snapshot)),
 759                last_scan_state_rx,
 760                _background_scanner_task: None,
 761                _maintain_remote_id_task,
 762                share: None,
 763                poll_task: None,
 764                open_buffers: Default::default(),
 765                shared_buffers: Default::default(),
 766                queued_operations: Default::default(),
 767                peers: Default::default(),
 768                languages,
 769                rpc,
 770                fs,
 771            };
 772
 773            cx.spawn_weak(|this, mut cx| async move {
 774                while let Ok(scan_state) = scan_states_rx.recv().await {
 775                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
 776                        let to_send = handle.update(&mut cx, |this, cx| {
 777                            last_scan_state_tx.blocking_send(scan_state).ok();
 778                            this.poll_snapshot(cx);
 779                            let tree = this.as_local_mut().unwrap();
 780                            if !tree.is_scanning() {
 781                                if let Some(share) = tree.share.as_ref() {
 782                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
 783                                }
 784                            }
 785                            None
 786                        });
 787
 788                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
 789                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
 790                                log::error!("error submitting snapshot to send {}", err);
 791                            }
 792                        }
 793                    } else {
 794                        break;
 795                    }
 796                }
 797            })
 798            .detach();
 799
 800            Worktree::Local(tree)
 801        });
 802
 803        Ok((tree, scan_states_tx))
 804    }
 805
 806    pub fn open_buffer(
 807        &mut self,
 808        path: &Path,
 809        cx: &mut ModelContext<Worktree>,
 810    ) -> Task<Result<ModelHandle<Buffer>>> {
 811        let handle = cx.handle();
 812
 813        // If there is already a buffer for the given path, then return it.
 814        let mut existing_buffer = None;
 815        self.open_buffers.retain(|_buffer_id, buffer| {
 816            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
 817                if let Some(file) = buffer.read(cx.as_ref()).file() {
 818                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
 819                        existing_buffer = Some(buffer);
 820                    }
 821                }
 822                true
 823            } else {
 824                false
 825            }
 826        });
 827
 828        let languages = self.languages.clone();
 829        let path = Arc::from(path);
 830        cx.spawn(|this, mut cx| async move {
 831            if let Some(existing_buffer) = existing_buffer {
 832                Ok(existing_buffer)
 833            } else {
 834                let (file, contents) = this
 835                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
 836                    .await?;
 837                let language = languages.select_language(&path).cloned();
 838                let buffer = cx.add_model(|cx| {
 839                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
 840                });
 841                this.update(&mut cx, |this, _| {
 842                    let this = this
 843                        .as_local_mut()
 844                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
 845                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
 846                    Ok(buffer)
 847                })
 848            }
 849        })
 850    }
 851
 852    pub fn open_remote_buffer(
 853        &mut self,
 854        envelope: TypedEnvelope<proto::OpenBuffer>,
 855        cx: &mut ModelContext<Worktree>,
 856    ) -> Task<Result<proto::OpenBufferResponse>> {
 857        let peer_id = envelope.original_sender_id();
 858        let path = Path::new(&envelope.payload.path);
 859
 860        let buffer = self.open_buffer(path, cx);
 861
 862        cx.spawn(|this, mut cx| async move {
 863            let buffer = buffer.await?;
 864            this.update(&mut cx, |this, cx| {
 865                this.as_local_mut()
 866                    .unwrap()
 867                    .shared_buffers
 868                    .entry(peer_id?)
 869                    .or_default()
 870                    .insert(buffer.id() as u64, buffer.clone());
 871
 872                Ok(proto::OpenBufferResponse {
 873                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
 874                })
 875            })
 876        })
 877    }
 878
 879    pub fn close_remote_buffer(
 880        &mut self,
 881        envelope: TypedEnvelope<proto::CloseBuffer>,
 882        cx: &mut ModelContext<Worktree>,
 883    ) -> Result<()> {
 884        if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
 885            shared_buffers.remove(&envelope.payload.buffer_id);
 886            cx.notify();
 887        }
 888
 889        Ok(())
 890    }
 891
 892    pub fn add_peer(
 893        &mut self,
 894        envelope: TypedEnvelope<proto::AddPeer>,
 895        cx: &mut ModelContext<Worktree>,
 896    ) -> Result<()> {
 897        let peer = envelope
 898            .payload
 899            .peer
 900            .as_ref()
 901            .ok_or_else(|| anyhow!("empty peer"))?;
 902        self.peers
 903            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
 904        cx.notify();
 905
 906        Ok(())
 907    }
 908
 909    pub fn remove_peer(
 910        &mut self,
 911        envelope: TypedEnvelope<proto::RemovePeer>,
 912        cx: &mut ModelContext<Worktree>,
 913    ) -> Result<()> {
 914        let peer_id = PeerId(envelope.payload.peer_id);
 915        let replica_id = self
 916            .peers
 917            .remove(&peer_id)
 918            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
 919        self.shared_buffers.remove(&peer_id);
 920        for (_, buffer) in &self.open_buffers {
 921            if let Some(buffer) = buffer.upgrade(cx) {
 922                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
 923            }
 924        }
 925        cx.notify();
 926
 927        Ok(())
 928    }
 929
 930    pub fn scan_complete(&self) -> impl Future<Output = ()> {
 931        let mut scan_state_rx = self.last_scan_state_rx.clone();
 932        async move {
 933            let mut scan_state = Some(scan_state_rx.borrow().clone());
 934            while let Some(ScanState::Scanning) = scan_state {
 935                scan_state = scan_state_rx.recv().await;
 936            }
 937        }
 938    }
 939
 940    pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
 941        let mut remote_id = self.remote_id.clone();
 942        async move {
 943            while let Some(remote_id) = remote_id.recv().await {
 944                if remote_id.is_some() {
 945                    return remote_id;
 946                }
 947            }
 948            None
 949        }
 950    }
 951
 952    fn is_scanning(&self) -> bool {
 953        if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
 954            true
 955        } else {
 956            false
 957        }
 958    }
 959
 960    pub fn snapshot(&self) -> Snapshot {
 961        self.snapshot.clone()
 962    }
 963
 964    pub fn abs_path(&self) -> &Path {
 965        self.snapshot.abs_path.as_ref()
 966    }
 967
 968    pub fn contains_abs_path(&self, path: &Path) -> bool {
 969        path.starts_with(&self.snapshot.abs_path)
 970    }
 971
 972    fn absolutize(&self, path: &Path) -> PathBuf {
 973        if path.file_name().is_some() {
 974            self.snapshot.abs_path.join(path)
 975        } else {
 976            self.snapshot.abs_path.to_path_buf()
 977        }
 978    }
 979
 980    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
 981        let handle = cx.handle();
 982        let path = Arc::from(path);
 983        let abs_path = self.absolutize(&path);
 984        let background_snapshot = self.background_snapshot.clone();
 985        let fs = self.fs.clone();
 986        cx.spawn(|this, mut cx| async move {
 987            let text = fs.load(&abs_path).await?;
 988            // Eagerly populate the snapshot with an updated entry for the loaded file
 989            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
 990            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
 991            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
 992        })
 993    }
 994
 995    pub fn save_buffer_as(
 996        &self,
 997        buffer: ModelHandle<Buffer>,
 998        path: impl Into<Arc<Path>>,
 999        text: Rope,
1000        cx: &mut ModelContext<Worktree>,
1001    ) -> Task<Result<File>> {
1002        let save = self.save(path, text, cx);
1003        cx.spawn(|this, mut cx| async move {
1004            let entry = save.await?;
1005            this.update(&mut cx, |this, cx| {
1006                this.as_local_mut()
1007                    .unwrap()
1008                    .open_buffers
1009                    .insert(buffer.id(), buffer.downgrade());
1010                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
1011            })
1012        })
1013    }
1014
1015    fn save(
1016        &self,
1017        path: impl Into<Arc<Path>>,
1018        text: Rope,
1019        cx: &mut ModelContext<Worktree>,
1020    ) -> Task<Result<Entry>> {
1021        let path = path.into();
1022        let abs_path = self.absolutize(&path);
1023        let background_snapshot = self.background_snapshot.clone();
1024        let fs = self.fs.clone();
1025        let save = cx.background().spawn(async move {
1026            fs.save(&abs_path, &text).await?;
1027            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1028        });
1029
1030        cx.spawn(|this, mut cx| async move {
1031            let entry = save.await?;
1032            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1033            Ok(entry)
1034        })
1035    }
1036
1037    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
1038        let snapshot = self.snapshot();
1039        let share_request = self.share_request(cx);
1040        let rpc = self.rpc.clone();
1041        cx.spawn(|this, mut cx| async move {
1042            let share_request = if let Some(request) = share_request.await {
1043                request
1044            } else {
1045                return Err(anyhow!("failed to open worktree on the server"));
1046            };
1047
1048            let remote_id = share_request.worktree.as_ref().unwrap().id;
1049            let share_response = rpc.request(share_request).await?;
1050
1051            log::info!("sharing worktree {:?}", share_response);
1052            let (snapshots_to_send_tx, snapshots_to_send_rx) =
1053                smol::channel::unbounded::<Snapshot>();
1054
1055            cx.background()
1056                .spawn({
1057                    let rpc = rpc.clone();
1058                    async move {
1059                        let mut prev_snapshot = snapshot;
1060                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
1061                            let message = snapshot.build_update(&prev_snapshot, remote_id);
1062                            match rpc.send(message).await {
1063                                Ok(()) => prev_snapshot = snapshot,
1064                                Err(err) => log::error!("error sending snapshot diff {}", err),
1065                            }
1066                        }
1067                    }
1068                })
1069                .detach();
1070
1071            this.update(&mut cx, |worktree, cx| {
1072                let _subscriptions = vec![
1073                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
1074                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
1075                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
1076                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
1077                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
1078                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
1079                ];
1080
1081                let worktree = worktree.as_local_mut().unwrap();
1082                worktree.share = Some(ShareState {
1083                    snapshots_tx: snapshots_to_send_tx,
1084                    _subscriptions,
1085                });
1086            });
1087
1088            Ok(remote_id)
1089        })
1090    }
1091
1092    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
1093        let remote_id = self.next_remote_id();
1094        let snapshot = self.snapshot();
1095        let root_name = self.root_name.clone();
1096        cx.background().spawn(async move {
1097            remote_id.await.map(|id| {
1098                let entries = snapshot
1099                    .entries_by_path
1100                    .cursor::<(), ()>()
1101                    .map(Into::into)
1102                    .collect();
1103                proto::ShareWorktree {
1104                    worktree: Some(proto::Worktree {
1105                        id,
1106                        root_name,
1107                        entries,
1108                    }),
1109                }
1110            })
1111        })
1112    }
1113}
1114
1115pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
1116    let fs = fs.clone();
1117    cx.spawn(|buffer, mut cx| async move {
1118        let new_text = fs.load(&abs_path).await;
1119        match new_text {
1120            Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
1121            Ok(new_text) => {
1122                buffer
1123                    .update(&mut cx, |buffer, cx| {
1124                        buffer.set_text_from_disk(new_text.into(), cx)
1125                    })
1126                    .await;
1127            }
1128        }
1129    })
1130    .detach()
1131}
1132
1133impl Deref for LocalWorktree {
1134    type Target = Snapshot;
1135
1136    fn deref(&self) -> &Self::Target {
1137        &self.snapshot
1138    }
1139}
1140
1141impl fmt::Debug for LocalWorktree {
1142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1143        self.snapshot.fmt(f)
1144    }
1145}
1146
1147struct ShareState {
1148    snapshots_tx: Sender<Snapshot>,
1149    _subscriptions: Vec<rpc::Subscription>,
1150}
1151
1152pub struct RemoteWorktree {
1153    remote_id: u64,
1154    snapshot: Snapshot,
1155    snapshot_rx: watch::Receiver<Snapshot>,
1156    rpc: Arc<rpc::Client>,
1157    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
1158    replica_id: ReplicaId,
1159    open_buffers: HashMap<usize, RemoteBuffer>,
1160    peers: HashMap<PeerId, ReplicaId>,
1161    languages: Arc<LanguageRegistry>,
1162    queued_operations: Vec<(u64, Operation)>,
1163    _subscriptions: Vec<rpc::Subscription>,
1164}
1165
1166impl RemoteWorktree {
1167    pub fn open_buffer(
1168        &mut self,
1169        path: &Path,
1170        cx: &mut ModelContext<Worktree>,
1171    ) -> Task<Result<ModelHandle<Buffer>>> {
1172        let handle = cx.handle();
1173        let mut existing_buffer = None;
1174        self.open_buffers.retain(|_buffer_id, buffer| {
1175            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1176                if let Some(file) = buffer.read(cx.as_ref()).file() {
1177                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
1178                        existing_buffer = Some(buffer);
1179                    }
1180                }
1181                true
1182            } else {
1183                false
1184            }
1185        });
1186
1187        let rpc = self.rpc.clone();
1188        let languages = self.languages.clone();
1189        let replica_id = self.replica_id;
1190        let remote_worktree_id = self.remote_id;
1191        let path = path.to_string_lossy().to_string();
1192        cx.spawn(|this, mut cx| async move {
1193            if let Some(existing_buffer) = existing_buffer {
1194                Ok(existing_buffer)
1195            } else {
1196                let entry = this
1197                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1198                    .ok_or_else(|| anyhow!("file does not exist"))?;
1199                let file = File::new(entry.id, handle, entry.path, entry.mtime);
1200                let language = languages.select_language(&path).cloned();
1201                let response = rpc
1202                    .request(proto::OpenBuffer {
1203                        worktree_id: remote_worktree_id as u64,
1204                        path,
1205                    })
1206                    .await?;
1207                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1208                let buffer_id = remote_buffer.id as usize;
1209                let buffer = cx.add_model(|cx| {
1210                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
1211                });
1212                this.update(&mut cx, |this, cx| {
1213                    let this = this.as_remote_mut().unwrap();
1214                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
1215                        .open_buffers
1216                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
1217                    {
1218                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
1219                    }
1220                    Result::<_, anyhow::Error>::Ok(())
1221                })?;
1222                Ok(buffer)
1223            }
1224        })
1225    }
1226
1227    fn snapshot(&self) -> Snapshot {
1228        self.snapshot.clone()
1229    }
1230
1231    fn update_from_remote(
1232        &mut self,
1233        envelope: TypedEnvelope<proto::UpdateWorktree>,
1234        cx: &mut ModelContext<Worktree>,
1235    ) -> Result<()> {
1236        let mut tx = self.updates_tx.clone();
1237        let payload = envelope.payload.clone();
1238        cx.background()
1239            .spawn(async move {
1240                tx.send(payload).await.expect("receiver runs to completion");
1241            })
1242            .detach();
1243
1244        Ok(())
1245    }
1246
1247    pub fn add_peer(
1248        &mut self,
1249        envelope: TypedEnvelope<proto::AddPeer>,
1250        cx: &mut ModelContext<Worktree>,
1251    ) -> Result<()> {
1252        let peer = envelope
1253            .payload
1254            .peer
1255            .as_ref()
1256            .ok_or_else(|| anyhow!("empty peer"))?;
1257        self.peers
1258            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1259        cx.notify();
1260        Ok(())
1261    }
1262
1263    pub fn remove_peer(
1264        &mut self,
1265        envelope: TypedEnvelope<proto::RemovePeer>,
1266        cx: &mut ModelContext<Worktree>,
1267    ) -> Result<()> {
1268        let peer_id = PeerId(envelope.payload.peer_id);
1269        let replica_id = self
1270            .peers
1271            .remove(&peer_id)
1272            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1273        for (_, buffer) in &self.open_buffers {
1274            if let Some(buffer) = buffer.upgrade(cx) {
1275                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1276            }
1277        }
1278        cx.notify();
1279        Ok(())
1280    }
1281}
1282
1283enum RemoteBuffer {
1284    Operations(Vec<Operation>),
1285    Loaded(WeakModelHandle<Buffer>),
1286}
1287
1288impl RemoteBuffer {
1289    fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1290        match self {
1291            Self::Operations(_) => None,
1292            Self::Loaded(buffer) => buffer.upgrade(cx),
1293        }
1294    }
1295}
1296
1297#[derive(Clone)]
1298pub struct Snapshot {
1299    id: usize,
1300    scan_id: usize,
1301    abs_path: Arc<Path>,
1302    root_name: String,
1303    root_char_bag: CharBag,
1304    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
1305    entries_by_path: SumTree<Entry>,
1306    entries_by_id: SumTree<PathEntry>,
1307    removed_entry_ids: HashMap<u64, usize>,
1308    next_entry_id: Arc<AtomicUsize>,
1309}
1310
1311impl Snapshot {
1312    pub fn id(&self) -> usize {
1313        self.id
1314    }
1315
1316    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
1317        let mut updated_entries = Vec::new();
1318        let mut removed_entries = Vec::new();
1319        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
1320        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
1321        loop {
1322            match (self_entries.peek(), other_entries.peek()) {
1323                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
1324                    Ordering::Less => {
1325                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
1326                        updated_entries.push(entry);
1327                        self_entries.next();
1328                    }
1329                    Ordering::Equal => {
1330                        if self_entry.scan_id != other_entry.scan_id {
1331                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
1332                            updated_entries.push(entry);
1333                        }
1334
1335                        self_entries.next();
1336                        other_entries.next();
1337                    }
1338                    Ordering::Greater => {
1339                        removed_entries.push(other_entry.id as u64);
1340                        other_entries.next();
1341                    }
1342                },
1343                (Some(self_entry), None) => {
1344                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
1345                    updated_entries.push(entry);
1346                    self_entries.next();
1347                }
1348                (None, Some(other_entry)) => {
1349                    removed_entries.push(other_entry.id as u64);
1350                    other_entries.next();
1351                }
1352                (None, None) => break,
1353            }
1354        }
1355
1356        proto::UpdateWorktree {
1357            updated_entries,
1358            removed_entries,
1359            worktree_id,
1360        }
1361    }
1362
1363    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
1364        self.scan_id += 1;
1365        let scan_id = self.scan_id;
1366
1367        let mut entries_by_path_edits = Vec::new();
1368        let mut entries_by_id_edits = Vec::new();
1369        for entry_id in update.removed_entries {
1370            let entry_id = entry_id as usize;
1371            let entry = self
1372                .entry_for_id(entry_id)
1373                .ok_or_else(|| anyhow!("unknown entry"))?;
1374            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
1375            entries_by_id_edits.push(Edit::Remove(entry.id));
1376        }
1377
1378        for entry in update.updated_entries {
1379            let entry = Entry::try_from((&self.root_char_bag, entry))?;
1380            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
1381                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
1382            }
1383            entries_by_id_edits.push(Edit::Insert(PathEntry {
1384                id: entry.id,
1385                path: entry.path.clone(),
1386                scan_id,
1387            }));
1388            entries_by_path_edits.push(Edit::Insert(entry));
1389        }
1390
1391        self.entries_by_path.edit(entries_by_path_edits, &());
1392        self.entries_by_id.edit(entries_by_id_edits, &());
1393
1394        Ok(())
1395    }
1396
1397    pub fn file_count(&self) -> usize {
1398        self.entries_by_path.summary().file_count
1399    }
1400
1401    pub fn visible_file_count(&self) -> usize {
1402        self.entries_by_path.summary().visible_file_count
1403    }
1404
1405    pub fn files(&self, start: usize) -> FileIter {
1406        FileIter::all(self, start)
1407    }
1408
1409    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1410        let empty_path = Path::new("");
1411        self.entries_by_path
1412            .cursor::<(), ()>()
1413            .filter(move |entry| entry.path.as_ref() != empty_path)
1414            .map(|entry| &entry.path)
1415    }
1416
1417    pub fn visible_files(&self, start: usize) -> FileIter {
1418        FileIter::visible(self, start)
1419    }
1420
1421    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
1422        ChildEntriesIter::new(path, self)
1423    }
1424
1425    pub fn root_entry(&self) -> Option<&Entry> {
1426        self.entry_for_path("")
1427    }
1428
1429    pub fn root_name(&self) -> &str {
1430        &self.root_name
1431    }
1432
1433    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1434        let mut cursor = self.entries_by_path.cursor::<_, ()>();
1435        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
1436            cursor.item()
1437        } else {
1438            None
1439        }
1440    }
1441
1442    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1443        let entry = self.entries_by_id.get(&id, &())?;
1444        self.entry_for_path(&entry.path)
1445    }
1446
1447    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1448        self.entry_for_path(path.as_ref()).map(|e| e.inode)
1449    }
1450
1451    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
1452        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
1453            let (ignore, err) = Gitignore::new(self.abs_path.join(&entry.path));
1454            if let Some(err) = err {
1455                log::error!("error in ignore file {:?} - {:?}", &entry.path, err);
1456            }
1457
1458            let ignore_dir_path = entry.path.parent().unwrap();
1459            self.ignores
1460                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
1461        }
1462
1463        self.reuse_entry_id(&mut entry);
1464        self.entries_by_path.insert_or_replace(entry.clone(), &());
1465        self.entries_by_id.insert_or_replace(
1466            PathEntry {
1467                id: entry.id,
1468                path: entry.path.clone(),
1469                scan_id: self.scan_id,
1470            },
1471            &(),
1472        );
1473        entry
1474    }
1475
1476    fn populate_dir(
1477        &mut self,
1478        parent_path: Arc<Path>,
1479        entries: impl IntoIterator<Item = Entry>,
1480        ignore: Option<Arc<Gitignore>>,
1481    ) {
1482        let mut parent_entry = self
1483            .entries_by_path
1484            .get(&PathKey(parent_path.clone()), &())
1485            .unwrap()
1486            .clone();
1487        if let Some(ignore) = ignore {
1488            self.ignores.insert(parent_path, (ignore, self.scan_id));
1489        }
1490        if matches!(parent_entry.kind, EntryKind::PendingDir) {
1491            parent_entry.kind = EntryKind::Dir;
1492        } else {
1493            unreachable!();
1494        }
1495
1496        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
1497        let mut entries_by_id_edits = Vec::new();
1498
1499        for mut entry in entries {
1500            self.reuse_entry_id(&mut entry);
1501            entries_by_id_edits.push(Edit::Insert(PathEntry {
1502                id: entry.id,
1503                path: entry.path.clone(),
1504                scan_id: self.scan_id,
1505            }));
1506            entries_by_path_edits.push(Edit::Insert(entry));
1507        }
1508
1509        self.entries_by_path.edit(entries_by_path_edits, &());
1510        self.entries_by_id.edit(entries_by_id_edits, &());
1511    }
1512
1513    fn reuse_entry_id(&mut self, entry: &mut Entry) {
1514        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1515            entry.id = removed_entry_id;
1516        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1517            entry.id = existing_entry.id;
1518        }
1519    }
1520
1521    fn remove_path(&mut self, path: &Path) {
1522        let mut new_entries;
1523        let removed_entry_ids;
1524        {
1525            let mut cursor = self.entries_by_path.cursor::<_, ()>();
1526            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
1527            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
1528            new_entries.push_tree(cursor.suffix(&()), &());
1529        }
1530        self.entries_by_path = new_entries;
1531
1532        let mut entries_by_id_edits = Vec::new();
1533        for entry in removed_entry_ids.cursor::<(), ()>() {
1534            let removed_entry_id = self
1535                .removed_entry_ids
1536                .entry(entry.inode)
1537                .or_insert(entry.id);
1538            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
1539            entries_by_id_edits.push(Edit::Remove(entry.id));
1540        }
1541        self.entries_by_id.edit(entries_by_id_edits, &());
1542
1543        if path.file_name() == Some(&GITIGNORE) {
1544            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
1545                *scan_id = self.scan_id;
1546            }
1547        }
1548    }
1549
1550    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
1551        let mut new_ignores = Vec::new();
1552        for ancestor in path.ancestors().skip(1) {
1553            if let Some((ignore, _)) = self.ignores.get(ancestor) {
1554                new_ignores.push((ancestor, Some(ignore.clone())));
1555            } else {
1556                new_ignores.push((ancestor, None));
1557            }
1558        }
1559
1560        let mut ignore_stack = IgnoreStack::none();
1561        for (parent_path, ignore) in new_ignores.into_iter().rev() {
1562            if ignore_stack.is_path_ignored(&parent_path, true) {
1563                ignore_stack = IgnoreStack::all();
1564                break;
1565            } else if let Some(ignore) = ignore {
1566                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
1567            }
1568        }
1569
1570        if ignore_stack.is_path_ignored(path, is_dir) {
1571            ignore_stack = IgnoreStack::all();
1572        }
1573
1574        ignore_stack
1575    }
1576}
1577
1578impl fmt::Debug for Snapshot {
1579    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1580        for entry in self.entries_by_path.cursor::<(), ()>() {
1581            for _ in entry.path.ancestors().skip(1) {
1582                write!(f, " ")?;
1583            }
1584            writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1585        }
1586        Ok(())
1587    }
1588}
1589
1590#[derive(Clone, PartialEq)]
1591pub struct File {
1592    entry_id: Option<usize>,
1593    worktree: ModelHandle<Worktree>,
1594    pub path: Arc<Path>,
1595    pub mtime: SystemTime,
1596}
1597
1598impl File {
1599    pub fn new(
1600        entry_id: usize,
1601        worktree: ModelHandle<Worktree>,
1602        path: Arc<Path>,
1603        mtime: SystemTime,
1604    ) -> Self {
1605        Self {
1606            entry_id: Some(entry_id),
1607            worktree,
1608            path,
1609            mtime,
1610        }
1611    }
1612
1613    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
1614        self.worktree.update(cx, |worktree, cx| {
1615            if let Some((rpc, remote_id)) = match worktree {
1616                Worktree::Local(worktree) => worktree
1617                    .remote_id
1618                    .borrow()
1619                    .map(|id| (worktree.rpc.clone(), id)),
1620                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
1621            } {
1622                cx.spawn(|worktree, mut cx| async move {
1623                    if let Err(error) = rpc
1624                        .request(proto::UpdateBuffer {
1625                            worktree_id: remote_id,
1626                            buffer_id,
1627                            operations: vec![(&operation).into()],
1628                        })
1629                        .await
1630                    {
1631                        worktree.update(&mut cx, |worktree, _| {
1632                            log::error!("error sending buffer operation: {}", error);
1633                            match worktree {
1634                                Worktree::Local(t) => &mut t.queued_operations,
1635                                Worktree::Remote(t) => &mut t.queued_operations,
1636                            }
1637                            .push((buffer_id, operation));
1638                        });
1639                    }
1640                })
1641                .detach();
1642            }
1643        });
1644    }
1645
1646    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
1647        self.worktree.update(cx, |worktree, cx| {
1648            if let Worktree::Remote(worktree) = worktree {
1649                let worktree_id = worktree.remote_id;
1650                let rpc = worktree.rpc.clone();
1651                cx.background()
1652                    .spawn(async move {
1653                        if let Err(error) = rpc
1654                            .send(proto::CloseBuffer {
1655                                worktree_id,
1656                                buffer_id,
1657                            })
1658                            .await
1659                        {
1660                            log::error!("error closing remote buffer: {}", error);
1661                        }
1662                    })
1663                    .detach();
1664            }
1665        });
1666    }
1667
1668    /// Returns this file's path relative to the root of its worktree.
1669    pub fn path(&self) -> Arc<Path> {
1670        self.path.clone()
1671    }
1672
1673    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
1674        self.worktree.read(cx).abs_path.join(&self.path)
1675    }
1676
1677    /// Returns the last component of this handle's absolute path. If this handle refers to the root
1678    /// of its worktree, then this method will return the name of the worktree itself.
1679    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
1680        self.path
1681            .file_name()
1682            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
1683            .map(Into::into)
1684    }
1685
1686    pub fn is_deleted(&self) -> bool {
1687        self.entry_id.is_none()
1688    }
1689
1690    pub fn exists(&self) -> bool {
1691        !self.is_deleted()
1692    }
1693
1694    pub fn save(
1695        &self,
1696        buffer_id: u64,
1697        text: Rope,
1698        version: time::Global,
1699        cx: &mut MutableAppContext,
1700    ) -> Task<Result<(time::Global, SystemTime)>> {
1701        self.worktree.update(cx, |worktree, cx| match worktree {
1702            Worktree::Local(worktree) => {
1703                let rpc = worktree.rpc.clone();
1704                let worktree_id = *worktree.remote_id.borrow();
1705                let save = worktree.save(self.path.clone(), text, cx);
1706                cx.background().spawn(async move {
1707                    let entry = save.await?;
1708                    if let Some(worktree_id) = worktree_id {
1709                        rpc.send(proto::BufferSaved {
1710                            worktree_id,
1711                            buffer_id,
1712                            version: (&version).into(),
1713                            mtime: Some(entry.mtime.into()),
1714                        })
1715                        .await?;
1716                    }
1717                    Ok((version, entry.mtime))
1718                })
1719            }
1720            Worktree::Remote(worktree) => {
1721                let rpc = worktree.rpc.clone();
1722                let worktree_id = worktree.remote_id;
1723                cx.foreground().spawn(async move {
1724                    let response = rpc
1725                        .request(proto::SaveBuffer {
1726                            worktree_id,
1727                            buffer_id,
1728                        })
1729                        .await?;
1730                    let version = response.version.try_into()?;
1731                    let mtime = response
1732                        .mtime
1733                        .ok_or_else(|| anyhow!("missing mtime"))?
1734                        .into();
1735                    Ok((version, mtime))
1736                })
1737            }
1738        })
1739    }
1740
1741    pub fn worktree_id(&self) -> usize {
1742        self.worktree.id()
1743    }
1744
1745    pub fn entry_id(&self) -> (usize, Arc<Path>) {
1746        (self.worktree.id(), self.path.clone())
1747    }
1748}
1749
1750#[derive(Clone, Debug)]
1751pub struct Entry {
1752    pub id: usize,
1753    pub kind: EntryKind,
1754    pub path: Arc<Path>,
1755    pub inode: u64,
1756    pub mtime: SystemTime,
1757    pub is_symlink: bool,
1758    pub is_ignored: bool,
1759}
1760
1761#[derive(Clone, Debug)]
1762pub enum EntryKind {
1763    PendingDir,
1764    Dir,
1765    File(CharBag),
1766}
1767
1768impl Entry {
1769    fn new(
1770        path: Arc<Path>,
1771        metadata: &fs::Metadata,
1772        next_entry_id: &AtomicUsize,
1773        root_char_bag: CharBag,
1774    ) -> Self {
1775        Self {
1776            id: next_entry_id.fetch_add(1, SeqCst),
1777            kind: if metadata.is_dir {
1778                EntryKind::PendingDir
1779            } else {
1780                EntryKind::File(char_bag_for_path(root_char_bag, &path))
1781            },
1782            path,
1783            inode: metadata.inode,
1784            mtime: metadata.mtime,
1785            is_symlink: metadata.is_symlink,
1786            is_ignored: false,
1787        }
1788    }
1789
1790    pub fn is_dir(&self) -> bool {
1791        matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1792    }
1793
1794    pub fn is_file(&self) -> bool {
1795        matches!(self.kind, EntryKind::File(_))
1796    }
1797}
1798
1799impl sum_tree::Item for Entry {
1800    type Summary = EntrySummary;
1801
1802    fn summary(&self) -> Self::Summary {
1803        let file_count;
1804        let visible_file_count;
1805        if self.is_file() {
1806            file_count = 1;
1807            if self.is_ignored {
1808                visible_file_count = 0;
1809            } else {
1810                visible_file_count = 1;
1811            }
1812        } else {
1813            file_count = 0;
1814            visible_file_count = 0;
1815        }
1816
1817        EntrySummary {
1818            max_path: self.path.clone(),
1819            file_count,
1820            visible_file_count,
1821        }
1822    }
1823}
1824
1825impl sum_tree::KeyedItem for Entry {
1826    type Key = PathKey;
1827
1828    fn key(&self) -> Self::Key {
1829        PathKey(self.path.clone())
1830    }
1831}
1832
1833#[derive(Clone, Debug)]
1834pub struct EntrySummary {
1835    max_path: Arc<Path>,
1836    file_count: usize,
1837    visible_file_count: usize,
1838}
1839
1840impl Default for EntrySummary {
1841    fn default() -> Self {
1842        Self {
1843            max_path: Arc::from(Path::new("")),
1844            file_count: 0,
1845            visible_file_count: 0,
1846        }
1847    }
1848}
1849
1850impl sum_tree::Summary for EntrySummary {
1851    type Context = ();
1852
1853    fn add_summary(&mut self, rhs: &Self, _: &()) {
1854        self.max_path = rhs.max_path.clone();
1855        self.file_count += rhs.file_count;
1856        self.visible_file_count += rhs.visible_file_count;
1857    }
1858}
1859
1860#[derive(Clone, Debug)]
1861struct PathEntry {
1862    id: usize,
1863    path: Arc<Path>,
1864    scan_id: usize,
1865}
1866
1867impl sum_tree::Item for PathEntry {
1868    type Summary = PathEntrySummary;
1869
1870    fn summary(&self) -> Self::Summary {
1871        PathEntrySummary { max_id: self.id }
1872    }
1873}
1874
1875impl sum_tree::KeyedItem for PathEntry {
1876    type Key = usize;
1877
1878    fn key(&self) -> Self::Key {
1879        self.id
1880    }
1881}
1882
1883#[derive(Clone, Debug, Default)]
1884struct PathEntrySummary {
1885    max_id: usize,
1886}
1887
1888impl sum_tree::Summary for PathEntrySummary {
1889    type Context = ();
1890
1891    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
1892        self.max_id = summary.max_id;
1893    }
1894}
1895
1896impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
1897    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
1898        *self = summary.max_id;
1899    }
1900}
1901
1902#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
1903pub struct PathKey(Arc<Path>);
1904
1905impl Default for PathKey {
1906    fn default() -> Self {
1907        Self(Path::new("").into())
1908    }
1909}
1910
1911impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
1912    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1913        self.0 = summary.max_path.clone();
1914    }
1915}
1916
1917#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1918enum PathSearch<'a> {
1919    Exact(&'a Path),
1920    Successor(&'a Path),
1921}
1922
1923impl<'a> Ord for PathSearch<'a> {
1924    fn cmp(&self, other: &Self) -> cmp::Ordering {
1925        match (self, other) {
1926            (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
1927            (Self::Successor(a), Self::Exact(b)) => {
1928                if b.starts_with(a) {
1929                    cmp::Ordering::Greater
1930                } else {
1931                    a.cmp(b)
1932                }
1933            }
1934            _ => unreachable!("not sure we need the other two cases"),
1935        }
1936    }
1937}
1938
1939impl<'a> PartialOrd for PathSearch<'a> {
1940    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
1941        Some(self.cmp(other))
1942    }
1943}
1944
1945impl<'a> Default for PathSearch<'a> {
1946    fn default() -> Self {
1947        Self::Exact(Path::new("").into())
1948    }
1949}
1950
1951impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
1952    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1953        *self = Self::Exact(summary.max_path.as_ref());
1954    }
1955}
1956
1957#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
1958pub struct FileCount(usize);
1959
1960impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
1961    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1962        self.0 += summary.file_count;
1963    }
1964}
1965
1966#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
1967pub struct VisibleFileCount(usize);
1968
1969impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
1970    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1971        self.0 += summary.visible_file_count;
1972    }
1973}
1974
1975struct BackgroundScanner {
1976    fs: Arc<dyn Fs>,
1977    snapshot: Arc<Mutex<Snapshot>>,
1978    notify: Sender<ScanState>,
1979    executor: Arc<executor::Background>,
1980}
1981
1982impl BackgroundScanner {
1983    fn new(
1984        snapshot: Arc<Mutex<Snapshot>>,
1985        notify: Sender<ScanState>,
1986        fs: Arc<dyn Fs>,
1987        executor: Arc<executor::Background>,
1988    ) -> Self {
1989        Self {
1990            fs,
1991            snapshot,
1992            notify,
1993            executor,
1994        }
1995    }
1996
1997    fn abs_path(&self) -> Arc<Path> {
1998        self.snapshot.lock().abs_path.clone()
1999    }
2000
2001    fn snapshot(&self) -> Snapshot {
2002        self.snapshot.lock().clone()
2003    }
2004
2005    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
2006        if self.notify.send(ScanState::Scanning).await.is_err() {
2007            return;
2008        }
2009
2010        if let Err(err) = self.scan_dirs().await {
2011            if self
2012                .notify
2013                .send(ScanState::Err(Arc::new(err)))
2014                .await
2015                .is_err()
2016            {
2017                return;
2018            }
2019        }
2020
2021        if self.notify.send(ScanState::Idle).await.is_err() {
2022            return;
2023        }
2024
2025        futures::pin_mut!(events_rx);
2026        while let Some(events) = events_rx.next().await {
2027            if self.notify.send(ScanState::Scanning).await.is_err() {
2028                break;
2029            }
2030
2031            if !self.process_events(events).await {
2032                break;
2033            }
2034
2035            if self.notify.send(ScanState::Idle).await.is_err() {
2036                break;
2037            }
2038        }
2039    }
2040
2041    async fn scan_dirs(&mut self) -> Result<()> {
2042        let root_char_bag;
2043        let next_entry_id;
2044        let is_dir;
2045        {
2046            let snapshot = self.snapshot.lock();
2047            root_char_bag = snapshot.root_char_bag;
2048            next_entry_id = snapshot.next_entry_id.clone();
2049            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
2050        };
2051
2052        if is_dir {
2053            let path: Arc<Path> = Arc::from(Path::new(""));
2054            let abs_path = self.abs_path();
2055            let (tx, rx) = channel::unbounded();
2056            tx.send(ScanJob {
2057                abs_path: abs_path.to_path_buf(),
2058                path,
2059                ignore_stack: IgnoreStack::none(),
2060                scan_queue: tx.clone(),
2061            })
2062            .await
2063            .unwrap();
2064            drop(tx);
2065
2066            self.executor
2067                .scoped(|scope| {
2068                    for _ in 0..self.executor.num_cpus() {
2069                        scope.spawn(async {
2070                            while let Ok(job) = rx.recv().await {
2071                                if let Err(err) = self
2072                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2073                                    .await
2074                                {
2075                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
2076                                }
2077                            }
2078                        });
2079                    }
2080                })
2081                .await;
2082        }
2083
2084        Ok(())
2085    }
2086
2087    async fn scan_dir(
2088        &self,
2089        root_char_bag: CharBag,
2090        next_entry_id: Arc<AtomicUsize>,
2091        job: &ScanJob,
2092    ) -> Result<()> {
2093        let mut new_entries: Vec<Entry> = Vec::new();
2094        let mut new_jobs: Vec<ScanJob> = Vec::new();
2095        let mut ignore_stack = job.ignore_stack.clone();
2096        let mut new_ignore = None;
2097
2098        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
2099        while let Some(child_abs_path) = child_paths.next().await {
2100            let child_abs_path = match child_abs_path {
2101                Ok(child_abs_path) => child_abs_path,
2102                Err(error) => {
2103                    log::error!("error processing entry {:?}", error);
2104                    continue;
2105                }
2106            };
2107            let child_name = child_abs_path.file_name().unwrap();
2108            let child_path: Arc<Path> = job.path.join(child_name).into();
2109            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
2110                Some(metadata) => metadata,
2111                None => continue,
2112            };
2113
2114            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
2115            if child_name == *GITIGNORE {
2116                let (ignore, err) = Gitignore::new(&child_abs_path);
2117                if let Some(err) = err {
2118                    log::error!("error in ignore file {:?} - {:?}", child_name, err);
2119                }
2120                let ignore = Arc::new(ignore);
2121                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2122                new_ignore = Some(ignore);
2123
2124                // Update ignore status of any child entries we've already processed to reflect the
2125                // ignore file in the current directory. Because `.gitignore` starts with a `.`,
2126                // there should rarely be too numerous. Update the ignore stack associated with any
2127                // new jobs as well.
2128                let mut new_jobs = new_jobs.iter_mut();
2129                for entry in &mut new_entries {
2130                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2131                    if entry.is_dir() {
2132                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
2133                            IgnoreStack::all()
2134                        } else {
2135                            ignore_stack.clone()
2136                        };
2137                    }
2138                }
2139            }
2140
2141            let mut child_entry = Entry::new(
2142                child_path.clone(),
2143                &child_metadata,
2144                &next_entry_id,
2145                root_char_bag,
2146            );
2147
2148            if child_metadata.is_dir {
2149                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
2150                child_entry.is_ignored = is_ignored;
2151                new_entries.push(child_entry);
2152                new_jobs.push(ScanJob {
2153                    abs_path: child_abs_path,
2154                    path: child_path,
2155                    ignore_stack: if is_ignored {
2156                        IgnoreStack::all()
2157                    } else {
2158                        ignore_stack.clone()
2159                    },
2160                    scan_queue: job.scan_queue.clone(),
2161                });
2162            } else {
2163                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
2164                new_entries.push(child_entry);
2165            };
2166        }
2167
2168        self.snapshot
2169            .lock()
2170            .populate_dir(job.path.clone(), new_entries, new_ignore);
2171        for new_job in new_jobs {
2172            job.scan_queue.send(new_job).await.unwrap();
2173        }
2174
2175        Ok(())
2176    }
2177
2178    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
2179        let mut snapshot = self.snapshot();
2180        snapshot.scan_id += 1;
2181
2182        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
2183            abs_path
2184        } else {
2185            return false;
2186        };
2187        let root_char_bag = snapshot.root_char_bag;
2188        let next_entry_id = snapshot.next_entry_id.clone();
2189
2190        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
2191        events.dedup_by(|a, b| a.path.starts_with(&b.path));
2192
2193        for event in &events {
2194            match event.path.strip_prefix(&root_abs_path) {
2195                Ok(path) => snapshot.remove_path(&path),
2196                Err(_) => {
2197                    log::error!(
2198                        "unexpected event {:?} for root path {:?}",
2199                        event.path,
2200                        root_abs_path
2201                    );
2202                    continue;
2203                }
2204            }
2205        }
2206
2207        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
2208        for event in events {
2209            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
2210                Ok(path) => Arc::from(path.to_path_buf()),
2211                Err(_) => {
2212                    log::error!(
2213                        "unexpected event {:?} for root path {:?}",
2214                        event.path,
2215                        root_abs_path
2216                    );
2217                    continue;
2218                }
2219            };
2220
2221            match self.fs.metadata(&event.path).await {
2222                Ok(Some(metadata)) => {
2223                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
2224                    let mut fs_entry = Entry::new(
2225                        path.clone(),
2226                        &metadata,
2227                        snapshot.next_entry_id.as_ref(),
2228                        snapshot.root_char_bag,
2229                    );
2230                    fs_entry.is_ignored = ignore_stack.is_all();
2231                    snapshot.insert_entry(fs_entry);
2232                    if metadata.is_dir {
2233                        scan_queue_tx
2234                            .send(ScanJob {
2235                                abs_path: event.path,
2236                                path,
2237                                ignore_stack,
2238                                scan_queue: scan_queue_tx.clone(),
2239                            })
2240                            .await
2241                            .unwrap();
2242                    }
2243                }
2244                Ok(None) => {}
2245                Err(err) => {
2246                    // TODO - create a special 'error' entry in the entries tree to mark this
2247                    log::error!("error reading file on event {:?}", err);
2248                }
2249            }
2250        }
2251
2252        *self.snapshot.lock() = snapshot;
2253
2254        // Scan any directories that were created as part of this event batch.
2255        drop(scan_queue_tx);
2256        self.executor
2257            .scoped(|scope| {
2258                for _ in 0..self.executor.num_cpus() {
2259                    scope.spawn(async {
2260                        while let Ok(job) = scan_queue_rx.recv().await {
2261                            if let Err(err) = self
2262                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2263                                .await
2264                            {
2265                                log::error!("error scanning {:?}: {}", job.abs_path, err);
2266                            }
2267                        }
2268                    });
2269                }
2270            })
2271            .await;
2272
2273        // Attempt to detect renames only over a single batch of file-system events.
2274        self.snapshot.lock().removed_entry_ids.clear();
2275
2276        self.update_ignore_statuses().await;
2277        true
2278    }
2279
2280    async fn update_ignore_statuses(&self) {
2281        let mut snapshot = self.snapshot();
2282
2283        let mut ignores_to_update = Vec::new();
2284        let mut ignores_to_delete = Vec::new();
2285        for (parent_path, (_, scan_id)) in &snapshot.ignores {
2286            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
2287                ignores_to_update.push(parent_path.clone());
2288            }
2289
2290            let ignore_path = parent_path.join(&*GITIGNORE);
2291            if snapshot.entry_for_path(ignore_path).is_none() {
2292                ignores_to_delete.push(parent_path.clone());
2293            }
2294        }
2295
2296        for parent_path in ignores_to_delete {
2297            snapshot.ignores.remove(&parent_path);
2298            self.snapshot.lock().ignores.remove(&parent_path);
2299        }
2300
2301        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
2302        ignores_to_update.sort_unstable();
2303        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
2304        while let Some(parent_path) = ignores_to_update.next() {
2305            while ignores_to_update
2306                .peek()
2307                .map_or(false, |p| p.starts_with(&parent_path))
2308            {
2309                ignores_to_update.next().unwrap();
2310            }
2311
2312            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
2313            ignore_queue_tx
2314                .send(UpdateIgnoreStatusJob {
2315                    path: parent_path,
2316                    ignore_stack,
2317                    ignore_queue: ignore_queue_tx.clone(),
2318                })
2319                .await
2320                .unwrap();
2321        }
2322        drop(ignore_queue_tx);
2323
2324        self.executor
2325            .scoped(|scope| {
2326                for _ in 0..self.executor.num_cpus() {
2327                    scope.spawn(async {
2328                        while let Ok(job) = ignore_queue_rx.recv().await {
2329                            self.update_ignore_status(job, &snapshot).await;
2330                        }
2331                    });
2332                }
2333            })
2334            .await;
2335    }
2336
2337    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
2338        let mut ignore_stack = job.ignore_stack;
2339        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
2340            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2341        }
2342
2343        let mut edits = Vec::new();
2344        for mut entry in snapshot.child_entries(&job.path).cloned() {
2345            let was_ignored = entry.is_ignored;
2346            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2347            if entry.is_dir() {
2348                let child_ignore_stack = if entry.is_ignored {
2349                    IgnoreStack::all()
2350                } else {
2351                    ignore_stack.clone()
2352                };
2353                job.ignore_queue
2354                    .send(UpdateIgnoreStatusJob {
2355                        path: entry.path.clone(),
2356                        ignore_stack: child_ignore_stack,
2357                        ignore_queue: job.ignore_queue.clone(),
2358                    })
2359                    .await
2360                    .unwrap();
2361            }
2362
2363            if entry.is_ignored != was_ignored {
2364                edits.push(Edit::Insert(entry));
2365            }
2366        }
2367        self.snapshot.lock().entries_by_path.edit(edits, &());
2368    }
2369}
2370
2371async fn refresh_entry(
2372    fs: &dyn Fs,
2373    snapshot: &Mutex<Snapshot>,
2374    path: Arc<Path>,
2375    abs_path: &Path,
2376) -> Result<Entry> {
2377    let root_char_bag;
2378    let next_entry_id;
2379    {
2380        let snapshot = snapshot.lock();
2381        root_char_bag = snapshot.root_char_bag;
2382        next_entry_id = snapshot.next_entry_id.clone();
2383    }
2384    let entry = Entry::new(
2385        path,
2386        &fs.metadata(abs_path)
2387            .await?
2388            .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2389        &next_entry_id,
2390        root_char_bag,
2391    );
2392    Ok(snapshot.lock().insert_entry(entry))
2393}
2394
2395fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2396    let mut result = root_char_bag;
2397    result.extend(
2398        path.to_string_lossy()
2399            .chars()
2400            .map(|c| c.to_ascii_lowercase()),
2401    );
2402    result
2403}
2404
2405struct ScanJob {
2406    abs_path: PathBuf,
2407    path: Arc<Path>,
2408    ignore_stack: Arc<IgnoreStack>,
2409    scan_queue: Sender<ScanJob>,
2410}
2411
2412struct UpdateIgnoreStatusJob {
2413    path: Arc<Path>,
2414    ignore_stack: Arc<IgnoreStack>,
2415    ignore_queue: Sender<UpdateIgnoreStatusJob>,
2416}
2417
2418pub trait WorktreeHandle {
2419    #[cfg(test)]
2420    fn flush_fs_events<'a>(
2421        &self,
2422        cx: &'a gpui::TestAppContext,
2423    ) -> futures::future::LocalBoxFuture<'a, ()>;
2424}
2425
2426impl WorktreeHandle for ModelHandle<Worktree> {
2427    // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that
2428    // occurred before the worktree was constructed. These events can cause the worktree to perfrom
2429    // extra directory scans, and emit extra scan-state notifications.
2430    //
2431    // This function mutates the worktree's directory and waits for those mutations to be picked up,
2432    // to ensure that all redundant FS events have already been processed.
2433    #[cfg(test)]
2434    fn flush_fs_events<'a>(
2435        &self,
2436        cx: &'a gpui::TestAppContext,
2437    ) -> futures::future::LocalBoxFuture<'a, ()> {
2438        use smol::future::FutureExt;
2439
2440        let filename = "fs-event-sentinel";
2441        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
2442        let tree = self.clone();
2443        async move {
2444            std::fs::write(root_path.join(filename), "").unwrap();
2445            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
2446                .await;
2447
2448            std::fs::remove_file(root_path.join(filename)).unwrap();
2449            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
2450                .await;
2451
2452            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2453                .await;
2454        }
2455        .boxed_local()
2456    }
2457}
2458
2459pub enum FileIter<'a> {
2460    All(Cursor<'a, Entry, FileCount, ()>),
2461    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
2462}
2463
2464impl<'a> FileIter<'a> {
2465    fn all(snapshot: &'a Snapshot, start: usize) -> Self {
2466        let mut cursor = snapshot.entries_by_path.cursor();
2467        cursor.seek(&FileCount(start), Bias::Right, &());
2468        Self::All(cursor)
2469    }
2470
2471    fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
2472        let mut cursor = snapshot.entries_by_path.cursor();
2473        cursor.seek(&VisibleFileCount(start), Bias::Right, &());
2474        Self::Visible(cursor)
2475    }
2476
2477    fn next_internal(&mut self) {
2478        match self {
2479            Self::All(cursor) => {
2480                let ix = *cursor.seek_start();
2481                cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
2482            }
2483            Self::Visible(cursor) => {
2484                let ix = *cursor.seek_start();
2485                cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
2486            }
2487        }
2488    }
2489
2490    fn item(&self) -> Option<&'a Entry> {
2491        match self {
2492            Self::All(cursor) => cursor.item(),
2493            Self::Visible(cursor) => cursor.item(),
2494        }
2495    }
2496}
2497
2498impl<'a> Iterator for FileIter<'a> {
2499    type Item = &'a Entry;
2500
2501    fn next(&mut self) -> Option<Self::Item> {
2502        if let Some(entry) = self.item() {
2503            self.next_internal();
2504            Some(entry)
2505        } else {
2506            None
2507        }
2508    }
2509}
2510
2511struct ChildEntriesIter<'a> {
2512    parent_path: &'a Path,
2513    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
2514}
2515
2516impl<'a> ChildEntriesIter<'a> {
2517    fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2518        let mut cursor = snapshot.entries_by_path.cursor();
2519        cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2520        Self {
2521            parent_path,
2522            cursor,
2523        }
2524    }
2525}
2526
2527impl<'a> Iterator for ChildEntriesIter<'a> {
2528    type Item = &'a Entry;
2529
2530    fn next(&mut self) -> Option<Self::Item> {
2531        if let Some(item) = self.cursor.item() {
2532            if item.path.starts_with(self.parent_path) {
2533                self.cursor
2534                    .seek_forward(&PathSearch::Successor(&item.path), Bias::Left, &());
2535                Some(item)
2536            } else {
2537                None
2538            }
2539        } else {
2540            None
2541        }
2542    }
2543}
2544
2545impl<'a> From<&'a Entry> for proto::Entry {
2546    fn from(entry: &'a Entry) -> Self {
2547        Self {
2548            id: entry.id as u64,
2549            is_dir: entry.is_dir(),
2550            path: entry.path.to_string_lossy().to_string(),
2551            inode: entry.inode,
2552            mtime: Some(entry.mtime.into()),
2553            is_symlink: entry.is_symlink,
2554            is_ignored: entry.is_ignored,
2555        }
2556    }
2557}
2558
2559impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2560    type Error = anyhow::Error;
2561
2562    fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2563        if let Some(mtime) = entry.mtime {
2564            let kind = if entry.is_dir {
2565                EntryKind::Dir
2566            } else {
2567                let mut char_bag = root_char_bag.clone();
2568                char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2569                EntryKind::File(char_bag)
2570            };
2571            let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2572            Ok(Entry {
2573                id: entry.id as usize,
2574                kind,
2575                path: path.clone(),
2576                inode: entry.inode,
2577                mtime: mtime.into(),
2578                is_symlink: entry.is_symlink,
2579                is_ignored: entry.is_ignored,
2580            })
2581        } else {
2582            Err(anyhow!(
2583                "missing mtime in remote worktree entry {:?}",
2584                entry.path
2585            ))
2586        }
2587    }
2588}
2589
2590#[cfg(test)]
2591mod tests {
2592    use super::*;
2593    use crate::fs::FakeFs;
2594    use crate::test::*;
2595    use anyhow::Result;
2596    use fs::RealFs;
2597    use rand::prelude::*;
2598    use serde_json::json;
2599    use std::time::UNIX_EPOCH;
2600    use std::{env, fmt::Write, os::unix, time::SystemTime};
2601
2602    #[gpui::test]
2603    async fn test_populate_and_search(cx: gpui::TestAppContext) {
2604        let dir = temp_tree(json!({
2605            "root": {
2606                "apple": "",
2607                "banana": {
2608                    "carrot": {
2609                        "date": "",
2610                        "endive": "",
2611                    }
2612                },
2613                "fennel": {
2614                    "grape": "",
2615                }
2616            }
2617        }));
2618
2619        let root_link_path = dir.path().join("root_link");
2620        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
2621        unix::fs::symlink(
2622            &dir.path().join("root/fennel"),
2623            &dir.path().join("root/finnochio"),
2624        )
2625        .unwrap();
2626
2627        let tree = Worktree::open_local(
2628            rpc::Client::new(),
2629            root_link_path,
2630            Arc::new(RealFs),
2631            Default::default(),
2632            &mut cx.to_async(),
2633        )
2634        .await
2635        .unwrap();
2636
2637        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2638            .await;
2639        let snapshots = [cx.read(|cx| {
2640            let tree = tree.read(cx);
2641            assert_eq!(tree.file_count(), 5);
2642            assert_eq!(
2643                tree.inode_for_path("fennel/grape"),
2644                tree.inode_for_path("finnochio/grape")
2645            );
2646            tree.snapshot()
2647        })];
2648        let cancel_flag = Default::default();
2649        let results = cx
2650            .read(|cx| {
2651                match_paths(
2652                    &snapshots,
2653                    "bna",
2654                    false,
2655                    false,
2656                    10,
2657                    &cancel_flag,
2658                    cx.background().clone(),
2659                )
2660            })
2661            .await;
2662        assert_eq!(
2663            results
2664                .into_iter()
2665                .map(|result| result.path)
2666                .collect::<Vec<Arc<Path>>>(),
2667            vec![
2668                PathBuf::from("banana/carrot/date").into(),
2669                PathBuf::from("banana/carrot/endive").into(),
2670            ]
2671        );
2672    }
2673
2674    #[gpui::test]
2675    async fn test_search_worktree_without_files(cx: gpui::TestAppContext) {
2676        let dir = temp_tree(json!({
2677            "root": {
2678                "dir1": {},
2679                "dir2": {
2680                    "dir3": {}
2681                }
2682            }
2683        }));
2684        let tree = Worktree::open_local(
2685            rpc::Client::new(),
2686            dir.path(),
2687            Arc::new(RealFs),
2688            Default::default(),
2689            &mut cx.to_async(),
2690        )
2691        .await
2692        .unwrap();
2693
2694        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2695            .await;
2696        let snapshots = [cx.read(|cx| {
2697            let tree = tree.read(cx);
2698            assert_eq!(tree.file_count(), 0);
2699            tree.snapshot()
2700        })];
2701        let cancel_flag = Default::default();
2702        let results = cx
2703            .read(|cx| {
2704                match_paths(
2705                    &snapshots,
2706                    "dir",
2707                    false,
2708                    false,
2709                    10,
2710                    &cancel_flag,
2711                    cx.background().clone(),
2712                )
2713            })
2714            .await;
2715        assert_eq!(
2716            results
2717                .into_iter()
2718                .map(|result| result.path)
2719                .collect::<Vec<Arc<Path>>>(),
2720            vec![]
2721        );
2722    }
2723
2724    #[gpui::test]
2725    async fn test_save_file(mut cx: gpui::TestAppContext) {
2726        let dir = temp_tree(json!({
2727            "file1": "the old contents",
2728        }));
2729        let tree = Worktree::open_local(
2730            rpc::Client::new(),
2731            dir.path(),
2732            Arc::new(RealFs),
2733            Default::default(),
2734            &mut cx.to_async(),
2735        )
2736        .await
2737        .unwrap();
2738        let buffer = tree
2739            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
2740            .await
2741            .unwrap();
2742        let save = buffer.update(&mut cx, |buffer, cx| {
2743            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
2744            buffer.save(cx).unwrap()
2745        });
2746        save.await.unwrap();
2747
2748        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
2749        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
2750    }
2751
2752    #[gpui::test]
2753    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
2754        let dir = temp_tree(json!({
2755            "file1": "the old contents",
2756        }));
2757        let file_path = dir.path().join("file1");
2758
2759        let tree = Worktree::open_local(
2760            rpc::Client::new(),
2761            file_path.clone(),
2762            Arc::new(RealFs),
2763            Default::default(),
2764            &mut cx.to_async(),
2765        )
2766        .await
2767        .unwrap();
2768        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2769            .await;
2770        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
2771
2772        let buffer = tree
2773            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
2774            .await
2775            .unwrap();
2776        let save = buffer.update(&mut cx, |buffer, cx| {
2777            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
2778            buffer.save(cx).unwrap()
2779        });
2780        save.await.unwrap();
2781
2782        let new_text = std::fs::read_to_string(file_path).unwrap();
2783        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
2784    }
2785
2786    #[gpui::test]
2787    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
2788        let dir = temp_tree(json!({
2789            "a": {
2790                "file1": "",
2791                "file2": "",
2792                "file3": "",
2793            },
2794            "b": {
2795                "c": {
2796                    "file4": "",
2797                    "file5": "",
2798                }
2799            }
2800        }));
2801
2802        let tree = Worktree::open_local(
2803            rpc::Client::new(),
2804            dir.path(),
2805            Arc::new(RealFs),
2806            Default::default(),
2807            &mut cx.to_async(),
2808        )
2809        .await
2810        .unwrap();
2811
2812        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
2813            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
2814            async move { buffer.await.unwrap() }
2815        };
2816        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
2817            tree.read_with(cx, |tree, _| {
2818                tree.entry_for_path(path)
2819                    .expect(&format!("no entry for path {}", path))
2820                    .id
2821            })
2822        };
2823
2824        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
2825        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
2826        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
2827        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
2828
2829        let file2_id = id_for_path("a/file2", &cx);
2830        let file3_id = id_for_path("a/file3", &cx);
2831        let file4_id = id_for_path("b/c/file4", &cx);
2832
2833        // Wait for the initial scan.
2834        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2835            .await;
2836
2837        // Create a remote copy of this worktree.
2838        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
2839        let worktree_id = 1;
2840        let share_request = tree
2841            .update(&mut cx, |tree, cx| {
2842                tree.as_local().unwrap().share_request(cx)
2843            })
2844            .await
2845            .unwrap();
2846        let remote = Worktree::remote(
2847            proto::JoinWorktreeResponse {
2848                worktree: share_request.worktree,
2849                replica_id: 1,
2850                peers: Vec::new(),
2851            },
2852            rpc::Client::new(),
2853            Default::default(),
2854            &mut cx.to_async(),
2855        )
2856        .await
2857        .unwrap();
2858
2859        cx.read(|cx| {
2860            assert!(!buffer2.read(cx).is_dirty());
2861            assert!(!buffer3.read(cx).is_dirty());
2862            assert!(!buffer4.read(cx).is_dirty());
2863            assert!(!buffer5.read(cx).is_dirty());
2864        });
2865
2866        // Rename and delete files and directories.
2867        tree.flush_fs_events(&cx).await;
2868        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
2869        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
2870        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
2871        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
2872        tree.flush_fs_events(&cx).await;
2873
2874        let expected_paths = vec![
2875            "a",
2876            "a/file1",
2877            "a/file2.new",
2878            "b",
2879            "d",
2880            "d/file3",
2881            "d/file4",
2882        ];
2883
2884        cx.read(|app| {
2885            assert_eq!(
2886                tree.read(app)
2887                    .paths()
2888                    .map(|p| p.to_str().unwrap())
2889                    .collect::<Vec<_>>(),
2890                expected_paths
2891            );
2892
2893            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
2894            assert_eq!(id_for_path("d/file3", &cx), file3_id);
2895            assert_eq!(id_for_path("d/file4", &cx), file4_id);
2896
2897            assert_eq!(
2898                buffer2.read(app).file().unwrap().path().as_ref(),
2899                Path::new("a/file2.new")
2900            );
2901            assert_eq!(
2902                buffer3.read(app).file().unwrap().path().as_ref(),
2903                Path::new("d/file3")
2904            );
2905            assert_eq!(
2906                buffer4.read(app).file().unwrap().path().as_ref(),
2907                Path::new("d/file4")
2908            );
2909            assert_eq!(
2910                buffer5.read(app).file().unwrap().path().as_ref(),
2911                Path::new("b/c/file5")
2912            );
2913
2914            assert!(!buffer2.read(app).file().unwrap().is_deleted());
2915            assert!(!buffer3.read(app).file().unwrap().is_deleted());
2916            assert!(!buffer4.read(app).file().unwrap().is_deleted());
2917            assert!(buffer5.read(app).file().unwrap().is_deleted());
2918        });
2919
2920        // Update the remote worktree. Check that it becomes consistent with the
2921        // local worktree.
2922        remote.update(&mut cx, |remote, cx| {
2923            let update_message = tree
2924                .read(cx)
2925                .snapshot()
2926                .build_update(&initial_snapshot, worktree_id);
2927            remote
2928                .as_remote_mut()
2929                .unwrap()
2930                .snapshot
2931                .apply_update(update_message)
2932                .unwrap();
2933
2934            assert_eq!(
2935                remote
2936                    .paths()
2937                    .map(|p| p.to_str().unwrap())
2938                    .collect::<Vec<_>>(),
2939                expected_paths
2940            );
2941        });
2942    }
2943
2944    #[gpui::test]
2945    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
2946        let dir = temp_tree(json!({
2947            ".git": {},
2948            ".gitignore": "ignored-dir\n",
2949            "tracked-dir": {
2950                "tracked-file1": "tracked contents",
2951            },
2952            "ignored-dir": {
2953                "ignored-file1": "ignored contents",
2954            }
2955        }));
2956
2957        let tree = Worktree::open_local(
2958            rpc::Client::new(),
2959            dir.path(),
2960            Arc::new(RealFs),
2961            Default::default(),
2962            &mut cx.to_async(),
2963        )
2964        .await
2965        .unwrap();
2966        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2967            .await;
2968        tree.flush_fs_events(&cx).await;
2969        cx.read(|cx| {
2970            let tree = tree.read(cx);
2971            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
2972            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
2973            assert_eq!(tracked.is_ignored, false);
2974            assert_eq!(ignored.is_ignored, true);
2975        });
2976
2977        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
2978        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
2979        tree.flush_fs_events(&cx).await;
2980        cx.read(|cx| {
2981            let tree = tree.read(cx);
2982            let dot_git = tree.entry_for_path(".git").unwrap();
2983            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
2984            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
2985            assert_eq!(tracked.is_ignored, false);
2986            assert_eq!(ignored.is_ignored, true);
2987            assert_eq!(dot_git.is_ignored, true);
2988        });
2989    }
2990
2991    #[gpui::test]
2992    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
2993        let user_id = 100;
2994        let mut client = rpc::Client::new();
2995        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
2996
2997        let fs = Arc::new(FakeFs::new());
2998        fs.insert_tree(
2999            "/path",
3000            json!({
3001                "to": {
3002                    "the-dir": {
3003                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
3004                        "a.txt": "a-contents",
3005                    },
3006                },
3007            }),
3008        )
3009        .await;
3010
3011        let worktree = Worktree::open_local(
3012            client.clone(),
3013            "/path/to/the-dir".as_ref(),
3014            fs,
3015            Default::default(),
3016            &mut cx.to_async(),
3017        )
3018        .await
3019        .unwrap();
3020
3021        {
3022            let cx = cx.to_async();
3023            client.authenticate_and_connect(&cx).await.unwrap();
3024        }
3025
3026        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3027        assert_eq!(
3028            open_worktree.payload,
3029            proto::OpenWorktree {
3030                root_name: "the-dir".to_string(),
3031                collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
3032            }
3033        );
3034
3035        server
3036            .respond(
3037                open_worktree.receipt(),
3038                proto::OpenWorktreeResponse { worktree_id: 5 },
3039            )
3040            .await;
3041        let remote_id = worktree
3042            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
3043            .await;
3044        assert_eq!(remote_id, Some(5));
3045
3046        cx.update(move |_| drop(worktree));
3047        server.receive::<proto::CloseWorktree>().await.unwrap();
3048    }
3049
3050    #[gpui::test(iterations = 100)]
3051    fn test_random(mut rng: StdRng) {
3052        let operations = env::var("OPERATIONS")
3053            .map(|o| o.parse().unwrap())
3054            .unwrap_or(40);
3055        let initial_entries = env::var("INITIAL_ENTRIES")
3056            .map(|o| o.parse().unwrap())
3057            .unwrap_or(20);
3058
3059        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
3060        for _ in 0..initial_entries {
3061            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
3062        }
3063        log::info!("Generated initial tree");
3064
3065        let (notify_tx, _notify_rx) = smol::channel::unbounded();
3066        let fs = Arc::new(RealFs);
3067        let next_entry_id = Arc::new(AtomicUsize::new(0));
3068        let mut initial_snapshot = Snapshot {
3069            id: 0,
3070            scan_id: 0,
3071            abs_path: root_dir.path().into(),
3072            entries_by_path: Default::default(),
3073            entries_by_id: Default::default(),
3074            removed_entry_ids: Default::default(),
3075            ignores: Default::default(),
3076            root_name: Default::default(),
3077            root_char_bag: Default::default(),
3078            next_entry_id: next_entry_id.clone(),
3079        };
3080        initial_snapshot.insert_entry(Entry::new(
3081            Path::new("").into(),
3082            &smol::block_on(fs.metadata(root_dir.path()))
3083                .unwrap()
3084                .unwrap(),
3085            &next_entry_id,
3086            Default::default(),
3087        ));
3088        let mut scanner = BackgroundScanner::new(
3089            Arc::new(Mutex::new(initial_snapshot.clone())),
3090            notify_tx,
3091            fs.clone(),
3092            Arc::new(gpui::executor::Background::new()),
3093        );
3094        smol::block_on(scanner.scan_dirs()).unwrap();
3095        scanner.snapshot().check_invariants();
3096
3097        let mut events = Vec::new();
3098        let mut mutations_len = operations;
3099        while mutations_len > 1 {
3100            if !events.is_empty() && rng.gen_bool(0.4) {
3101                let len = rng.gen_range(0..=events.len());
3102                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
3103                log::info!("Delivering events: {:#?}", to_deliver);
3104                smol::block_on(scanner.process_events(to_deliver));
3105                scanner.snapshot().check_invariants();
3106            } else {
3107                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
3108                mutations_len -= 1;
3109            }
3110        }
3111        log::info!("Quiescing: {:#?}", events);
3112        smol::block_on(scanner.process_events(events));
3113        scanner.snapshot().check_invariants();
3114
3115        let (notify_tx, _notify_rx) = smol::channel::unbounded();
3116        let mut new_scanner = BackgroundScanner::new(
3117            Arc::new(Mutex::new(initial_snapshot)),
3118            notify_tx,
3119            scanner.fs.clone(),
3120            scanner.executor.clone(),
3121        );
3122        smol::block_on(new_scanner.scan_dirs()).unwrap();
3123        assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
3124    }
3125
3126    fn randomly_mutate_tree(
3127        root_path: &Path,
3128        insertion_probability: f64,
3129        rng: &mut impl Rng,
3130    ) -> Result<Vec<fsevent::Event>> {
3131        let root_path = root_path.canonicalize().unwrap();
3132        let (dirs, files) = read_dir_recursive(root_path.clone());
3133
3134        let mut events = Vec::new();
3135        let mut record_event = |path: PathBuf| {
3136            events.push(fsevent::Event {
3137                event_id: SystemTime::now()
3138                    .duration_since(UNIX_EPOCH)
3139                    .unwrap()
3140                    .as_secs(),
3141                flags: fsevent::StreamFlags::empty(),
3142                path,
3143            });
3144        };
3145
3146        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
3147            let path = dirs.choose(rng).unwrap();
3148            let new_path = path.join(gen_name(rng));
3149
3150            if rng.gen() {
3151                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
3152                std::fs::create_dir(&new_path)?;
3153            } else {
3154                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
3155                std::fs::write(&new_path, "")?;
3156            }
3157            record_event(new_path);
3158        } else if rng.gen_bool(0.05) {
3159            let ignore_dir_path = dirs.choose(rng).unwrap();
3160            let ignore_path = ignore_dir_path.join(&*GITIGNORE);
3161
3162            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
3163            let files_to_ignore = {
3164                let len = rng.gen_range(0..=subfiles.len());
3165                subfiles.choose_multiple(rng, len)
3166            };
3167            let dirs_to_ignore = {
3168                let len = rng.gen_range(0..subdirs.len());
3169                subdirs.choose_multiple(rng, len)
3170            };
3171
3172            let mut ignore_contents = String::new();
3173            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
3174                write!(
3175                    ignore_contents,
3176                    "{}\n",
3177                    path_to_ignore
3178                        .strip_prefix(&ignore_dir_path)?
3179                        .to_str()
3180                        .unwrap()
3181                )
3182                .unwrap();
3183            }
3184            log::info!(
3185                "Creating {:?} with contents:\n{}",
3186                ignore_path.strip_prefix(&root_path)?,
3187                ignore_contents
3188            );
3189            std::fs::write(&ignore_path, ignore_contents).unwrap();
3190            record_event(ignore_path);
3191        } else {
3192            let old_path = {
3193                let file_path = files.choose(rng);
3194                let dir_path = dirs[1..].choose(rng);
3195                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
3196            };
3197
3198            let is_rename = rng.gen();
3199            if is_rename {
3200                let new_path_parent = dirs
3201                    .iter()
3202                    .filter(|d| !d.starts_with(old_path))
3203                    .choose(rng)
3204                    .unwrap();
3205
3206                let overwrite_existing_dir =
3207                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
3208                let new_path = if overwrite_existing_dir {
3209                    std::fs::remove_dir_all(&new_path_parent).ok();
3210                    new_path_parent.to_path_buf()
3211                } else {
3212                    new_path_parent.join(gen_name(rng))
3213                };
3214
3215                log::info!(
3216                    "Renaming {:?} to {}{:?}",
3217                    old_path.strip_prefix(&root_path)?,
3218                    if overwrite_existing_dir {
3219                        "overwrite "
3220                    } else {
3221                        ""
3222                    },
3223                    new_path.strip_prefix(&root_path)?
3224                );
3225                std::fs::rename(&old_path, &new_path)?;
3226                record_event(old_path.clone());
3227                record_event(new_path);
3228            } else if old_path.is_dir() {
3229                let (dirs, files) = read_dir_recursive(old_path.clone());
3230
3231                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
3232                std::fs::remove_dir_all(&old_path).unwrap();
3233                for file in files {
3234                    record_event(file);
3235                }
3236                for dir in dirs {
3237                    record_event(dir);
3238                }
3239            } else {
3240                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
3241                std::fs::remove_file(old_path).unwrap();
3242                record_event(old_path.clone());
3243            }
3244        }
3245
3246        Ok(events)
3247    }
3248
3249    fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3250        let child_entries = std::fs::read_dir(&path).unwrap();
3251        let mut dirs = vec![path];
3252        let mut files = Vec::new();
3253        for child_entry in child_entries {
3254            let child_path = child_entry.unwrap().path();
3255            if child_path.is_dir() {
3256                let (child_dirs, child_files) = read_dir_recursive(child_path);
3257                dirs.extend(child_dirs);
3258                files.extend(child_files);
3259            } else {
3260                files.push(child_path);
3261            }
3262        }
3263        (dirs, files)
3264    }
3265
3266    fn gen_name(rng: &mut impl Rng) -> String {
3267        (0..6)
3268            .map(|_| rng.sample(rand::distributions::Alphanumeric))
3269            .map(char::from)
3270            .collect()
3271    }
3272
3273    impl Snapshot {
3274        fn check_invariants(&self) {
3275            let mut files = self.files(0);
3276            let mut visible_files = self.visible_files(0);
3277            for entry in self.entries_by_path.cursor::<(), ()>() {
3278                if entry.is_file() {
3279                    assert_eq!(files.next().unwrap().inode, entry.inode);
3280                    if !entry.is_ignored {
3281                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
3282                    }
3283                }
3284            }
3285            assert!(files.next().is_none());
3286            assert!(visible_files.next().is_none());
3287
3288            let mut bfs_paths = Vec::new();
3289            let mut stack = vec![Path::new("")];
3290            while let Some(path) = stack.pop() {
3291                bfs_paths.push(path);
3292                let ix = stack.len();
3293                for child_entry in self.child_entries(path) {
3294                    stack.insert(ix, &child_entry.path);
3295                }
3296            }
3297
3298            let dfs_paths = self
3299                .entries_by_path
3300                .cursor::<(), ()>()
3301                .map(|e| e.path.as_ref())
3302                .collect::<Vec<_>>();
3303            assert_eq!(bfs_paths, dfs_paths);
3304
3305            for (ignore_parent_path, _) in &self.ignores {
3306                assert!(self.entry_for_path(ignore_parent_path).is_some());
3307                assert!(self
3308                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
3309                    .is_some());
3310            }
3311        }
3312
3313        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
3314            let mut paths = Vec::new();
3315            for entry in self.entries_by_path.cursor::<(), ()>() {
3316                paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
3317            }
3318            paths.sort_by(|a, b| a.0.cmp(&b.0));
3319            paths
3320        }
3321    }
3322}