worktree.rs

   1mod ignore;
   2
   3use self::ignore::IgnoreStack;
   4use crate::{
   5    editor::{self, Buffer, History, Operation, Rope},
   6    fs::{self, Fs},
   7    fuzzy,
   8    fuzzy::CharBag,
   9    language::{Language, LanguageRegistry},
  10    rpc::{self, proto, Status},
  11    time::{self, ReplicaId},
  12    util::{Bias, TryFutureExt},
  13};
  14use ::ignore::gitignore::Gitignore;
  15use anyhow::{anyhow, Result};
  16use futures::{Stream, StreamExt};
  17pub use fuzzy::{match_paths, PathMatch};
  18use gpui::{
  19    executor,
  20    sum_tree::{self, Cursor, Edit, SumTree},
  21    AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task,
  22    UpgradeModelHandle, WeakModelHandle,
  23};
  24use lazy_static::lazy_static;
  25use parking_lot::Mutex;
  26use postage::{
  27    prelude::{Sink as _, Stream as _},
  28    watch,
  29};
  30use serde::Deserialize;
  31use smol::channel::{self, Sender};
  32use std::{
  33    cmp::{self, Ordering},
  34    collections::HashMap,
  35    convert::{TryFrom, TryInto},
  36    ffi::{OsStr, OsString},
  37    fmt,
  38    future::Future,
  39    ops::Deref,
  40    path::{Path, PathBuf},
  41    sync::{
  42        atomic::{AtomicUsize, Ordering::SeqCst},
  43        Arc,
  44    },
  45    time::{Duration, SystemTime},
  46};
  47use zrpc::{PeerId, TypedEnvelope};
  48
lazy_static! {
    /// The file name `.gitignore` as an `OsStr`, built once on first use.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
  52
/// State reported for a local worktree's filesystem scan.
///
/// Values are sent over the scan-state channel that `LocalWorktree::new` creates and
/// hands to the `BackgroundScanner`.
#[derive(Clone, Debug)]
enum ScanState {
    /// No scan is in progress.
    Idle,
    /// A scan is in progress; this is also the initial state of a new local worktree.
    Scanning,
    /// An error value; wrapped in `Arc` so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
  59
/// A tree of file entries that is either scanned from the local filesystem or
/// mirrored from another peer over RPC.
pub enum Worktree {
    /// A worktree opened with `Worktree::open_local`.
    Local(LocalWorktree),
    /// A worktree joined with `Worktree::open_remote`.
    Remote(RemoteWorktree),
}
  64
/// Events emitted by a `Worktree` model.
pub enum Event {
    /// Emitted when an `UnshareWorktree` message is handled.
    Closed,
}
  68
  69impl Entity for Worktree {
  70    type Event = Event;
  71
  72    fn release(&mut self, cx: &mut MutableAppContext) {
  73        match self {
  74            Self::Local(tree) => {
  75                if let Some(worktree_id) = *tree.remote_id.borrow() {
  76                    let rpc = tree.rpc.clone();
  77                    cx.spawn(|_| async move {
  78                        if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
  79                            log::error!("error closing worktree: {}", err);
  80                        }
  81                    })
  82                    .detach();
  83                }
  84            }
  85            Self::Remote(tree) => {
  86                let rpc = tree.rpc.clone();
  87                let worktree_id = tree.remote_id;
  88                cx.spawn(|_| async move {
  89                    if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
  90                        log::error!("error closing worktree: {}", err);
  91                    }
  92                })
  93                .detach();
  94            }
  95        }
  96    }
  97}
  98
  99impl Worktree {
 100    pub async fn open_local(
 101        rpc: Arc<rpc::Client>,
 102        path: impl Into<Arc<Path>>,
 103        fs: Arc<dyn Fs>,
 104        languages: Arc<LanguageRegistry>,
 105        cx: &mut AsyncAppContext,
 106    ) -> Result<ModelHandle<Self>> {
 107        let (tree, scan_states_tx) =
 108            LocalWorktree::new(rpc, path, fs.clone(), languages, cx).await?;
 109        tree.update(cx, |tree, cx| {
 110            let tree = tree.as_local_mut().unwrap();
 111            let abs_path = tree.snapshot.abs_path.clone();
 112            let background_snapshot = tree.background_snapshot.clone();
 113            let background = cx.background().clone();
 114            tree._background_scanner_task = Some(cx.background().spawn(async move {
 115                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
 116                let scanner =
 117                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
 118                scanner.run(events).await;
 119            }));
 120        });
 121        Ok(tree)
 122    }
 123
 124    pub async fn open_remote(
 125        rpc: Arc<rpc::Client>,
 126        id: u64,
 127        languages: Arc<LanguageRegistry>,
 128        cx: &mut AsyncAppContext,
 129    ) -> Result<ModelHandle<Self>> {
 130        let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
 131        Worktree::remote(response, rpc, languages, cx).await
 132    }
 133
 134    async fn remote(
 135        join_response: proto::JoinWorktreeResponse,
 136        rpc: Arc<rpc::Client>,
 137        languages: Arc<LanguageRegistry>,
 138        cx: &mut AsyncAppContext,
 139    ) -> Result<ModelHandle<Self>> {
 140        let worktree = join_response
 141            .worktree
 142            .ok_or_else(|| anyhow!("empty worktree"))?;
 143
 144        let remote_id = worktree.id;
 145        let replica_id = join_response.replica_id as ReplicaId;
 146        let peers = join_response.peers;
 147        let root_char_bag: CharBag = worktree
 148            .root_name
 149            .chars()
 150            .map(|c| c.to_ascii_lowercase())
 151            .collect();
 152        let root_name = worktree.root_name.clone();
 153        let (entries_by_path, entries_by_id) = cx
 154            .background()
 155            .spawn(async move {
 156                let mut entries_by_path_edits = Vec::new();
 157                let mut entries_by_id_edits = Vec::new();
 158                for entry in worktree.entries {
 159                    match Entry::try_from((&root_char_bag, entry)) {
 160                        Ok(entry) => {
 161                            entries_by_id_edits.push(Edit::Insert(PathEntry {
 162                                id: entry.id,
 163                                path: entry.path.clone(),
 164                                scan_id: 0,
 165                            }));
 166                            entries_by_path_edits.push(Edit::Insert(entry));
 167                        }
 168                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
 169                    }
 170                }
 171
 172                let mut entries_by_path = SumTree::new();
 173                let mut entries_by_id = SumTree::new();
 174                entries_by_path.edit(entries_by_path_edits, &());
 175                entries_by_id.edit(entries_by_id_edits, &());
 176                (entries_by_path, entries_by_id)
 177            })
 178            .await;
 179
 180        let worktree = cx.update(|cx| {
 181            cx.add_model(|cx: &mut ModelContext<Worktree>| {
 182                let snapshot = Snapshot {
 183                    id: cx.model_id(),
 184                    scan_id: 0,
 185                    abs_path: Path::new("").into(),
 186                    root_name,
 187                    root_char_bag,
 188                    ignores: Default::default(),
 189                    entries_by_path,
 190                    entries_by_id,
 191                    removed_entry_ids: Default::default(),
 192                    next_entry_id: Default::default(),
 193                };
 194
 195                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
 196                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
 197
 198                cx.background()
 199                    .spawn(async move {
 200                        while let Some(update) = updates_rx.recv().await {
 201                            let mut snapshot = snapshot_tx.borrow().clone();
 202                            if let Err(error) = snapshot.apply_update(update) {
 203                                log::error!("error applying worktree update: {}", error);
 204                            }
 205                            *snapshot_tx.borrow_mut() = snapshot;
 206                        }
 207                    })
 208                    .detach();
 209
 210                {
 211                    let mut snapshot_rx = snapshot_rx.clone();
 212                    cx.spawn_weak(|this, mut cx| async move {
 213                        while let Some(_) = snapshot_rx.recv().await {
 214                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
 215                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
 216                            } else {
 217                                break;
 218                            }
 219                        }
 220                    })
 221                    .detach();
 222                }
 223
 224                let _subscriptions = vec![
 225                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_add_peer),
 226                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer),
 227                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update),
 228                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
 229                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
 230                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
 231                ];
 232
 233                Worktree::Remote(RemoteWorktree {
 234                    remote_id,
 235                    replica_id,
 236                    snapshot,
 237                    snapshot_rx,
 238                    updates_tx,
 239                    rpc: rpc.clone(),
 240                    open_buffers: Default::default(),
 241                    peers: peers
 242                        .into_iter()
 243                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
 244                        .collect(),
 245                    queued_operations: Default::default(),
 246                    languages,
 247                    _subscriptions,
 248                })
 249            })
 250        });
 251
 252        Ok(worktree)
 253    }
 254
 255    pub fn as_local(&self) -> Option<&LocalWorktree> {
 256        if let Worktree::Local(worktree) = self {
 257            Some(worktree)
 258        } else {
 259            None
 260        }
 261    }
 262
 263    pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
 264        if let Worktree::Local(worktree) = self {
 265            Some(worktree)
 266        } else {
 267            None
 268        }
 269    }
 270
 271    pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
 272        if let Worktree::Remote(worktree) = self {
 273            Some(worktree)
 274        } else {
 275            None
 276        }
 277    }
 278
 279    pub fn snapshot(&self) -> Snapshot {
 280        match self {
 281            Worktree::Local(worktree) => worktree.snapshot(),
 282            Worktree::Remote(worktree) => worktree.snapshot(),
 283        }
 284    }
 285
 286    pub fn replica_id(&self) -> ReplicaId {
 287        match self {
 288            Worktree::Local(_) => 0,
 289            Worktree::Remote(worktree) => worktree.replica_id,
 290        }
 291    }
 292
 293    pub fn handle_add_peer(
 294        &mut self,
 295        envelope: TypedEnvelope<proto::AddPeer>,
 296        _: Arc<rpc::Client>,
 297        cx: &mut ModelContext<Self>,
 298    ) -> Result<()> {
 299        match self {
 300            Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
 301            Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
 302        }
 303    }
 304
 305    pub fn handle_remove_peer(
 306        &mut self,
 307        envelope: TypedEnvelope<proto::RemovePeer>,
 308        _: Arc<rpc::Client>,
 309        cx: &mut ModelContext<Self>,
 310    ) -> Result<()> {
 311        match self {
 312            Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
 313            Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
 314        }
 315    }
 316
 317    pub fn handle_update(
 318        &mut self,
 319        envelope: TypedEnvelope<proto::UpdateWorktree>,
 320        _: Arc<rpc::Client>,
 321        cx: &mut ModelContext<Self>,
 322    ) -> anyhow::Result<()> {
 323        self.as_remote_mut()
 324            .unwrap()
 325            .update_from_remote(envelope, cx)
 326    }
 327
 328    pub fn handle_open_buffer(
 329        &mut self,
 330        envelope: TypedEnvelope<proto::OpenBuffer>,
 331        rpc: Arc<rpc::Client>,
 332        cx: &mut ModelContext<Self>,
 333    ) -> anyhow::Result<()> {
 334        let receipt = envelope.receipt();
 335
 336        let response = self
 337            .as_local_mut()
 338            .unwrap()
 339            .open_remote_buffer(envelope, cx);
 340
 341        cx.background()
 342            .spawn(
 343                async move {
 344                    rpc.respond(receipt, response.await?).await?;
 345                    Ok(())
 346                }
 347                .log_err(),
 348            )
 349            .detach();
 350
 351        Ok(())
 352    }
 353
 354    pub fn handle_close_buffer(
 355        &mut self,
 356        envelope: TypedEnvelope<proto::CloseBuffer>,
 357        _: Arc<rpc::Client>,
 358        cx: &mut ModelContext<Self>,
 359    ) -> anyhow::Result<()> {
 360        self.as_local_mut()
 361            .unwrap()
 362            .close_remote_buffer(envelope, cx)
 363    }
 364
 365    pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
 366        match self {
 367            Worktree::Local(worktree) => &worktree.peers,
 368            Worktree::Remote(worktree) => &worktree.peers,
 369        }
 370    }
 371
 372    pub fn open_buffer(
 373        &mut self,
 374        path: impl AsRef<Path>,
 375        cx: &mut ModelContext<Self>,
 376    ) -> Task<Result<ModelHandle<Buffer>>> {
 377        match self {
 378            Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
 379            Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
 380        }
 381    }
 382
 383    #[cfg(feature = "test-support")]
 384    pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
 385        let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
 386            Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
 387            Worktree::Remote(worktree) => {
 388                Box::new(worktree.open_buffers.values().filter_map(|buf| {
 389                    if let RemoteBuffer::Loaded(buf) = buf {
 390                        Some(buf)
 391                    } else {
 392                        None
 393                    }
 394                }))
 395            }
 396        };
 397
 398        let path = path.as_ref();
 399        open_buffers
 400            .find(|buffer| {
 401                if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
 402                    file.path.as_ref() == path
 403                } else {
 404                    false
 405                }
 406            })
 407            .is_some()
 408    }
 409
 410    pub fn handle_update_buffer(
 411        &mut self,
 412        envelope: TypedEnvelope<proto::UpdateBuffer>,
 413        _: Arc<rpc::Client>,
 414        cx: &mut ModelContext<Self>,
 415    ) -> Result<()> {
 416        let payload = envelope.payload.clone();
 417        let buffer_id = payload.buffer_id as usize;
 418        let ops = payload
 419            .operations
 420            .into_iter()
 421            .map(|op| op.try_into())
 422            .collect::<anyhow::Result<Vec<_>>>()?;
 423
 424        match self {
 425            Worktree::Local(worktree) => {
 426                let buffer = worktree
 427                    .open_buffers
 428                    .get(&buffer_id)
 429                    .and_then(|buf| buf.upgrade(cx))
 430                    .ok_or_else(|| {
 431                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
 432                    })?;
 433                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
 434            }
 435            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
 436                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
 437                Some(RemoteBuffer::Loaded(buffer)) => {
 438                    if let Some(buffer) = buffer.upgrade(cx) {
 439                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
 440                    } else {
 441                        worktree
 442                            .open_buffers
 443                            .insert(buffer_id, RemoteBuffer::Operations(ops));
 444                    }
 445                }
 446                None => {
 447                    worktree
 448                        .open_buffers
 449                        .insert(buffer_id, RemoteBuffer::Operations(ops));
 450                }
 451            },
 452        }
 453
 454        Ok(())
 455    }
 456
 457    pub fn handle_save_buffer(
 458        &mut self,
 459        envelope: TypedEnvelope<proto::SaveBuffer>,
 460        rpc: Arc<rpc::Client>,
 461        cx: &mut ModelContext<Self>,
 462    ) -> Result<()> {
 463        let sender_id = envelope.original_sender_id()?;
 464        let buffer = self
 465            .as_local()
 466            .unwrap()
 467            .shared_buffers
 468            .get(&sender_id)
 469            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
 470            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
 471
 472        let receipt = envelope.receipt();
 473        let worktree_id = envelope.payload.worktree_id;
 474        let buffer_id = envelope.payload.buffer_id;
 475        let save = cx.spawn(|_, mut cx| async move {
 476            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
 477        });
 478
 479        cx.background()
 480            .spawn(
 481                async move {
 482                    let (version, mtime) = save.await?;
 483
 484                    rpc.respond(
 485                        receipt,
 486                        proto::BufferSaved {
 487                            worktree_id,
 488                            buffer_id,
 489                            version: (&version).into(),
 490                            mtime: Some(mtime.into()),
 491                        },
 492                    )
 493                    .await?;
 494
 495                    Ok(())
 496                }
 497                .log_err(),
 498            )
 499            .detach();
 500
 501        Ok(())
 502    }
 503
 504    pub fn handle_buffer_saved(
 505        &mut self,
 506        envelope: TypedEnvelope<proto::BufferSaved>,
 507        _: Arc<rpc::Client>,
 508        cx: &mut ModelContext<Self>,
 509    ) -> Result<()> {
 510        let payload = envelope.payload.clone();
 511        let worktree = self.as_remote_mut().unwrap();
 512        if let Some(buffer) = worktree
 513            .open_buffers
 514            .get(&(payload.buffer_id as usize))
 515            .and_then(|buf| buf.upgrade(cx))
 516        {
 517            buffer.update(cx, |buffer, cx| {
 518                let version = payload.version.try_into()?;
 519                let mtime = payload
 520                    .mtime
 521                    .ok_or_else(|| anyhow!("missing mtime"))?
 522                    .into();
 523                buffer.did_save(version, mtime, cx);
 524                Result::<_, anyhow::Error>::Ok(())
 525            })?;
 526        }
 527        Ok(())
 528    }
 529
 530    pub fn handle_unshare(
 531        &mut self,
 532        _: TypedEnvelope<proto::UnshareWorktree>,
 533        _: Arc<rpc::Client>,
 534        cx: &mut ModelContext<Self>,
 535    ) -> Result<()> {
 536        cx.emit(Event::Closed);
 537        Ok(())
 538    }
 539
 540    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
 541        match self {
 542            Self::Local(worktree) => {
 543                let is_fake_fs = worktree.fs.is_fake();
 544                worktree.snapshot = worktree.background_snapshot.lock().clone();
 545                if worktree.is_scanning() {
 546                    if worktree.poll_task.is_none() {
 547                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
 548                            if is_fake_fs {
 549                                smol::future::yield_now().await;
 550                            } else {
 551                                smol::Timer::after(Duration::from_millis(100)).await;
 552                            }
 553                            this.update(&mut cx, |this, cx| {
 554                                this.as_local_mut().unwrap().poll_task = None;
 555                                this.poll_snapshot(cx);
 556                            })
 557                        }));
 558                    }
 559                } else {
 560                    worktree.poll_task.take();
 561                    self.update_open_buffers(cx);
 562                }
 563            }
 564            Self::Remote(worktree) => {
 565                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
 566                self.update_open_buffers(cx);
 567            }
 568        };
 569
 570        cx.notify();
 571    }
 572
 573    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
 574        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
 575            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
 576            Self::Remote(worktree) => {
 577                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
 578                    if let RemoteBuffer::Loaded(buf) = buf {
 579                        Some((id, buf))
 580                    } else {
 581                        None
 582                    }
 583                }))
 584            }
 585        };
 586
 587        let mut buffers_to_delete = Vec::new();
 588        for (buffer_id, buffer) in open_buffers {
 589            if let Some(buffer) = buffer.upgrade(cx) {
 590                buffer.update(cx, |buffer, cx| {
 591                    let buffer_is_clean = !buffer.is_dirty();
 592
 593                    if let Some(file) = buffer.file_mut() {
 594                        let mut file_changed = false;
 595
 596                        if let Some(entry) = file
 597                            .entry_id
 598                            .and_then(|entry_id| self.entry_for_id(entry_id))
 599                        {
 600                            if entry.path != file.path {
 601                                file.path = entry.path.clone();
 602                                file_changed = true;
 603                            }
 604
 605                            if entry.mtime != file.mtime {
 606                                file.mtime = entry.mtime;
 607                                file_changed = true;
 608                                if let Some(worktree) = self.as_local() {
 609                                    if buffer_is_clean {
 610                                        let abs_path = worktree.absolutize(&file.path);
 611                                        refresh_buffer(abs_path, &worktree.fs, cx);
 612                                    }
 613                                }
 614                            }
 615                        } else if let Some(entry) = self.entry_for_path(&file.path) {
 616                            file.entry_id = Some(entry.id);
 617                            file.mtime = entry.mtime;
 618                            if let Some(worktree) = self.as_local() {
 619                                if buffer_is_clean {
 620                                    let abs_path = worktree.absolutize(&file.path);
 621                                    refresh_buffer(abs_path, &worktree.fs, cx);
 622                                }
 623                            }
 624                            file_changed = true;
 625                        } else if !file.is_deleted() {
 626                            if buffer_is_clean {
 627                                cx.emit(editor::buffer::Event::Dirtied);
 628                            }
 629                            file.entry_id = None;
 630                            file_changed = true;
 631                        }
 632
 633                        if file_changed {
 634                            cx.emit(editor::buffer::Event::FileHandleChanged);
 635                        }
 636                    }
 637                });
 638            } else {
 639                buffers_to_delete.push(*buffer_id);
 640            }
 641        }
 642
 643        for buffer_id in buffers_to_delete {
 644            match self {
 645                Self::Local(worktree) => {
 646                    worktree.open_buffers.remove(&buffer_id);
 647                }
 648                Self::Remote(worktree) => {
 649                    worktree.open_buffers.remove(&buffer_id);
 650                }
 651            }
 652        }
 653    }
 654}
 655
 656impl Deref for Worktree {
 657    type Target = Snapshot;
 658
 659    fn deref(&self) -> &Self::Target {
 660        match self {
 661            Worktree::Local(worktree) => &worktree.snapshot,
 662            Worktree::Remote(worktree) => &worktree.snapshot,
 663        }
 664    }
 665}
 666
/// State of a worktree that is scanned from the local filesystem.
pub struct LocalWorktree {
    /// Foreground copy of the tree, refreshed from `background_snapshot` by
    /// `Worktree::poll_snapshot`.
    snapshot: Snapshot,
    /// Settings parsed from `.zed.toml` in the worktree root (defaults if the file
    /// is missing or cannot be parsed).
    config: WorktreeConfig,
    /// Snapshot shared with the `BackgroundScanner`.
    background_snapshot: Arc<Mutex<Snapshot>>,
    /// Receives the most recent `ScanState` forwarded from the scanner's channel.
    last_scan_state_rx: watch::Receiver<ScanState>,
    /// Task running the `BackgroundScanner`; set by `Worktree::open_local`.
    _background_scanner_task: Option<Task<()>>,
    /// Task that follows the RPC client's status and requests `OpenWorktree`
    /// when it is connected, publishing the result to `remote_id`.
    _maintain_remote_id_task: Task<Option<()>>,
    /// Pending follow-up poll scheduled by `Worktree::poll_snapshot` while scanning.
    poll_task: Option<Task<()>>,
    /// Server-assigned worktree id, or `None` while not connected.
    remote_id: watch::Receiver<Option<u64>>,
    /// Present when a `ShareState` exists; snapshots are forwarded to its
    /// `snapshots_tx` after a scan finishes.
    share: Option<ShareState>,
    /// Weak handles to open buffers (keys are presumably buffer ids; confirm
    /// against where entries are inserted).
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    /// Strong handles to buffers, grouped by peer and then keyed by buffer id.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    /// Replica id of each known peer.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    /// NOTE(review): not used in the visible code; presumably buffer operations
    /// waiting to be sent, paired with a buffer id. Confirm at the use site.
    queued_operations: Vec<(u64, Operation)>,
    rpc: Arc<rpc::Client>,
    fs: Arc<dyn Fs>,
}
 685
/// Per-worktree settings deserialized from the `.zed.toml` file in the worktree root.
#[derive(Default, Deserialize)]
struct WorktreeConfig {
    /// Sent as `collaborator_logins` in the `OpenWorktree` request.
    collaborators: Vec<String>,
}
 690
 691impl LocalWorktree {
 692    async fn new(
 693        rpc: Arc<rpc::Client>,
 694        path: impl Into<Arc<Path>>,
 695        fs: Arc<dyn Fs>,
 696        languages: Arc<LanguageRegistry>,
 697        cx: &mut AsyncAppContext,
 698    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
 699        let abs_path = path.into();
 700        let path: Arc<Path> = Arc::from(Path::new(""));
 701        let next_entry_id = AtomicUsize::new(0);
 702
 703        // After determining whether the root entry is a file or a directory, populate the
 704        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
 705        let root_name = abs_path
 706            .file_name()
 707            .map_or(String::new(), |f| f.to_string_lossy().to_string());
 708        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
 709        let metadata = fs.metadata(&abs_path).await?;
 710
 711        let mut config = WorktreeConfig::default();
 712        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
 713            if let Ok(parsed) = toml::from_str(&zed_toml) {
 714                config = parsed;
 715            }
 716        }
 717
 718        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
 719        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
 720        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
 721            let mut snapshot = Snapshot {
 722                id: cx.model_id(),
 723                scan_id: 0,
 724                abs_path,
 725                root_name: root_name.clone(),
 726                root_char_bag,
 727                ignores: Default::default(),
 728                entries_by_path: Default::default(),
 729                entries_by_id: Default::default(),
 730                removed_entry_ids: Default::default(),
 731                next_entry_id: Arc::new(next_entry_id),
 732            };
 733            if let Some(metadata) = metadata {
 734                snapshot.insert_entry(Entry::new(
 735                    path.into(),
 736                    &metadata,
 737                    &snapshot.next_entry_id,
 738                    snapshot.root_char_bag,
 739                ));
 740            }
 741
 742            let (mut remote_id_tx, remote_id_rx) = watch::channel();
 743            let _maintain_remote_id_task = cx.spawn_weak({
 744                let rpc = rpc.clone();
 745                move |this, cx| {
 746                    async move {
 747                        let mut status = rpc.status();
 748                        while let Some(status) = status.recv().await {
 749                            if let Some(this) = this.upgrade(&cx) {
 750                                let remote_id = if let Status::Connected { .. } = status {
 751                                    let collaborator_logins = this.read_with(&cx, |this, _| {
 752                                        this.as_local().unwrap().config.collaborators.clone()
 753                                    });
 754                                    let response = rpc
 755                                        .request(proto::OpenWorktree {
 756                                            root_name: root_name.clone(),
 757                                            collaborator_logins,
 758                                        })
 759                                        .await?;
 760
 761                                    Some(response.worktree_id)
 762                                } else {
 763                                    None
 764                                };
 765                                if remote_id_tx.send(remote_id).await.is_err() {
 766                                    break;
 767                                }
 768                            }
 769                        }
 770                        Ok(())
 771                    }
 772                    .log_err()
 773                }
 774            });
 775
 776            let tree = Self {
 777                snapshot: snapshot.clone(),
 778                config,
 779                remote_id: remote_id_rx,
 780                background_snapshot: Arc::new(Mutex::new(snapshot)),
 781                last_scan_state_rx,
 782                _background_scanner_task: None,
 783                _maintain_remote_id_task,
 784                share: None,
 785                poll_task: None,
 786                open_buffers: Default::default(),
 787                shared_buffers: Default::default(),
 788                queued_operations: Default::default(),
 789                peers: Default::default(),
 790                languages,
 791                rpc,
 792                fs,
 793            };
 794
 795            cx.spawn_weak(|this, mut cx| async move {
 796                while let Ok(scan_state) = scan_states_rx.recv().await {
 797                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
 798                        let to_send = handle.update(&mut cx, |this, cx| {
 799                            last_scan_state_tx.blocking_send(scan_state).ok();
 800                            this.poll_snapshot(cx);
 801                            let tree = this.as_local_mut().unwrap();
 802                            if !tree.is_scanning() {
 803                                if let Some(share) = tree.share.as_ref() {
 804                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
 805                                }
 806                            }
 807                            None
 808                        });
 809
 810                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
 811                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
 812                                log::error!("error submitting snapshot to send {}", err);
 813                            }
 814                        }
 815                    } else {
 816                        break;
 817                    }
 818                }
 819            })
 820            .detach();
 821
 822            Worktree::Local(tree)
 823        });
 824
 825        Ok((tree, scan_states_tx))
 826    }
 827
 828    pub fn open_buffer(
 829        &mut self,
 830        path: &Path,
 831        cx: &mut ModelContext<Worktree>,
 832    ) -> Task<Result<ModelHandle<Buffer>>> {
 833        let handle = cx.handle();
 834
 835        // If there is already a buffer for the given path, then return it.
 836        let mut existing_buffer = None;
 837        self.open_buffers.retain(|_buffer_id, buffer| {
 838            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
 839                if let Some(file) = buffer.read(cx.as_ref()).file() {
 840                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
 841                        existing_buffer = Some(buffer);
 842                    }
 843                }
 844                true
 845            } else {
 846                false
 847            }
 848        });
 849
 850        let path = Arc::from(path);
 851        cx.spawn(|this, mut cx| async move {
 852            if let Some(existing_buffer) = existing_buffer {
 853                Ok(existing_buffer)
 854            } else {
 855                let (file, contents) = this
 856                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
 857                    .await?;
 858                let buffer = cx.add_model(|cx| {
 859                    let language = file.select_language(cx);
 860                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
 861                });
 862                this.update(&mut cx, |this, _| {
 863                    let this = this
 864                        .as_local_mut()
 865                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
 866                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
 867                    Ok(buffer)
 868                })
 869            }
 870        })
 871    }
 872
 873    pub fn open_remote_buffer(
 874        &mut self,
 875        envelope: TypedEnvelope<proto::OpenBuffer>,
 876        cx: &mut ModelContext<Worktree>,
 877    ) -> Task<Result<proto::OpenBufferResponse>> {
 878        let peer_id = envelope.original_sender_id();
 879        let path = Path::new(&envelope.payload.path);
 880
 881        let buffer = self.open_buffer(path, cx);
 882
 883        cx.spawn(|this, mut cx| async move {
 884            let buffer = buffer.await?;
 885            this.update(&mut cx, |this, cx| {
 886                this.as_local_mut()
 887                    .unwrap()
 888                    .shared_buffers
 889                    .entry(peer_id?)
 890                    .or_default()
 891                    .insert(buffer.id() as u64, buffer.clone());
 892
 893                Ok(proto::OpenBufferResponse {
 894                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
 895                })
 896            })
 897        })
 898    }
 899
 900    pub fn close_remote_buffer(
 901        &mut self,
 902        envelope: TypedEnvelope<proto::CloseBuffer>,
 903        cx: &mut ModelContext<Worktree>,
 904    ) -> Result<()> {
 905        if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
 906            shared_buffers.remove(&envelope.payload.buffer_id);
 907            cx.notify();
 908        }
 909
 910        Ok(())
 911    }
 912
 913    pub fn add_peer(
 914        &mut self,
 915        envelope: TypedEnvelope<proto::AddPeer>,
 916        cx: &mut ModelContext<Worktree>,
 917    ) -> Result<()> {
 918        let peer = envelope
 919            .payload
 920            .peer
 921            .as_ref()
 922            .ok_or_else(|| anyhow!("empty peer"))?;
 923        self.peers
 924            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
 925        cx.notify();
 926
 927        Ok(())
 928    }
 929
 930    pub fn remove_peer(
 931        &mut self,
 932        envelope: TypedEnvelope<proto::RemovePeer>,
 933        cx: &mut ModelContext<Worktree>,
 934    ) -> Result<()> {
 935        let peer_id = PeerId(envelope.payload.peer_id);
 936        let replica_id = self
 937            .peers
 938            .remove(&peer_id)
 939            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
 940        self.shared_buffers.remove(&peer_id);
 941        for (_, buffer) in &self.open_buffers {
 942            if let Some(buffer) = buffer.upgrade(cx) {
 943                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
 944            }
 945        }
 946        cx.notify();
 947
 948        Ok(())
 949    }
 950
 951    pub fn scan_complete(&self) -> impl Future<Output = ()> {
 952        let mut scan_state_rx = self.last_scan_state_rx.clone();
 953        async move {
 954            let mut scan_state = Some(scan_state_rx.borrow().clone());
 955            while let Some(ScanState::Scanning) = scan_state {
 956                scan_state = scan_state_rx.recv().await;
 957            }
 958        }
 959    }
 960
 961    pub fn remote_id(&self) -> Option<u64> {
 962        *self.remote_id.borrow()
 963    }
 964
 965    pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
 966        let mut remote_id = self.remote_id.clone();
 967        async move {
 968            while let Some(remote_id) = remote_id.recv().await {
 969                if remote_id.is_some() {
 970                    return remote_id;
 971                }
 972            }
 973            None
 974        }
 975    }
 976
 977    fn is_scanning(&self) -> bool {
 978        if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
 979            true
 980        } else {
 981            false
 982        }
 983    }
 984
 985    pub fn snapshot(&self) -> Snapshot {
 986        self.snapshot.clone()
 987    }
 988
 989    pub fn abs_path(&self) -> &Path {
 990        self.snapshot.abs_path.as_ref()
 991    }
 992
 993    pub fn contains_abs_path(&self, path: &Path) -> bool {
 994        path.starts_with(&self.snapshot.abs_path)
 995    }
 996
 997    fn absolutize(&self, path: &Path) -> PathBuf {
 998        if path.file_name().is_some() {
 999            self.snapshot.abs_path.join(path)
1000        } else {
1001            self.snapshot.abs_path.to_path_buf()
1002        }
1003    }
1004
1005    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
1006        let handle = cx.handle();
1007        let path = Arc::from(path);
1008        let abs_path = self.absolutize(&path);
1009        let background_snapshot = self.background_snapshot.clone();
1010        let fs = self.fs.clone();
1011        cx.spawn(|this, mut cx| async move {
1012            let text = fs.load(&abs_path).await?;
1013            // Eagerly populate the snapshot with an updated entry for the loaded file
1014            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
1015            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1016            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
1017        })
1018    }
1019
1020    pub fn save_buffer_as(
1021        &self,
1022        buffer: ModelHandle<Buffer>,
1023        path: impl Into<Arc<Path>>,
1024        text: Rope,
1025        cx: &mut ModelContext<Worktree>,
1026    ) -> Task<Result<File>> {
1027        let save = self.save(path, text, cx);
1028        cx.spawn(|this, mut cx| async move {
1029            let entry = save.await?;
1030            this.update(&mut cx, |this, cx| {
1031                this.as_local_mut()
1032                    .unwrap()
1033                    .open_buffers
1034                    .insert(buffer.id(), buffer.downgrade());
1035                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
1036            })
1037        })
1038    }
1039
1040    fn save(
1041        &self,
1042        path: impl Into<Arc<Path>>,
1043        text: Rope,
1044        cx: &mut ModelContext<Worktree>,
1045    ) -> Task<Result<Entry>> {
1046        let path = path.into();
1047        let abs_path = self.absolutize(&path);
1048        let background_snapshot = self.background_snapshot.clone();
1049        let fs = self.fs.clone();
1050        let save = cx.background().spawn(async move {
1051            fs.save(&abs_path, &text).await?;
1052            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1053        });
1054
1055        cx.spawn(|this, mut cx| async move {
1056            let entry = save.await?;
1057            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1058            Ok(entry)
1059        })
1060    }
1061
1062    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
1063        let snapshot = self.snapshot();
1064        let share_request = self.share_request(cx);
1065        let rpc = self.rpc.clone();
1066        cx.spawn(|this, mut cx| async move {
1067            let share_request = if let Some(request) = share_request.await {
1068                request
1069            } else {
1070                return Err(anyhow!("failed to open worktree on the server"));
1071            };
1072
1073            let remote_id = share_request.worktree.as_ref().unwrap().id;
1074            let share_response = rpc.request(share_request).await?;
1075
1076            log::info!("sharing worktree {:?}", share_response);
1077            let (snapshots_to_send_tx, snapshots_to_send_rx) =
1078                smol::channel::unbounded::<Snapshot>();
1079
1080            cx.background()
1081                .spawn({
1082                    let rpc = rpc.clone();
1083                    async move {
1084                        let mut prev_snapshot = snapshot;
1085                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
1086                            let message = snapshot.build_update(&prev_snapshot, remote_id);
1087                            match rpc.send(message).await {
1088                                Ok(()) => prev_snapshot = snapshot,
1089                                Err(err) => log::error!("error sending snapshot diff {}", err),
1090                            }
1091                        }
1092                    }
1093                })
1094                .detach();
1095
1096            this.update(&mut cx, |worktree, cx| {
1097                let _subscriptions = vec![
1098                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer),
1099                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer),
1100                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
1101                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
1102                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
1103                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
1104                ];
1105
1106                let worktree = worktree.as_local_mut().unwrap();
1107                worktree.share = Some(ShareState {
1108                    snapshots_tx: snapshots_to_send_tx,
1109                    _subscriptions,
1110                });
1111            });
1112
1113            Ok(remote_id)
1114        })
1115    }
1116
1117    pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
1118        self.share.take();
1119        let rpc = self.rpc.clone();
1120        let remote_id = self.remote_id();
1121        cx.foreground()
1122            .spawn(
1123                async move {
1124                    if let Some(worktree_id) = remote_id {
1125                        rpc.send(proto::UnshareWorktree { worktree_id }).await?;
1126                    }
1127                    Ok(())
1128                }
1129                .log_err(),
1130            )
1131            .detach()
1132    }
1133
1134    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
1135        let remote_id = self.next_remote_id();
1136        let snapshot = self.snapshot();
1137        let root_name = self.root_name.clone();
1138        cx.background().spawn(async move {
1139            remote_id.await.map(|id| {
1140                let entries = snapshot
1141                    .entries_by_path
1142                    .cursor::<(), ()>()
1143                    .map(Into::into)
1144                    .collect();
1145                proto::ShareWorktree {
1146                    worktree: Some(proto::Worktree {
1147                        id,
1148                        root_name,
1149                        entries,
1150                    }),
1151                }
1152            })
1153        })
1154    }
1155}
1156
1157pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
1158    let fs = fs.clone();
1159    cx.spawn(|buffer, mut cx| async move {
1160        let new_text = fs.load(&abs_path).await;
1161        match new_text {
1162            Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
1163            Ok(new_text) => {
1164                buffer
1165                    .update(&mut cx, |buffer, cx| {
1166                        buffer.set_text_from_disk(new_text.into(), cx)
1167                    })
1168                    .await;
1169            }
1170        }
1171    })
1172    .detach()
1173}
1174
1175impl Deref for LocalWorktree {
1176    type Target = Snapshot;
1177
1178    fn deref(&self) -> &Self::Target {
1179        &self.snapshot
1180    }
1181}
1182
1183impl fmt::Debug for LocalWorktree {
1184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1185        self.snapshot.fmt(f)
1186    }
1187}
1188
/// State a local worktree holds while it is shared.
///
/// Created at the end of `LocalWorktree::share` and dropped by
/// `LocalWorktree::unshare`.
struct ShareState {
    /// Sending half of the channel drained by the background task spawned in
    /// `share`; each snapshot sent here is diffed and sent to the server.
    snapshots_tx: Sender<Snapshot>,
    /// The RPC subscriptions registered in `share`.
    /// NOTE(review): presumably dropping these cancels the subscriptions —
    /// confirm against `rpc::Subscription`.
    _subscriptions: Vec<rpc::Subscription>,
}
1193
/// The guest-side view of a worktree that is hosted by another peer.
pub struct RemoteWorktree {
    /// The worktree's id on the server; returned by `remote_id()` and sent as
    /// `worktree_id` in `OpenBuffer` requests.
    remote_id: u64,
    /// The current snapshot; cloned by `snapshot()`.
    snapshot: Snapshot,
    /// NOTE(review): presumably receives snapshots rebuilt from remote
    /// updates — not used in the visible part of the file; confirm.
    snapshot_rx: watch::Receiver<Snapshot>,
    /// Client used to send requests such as `OpenBuffer`.
    rpc: Arc<rpc::Client>,
    /// Channel into which `update_from_remote` forwards incoming
    /// `UpdateWorktree` payloads.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    /// Replica id passed to `Buffer::from_proto` when a remote buffer is
    /// opened.
    replica_id: ReplicaId,
    /// Buffers keyed by the remote buffer id (cast to `usize`); see
    /// `RemoteBuffer` for the two possible states of an entry.
    open_buffers: HashMap<usize, RemoteBuffer>,
    /// Known peers and their replica ids, maintained by `add_peer` and
    /// `remove_peer`.
    peers: HashMap<PeerId, ReplicaId>,
    /// NOTE(review): presumably used to pick a language for opened files —
    /// confirm against the code that constructs this struct.
    languages: Arc<LanguageRegistry>,
    /// NOTE(review): presumably `(buffer id, operation)` pairs that could not
    /// be sent yet — confirm against `File::buffer_updated`.
    queued_operations: Vec<(u64, Operation)>,
    /// NOTE(review): presumably kept only so the subscriptions stay alive for
    /// the lifetime of the worktree — confirm.
    _subscriptions: Vec<rpc::Subscription>,
}
1207
1208impl RemoteWorktree {
1209    pub fn open_buffer(
1210        &mut self,
1211        path: &Path,
1212        cx: &mut ModelContext<Worktree>,
1213    ) -> Task<Result<ModelHandle<Buffer>>> {
1214        let mut existing_buffer = None;
1215        self.open_buffers.retain(|_buffer_id, buffer| {
1216            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1217                if let Some(file) = buffer.read(cx.as_ref()).file() {
1218                    if file.worktree_id() == cx.model_id() && file.path.as_ref() == path {
1219                        existing_buffer = Some(buffer);
1220                    }
1221                }
1222                true
1223            } else {
1224                false
1225            }
1226        });
1227
1228        let rpc = self.rpc.clone();
1229        let replica_id = self.replica_id;
1230        let remote_worktree_id = self.remote_id;
1231        let path = path.to_string_lossy().to_string();
1232        cx.spawn_weak(|this, mut cx| async move {
1233            if let Some(existing_buffer) = existing_buffer {
1234                Ok(existing_buffer)
1235            } else {
1236                let entry = this
1237                    .upgrade(&cx)
1238                    .ok_or_else(|| anyhow!("worktree was closed"))?
1239                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1240                    .ok_or_else(|| anyhow!("file does not exist"))?;
1241                let response = rpc
1242                    .request(proto::OpenBuffer {
1243                        worktree_id: remote_worktree_id as u64,
1244                        path,
1245                    })
1246                    .await?;
1247
1248                let this = this
1249                    .upgrade(&cx)
1250                    .ok_or_else(|| anyhow!("worktree was closed"))?;
1251                let file = File::new(entry.id, this.clone(), entry.path, entry.mtime);
1252                let language = cx.read(|cx| file.select_language(cx));
1253                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1254                let buffer_id = remote_buffer.id as usize;
1255                let buffer = cx.add_model(|cx| {
1256                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
1257                });
1258                this.update(&mut cx, |this, cx| {
1259                    let this = this.as_remote_mut().unwrap();
1260                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
1261                        .open_buffers
1262                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
1263                    {
1264                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
1265                    }
1266                    Result::<_, anyhow::Error>::Ok(())
1267                })?;
1268                Ok(buffer)
1269            }
1270        })
1271    }
1272
1273    pub fn remote_id(&self) -> u64 {
1274        self.remote_id
1275    }
1276
1277    pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1278        for (_, buffer) in self.open_buffers.drain() {
1279            if let RemoteBuffer::Loaded(buffer) = buffer {
1280                if let Some(buffer) = buffer.upgrade(cx) {
1281                    buffer.update(cx, |buffer, cx| buffer.close(cx))
1282                }
1283            }
1284        }
1285    }
1286
1287    fn snapshot(&self) -> Snapshot {
1288        self.snapshot.clone()
1289    }
1290
1291    fn update_from_remote(
1292        &mut self,
1293        envelope: TypedEnvelope<proto::UpdateWorktree>,
1294        cx: &mut ModelContext<Worktree>,
1295    ) -> Result<()> {
1296        let mut tx = self.updates_tx.clone();
1297        let payload = envelope.payload.clone();
1298        cx.background()
1299            .spawn(async move {
1300                tx.send(payload).await.expect("receiver runs to completion");
1301            })
1302            .detach();
1303
1304        Ok(())
1305    }
1306
1307    pub fn add_peer(
1308        &mut self,
1309        envelope: TypedEnvelope<proto::AddPeer>,
1310        cx: &mut ModelContext<Worktree>,
1311    ) -> Result<()> {
1312        let peer = envelope
1313            .payload
1314            .peer
1315            .as_ref()
1316            .ok_or_else(|| anyhow!("empty peer"))?;
1317        self.peers
1318            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1319        cx.notify();
1320        Ok(())
1321    }
1322
1323    pub fn remove_peer(
1324        &mut self,
1325        envelope: TypedEnvelope<proto::RemovePeer>,
1326        cx: &mut ModelContext<Worktree>,
1327    ) -> Result<()> {
1328        let peer_id = PeerId(envelope.payload.peer_id);
1329        let replica_id = self
1330            .peers
1331            .remove(&peer_id)
1332            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1333        for (_, buffer) in &self.open_buffers {
1334            if let Some(buffer) = buffer.upgrade(cx) {
1335                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1336            }
1337        }
1338        cx.notify();
1339        Ok(())
1340    }
1341}
1342
/// The state of an entry in `RemoteWorktree::open_buffers`.
enum RemoteBuffer {
    /// Operations held for a buffer that has not been loaded;
    /// `RemoteWorktree::open_buffer` applies them once the buffer is created.
    Operations(Vec<Operation>),
    /// A weak handle to a buffer that has been loaded.
    Loaded(WeakModelHandle<Buffer>),
}
1347
1348impl RemoteBuffer {
1349    fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1350        match self {
1351            Self::Operations(_) => None,
1352            Self::Loaded(buffer) => buffer.upgrade(cx),
1353        }
1354    }
1355}
1356
/// A cloneable view of a worktree's entries.
#[derive(Clone)]
pub struct Snapshot {
    /// Identifier returned by `id()`.
    id: usize,
    /// Counter stamped onto entries when they are inserted or updated;
    /// `apply_update` increments it before applying a remote update.
    scan_id: usize,
    /// Path that worktree-relative paths are joined onto (see `insert_entry`).
    abs_path: Arc<Path>,
    /// Name returned by `root_name()`.
    root_name: String,
    /// Character bag passed to `Entry::try_from` when decoding remote
    /// entries.
    root_char_bag: CharBag,
    /// Parsed `.gitignore` files keyed by the directory that contains them,
    /// paired with the `scan_id` at which each was last touched.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    /// All entries, looked up by path.
    entries_by_path: SumTree<Entry>,
    /// Index from entry id to path (and the `scan_id` of the last change).
    entries_by_id: SumTree<PathEntry>,
    /// Ids of removed entries keyed by inode, so that `reuse_entry_id` can
    /// give a re-inserted entry with the same inode its previous id.
    removed_entry_ids: HashMap<u64, usize>,
    /// NOTE(review): presumably the source of fresh entry ids, shared between
    /// clones — not used in the visible part of the file; confirm.
    next_entry_id: Arc<AtomicUsize>,
}
1370
1371impl Snapshot {
1372    pub fn id(&self) -> usize {
1373        self.id
1374    }
1375
1376    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
1377        let mut updated_entries = Vec::new();
1378        let mut removed_entries = Vec::new();
1379        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
1380        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
1381        loop {
1382            match (self_entries.peek(), other_entries.peek()) {
1383                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
1384                    Ordering::Less => {
1385                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
1386                        updated_entries.push(entry);
1387                        self_entries.next();
1388                    }
1389                    Ordering::Equal => {
1390                        if self_entry.scan_id != other_entry.scan_id {
1391                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
1392                            updated_entries.push(entry);
1393                        }
1394
1395                        self_entries.next();
1396                        other_entries.next();
1397                    }
1398                    Ordering::Greater => {
1399                        removed_entries.push(other_entry.id as u64);
1400                        other_entries.next();
1401                    }
1402                },
1403                (Some(self_entry), None) => {
1404                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
1405                    updated_entries.push(entry);
1406                    self_entries.next();
1407                }
1408                (None, Some(other_entry)) => {
1409                    removed_entries.push(other_entry.id as u64);
1410                    other_entries.next();
1411                }
1412                (None, None) => break,
1413            }
1414        }
1415
1416        proto::UpdateWorktree {
1417            updated_entries,
1418            removed_entries,
1419            worktree_id,
1420        }
1421    }
1422
1423    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
1424        self.scan_id += 1;
1425        let scan_id = self.scan_id;
1426
1427        let mut entries_by_path_edits = Vec::new();
1428        let mut entries_by_id_edits = Vec::new();
1429        for entry_id in update.removed_entries {
1430            let entry_id = entry_id as usize;
1431            let entry = self
1432                .entry_for_id(entry_id)
1433                .ok_or_else(|| anyhow!("unknown entry"))?;
1434            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
1435            entries_by_id_edits.push(Edit::Remove(entry.id));
1436        }
1437
1438        for entry in update.updated_entries {
1439            let entry = Entry::try_from((&self.root_char_bag, entry))?;
1440            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
1441                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
1442            }
1443            entries_by_id_edits.push(Edit::Insert(PathEntry {
1444                id: entry.id,
1445                path: entry.path.clone(),
1446                scan_id,
1447            }));
1448            entries_by_path_edits.push(Edit::Insert(entry));
1449        }
1450
1451        self.entries_by_path.edit(entries_by_path_edits, &());
1452        self.entries_by_id.edit(entries_by_id_edits, &());
1453
1454        Ok(())
1455    }
1456
1457    pub fn file_count(&self) -> usize {
1458        self.entries_by_path.summary().file_count
1459    }
1460
1461    pub fn visible_file_count(&self) -> usize {
1462        self.entries_by_path.summary().visible_file_count
1463    }
1464
1465    pub fn files(&self, start: usize) -> FileIter {
1466        FileIter::all(self, start)
1467    }
1468
1469    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1470        let empty_path = Path::new("");
1471        self.entries_by_path
1472            .cursor::<(), ()>()
1473            .filter(move |entry| entry.path.as_ref() != empty_path)
1474            .map(|entry| &entry.path)
1475    }
1476
1477    pub fn visible_files(&self, start: usize) -> FileIter {
1478        FileIter::visible(self, start)
1479    }
1480
1481    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
1482        ChildEntriesIter::new(path, self)
1483    }
1484
1485    pub fn root_entry(&self) -> Option<&Entry> {
1486        self.entry_for_path("")
1487    }
1488
1489    pub fn root_name(&self) -> &str {
1490        &self.root_name
1491    }
1492
1493    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1494        let mut cursor = self.entries_by_path.cursor::<_, ()>();
1495        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
1496            cursor.item()
1497        } else {
1498            None
1499        }
1500    }
1501
1502    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1503        let entry = self.entries_by_id.get(&id, &())?;
1504        self.entry_for_path(&entry.path)
1505    }
1506
1507    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1508        self.entry_for_path(path.as_ref()).map(|e| e.inode)
1509    }
1510
1511    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
1512        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
1513            let (ignore, err) = Gitignore::new(self.abs_path.join(&entry.path));
1514            if let Some(err) = err {
1515                log::error!("error in ignore file {:?} - {:?}", &entry.path, err);
1516            }
1517
1518            let ignore_dir_path = entry.path.parent().unwrap();
1519            self.ignores
1520                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
1521        }
1522
1523        self.reuse_entry_id(&mut entry);
1524        self.entries_by_path.insert_or_replace(entry.clone(), &());
1525        self.entries_by_id.insert_or_replace(
1526            PathEntry {
1527                id: entry.id,
1528                path: entry.path.clone(),
1529                scan_id: self.scan_id,
1530            },
1531            &(),
1532        );
1533        entry
1534    }
1535
1536    fn populate_dir(
1537        &mut self,
1538        parent_path: Arc<Path>,
1539        entries: impl IntoIterator<Item = Entry>,
1540        ignore: Option<Arc<Gitignore>>,
1541    ) {
1542        let mut parent_entry = self
1543            .entries_by_path
1544            .get(&PathKey(parent_path.clone()), &())
1545            .unwrap()
1546            .clone();
1547        if let Some(ignore) = ignore {
1548            self.ignores.insert(parent_path, (ignore, self.scan_id));
1549        }
1550        if matches!(parent_entry.kind, EntryKind::PendingDir) {
1551            parent_entry.kind = EntryKind::Dir;
1552        } else {
1553            unreachable!();
1554        }
1555
1556        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
1557        let mut entries_by_id_edits = Vec::new();
1558
1559        for mut entry in entries {
1560            self.reuse_entry_id(&mut entry);
1561            entries_by_id_edits.push(Edit::Insert(PathEntry {
1562                id: entry.id,
1563                path: entry.path.clone(),
1564                scan_id: self.scan_id,
1565            }));
1566            entries_by_path_edits.push(Edit::Insert(entry));
1567        }
1568
1569        self.entries_by_path.edit(entries_by_path_edits, &());
1570        self.entries_by_id.edit(entries_by_id_edits, &());
1571    }
1572
1573    fn reuse_entry_id(&mut self, entry: &mut Entry) {
1574        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1575            entry.id = removed_entry_id;
1576        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1577            entry.id = existing_entry.id;
1578        }
1579    }
1580
1581    fn remove_path(&mut self, path: &Path) {
1582        let mut new_entries;
1583        let removed_entry_ids;
1584        {
1585            let mut cursor = self.entries_by_path.cursor::<_, ()>();
1586            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
1587            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
1588            new_entries.push_tree(cursor.suffix(&()), &());
1589        }
1590        self.entries_by_path = new_entries;
1591
1592        let mut entries_by_id_edits = Vec::new();
1593        for entry in removed_entry_ids.cursor::<(), ()>() {
1594            let removed_entry_id = self
1595                .removed_entry_ids
1596                .entry(entry.inode)
1597                .or_insert(entry.id);
1598            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
1599            entries_by_id_edits.push(Edit::Remove(entry.id));
1600        }
1601        self.entries_by_id.edit(entries_by_id_edits, &());
1602
1603        if path.file_name() == Some(&GITIGNORE) {
1604            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
1605                *scan_id = self.scan_id;
1606            }
1607        }
1608    }
1609
1610    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
1611        let mut new_ignores = Vec::new();
1612        for ancestor in path.ancestors().skip(1) {
1613            if let Some((ignore, _)) = self.ignores.get(ancestor) {
1614                new_ignores.push((ancestor, Some(ignore.clone())));
1615            } else {
1616                new_ignores.push((ancestor, None));
1617            }
1618        }
1619
1620        let mut ignore_stack = IgnoreStack::none();
1621        for (parent_path, ignore) in new_ignores.into_iter().rev() {
1622            if ignore_stack.is_path_ignored(&parent_path, true) {
1623                ignore_stack = IgnoreStack::all();
1624                break;
1625            } else if let Some(ignore) = ignore {
1626                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
1627            }
1628        }
1629
1630        if ignore_stack.is_path_ignored(path, is_dir) {
1631            ignore_stack = IgnoreStack::all();
1632        }
1633
1634        ignore_stack
1635    }
1636}
1637
1638impl fmt::Debug for Snapshot {
1639    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1640        for entry in self.entries_by_path.cursor::<(), ()>() {
1641            for _ in entry.path.ancestors().skip(1) {
1642                write!(f, " ")?;
1643            }
1644            writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1645        }
1646        Ok(())
1647    }
1648}
1649
/// A handle to a file located at a worktree-relative path inside a `Worktree`.
#[derive(Clone, PartialEq)]
pub struct File {
    /// Id of the entry this handle refers to; `is_deleted` reports `true` when it is `None`.
    entry_id: Option<usize>,
    /// Handle to the worktree model that contains the file.
    worktree: ModelHandle<Worktree>,
    /// Path of the file relative to the root of its worktree.
    pub path: Arc<Path>,
    /// Modification time recorded when the handle was created.
    pub mtime: SystemTime,
}
1657
1658impl File {
1659    pub fn new(
1660        entry_id: usize,
1661        worktree: ModelHandle<Worktree>,
1662        path: Arc<Path>,
1663        mtime: SystemTime,
1664    ) -> Self {
1665        Self {
1666            entry_id: Some(entry_id),
1667            worktree,
1668            path,
1669            mtime,
1670        }
1671    }
1672
1673    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
1674        self.worktree.update(cx, |worktree, cx| {
1675            if let Some((rpc, remote_id)) = match worktree {
1676                Worktree::Local(worktree) => worktree
1677                    .remote_id
1678                    .borrow()
1679                    .map(|id| (worktree.rpc.clone(), id)),
1680                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
1681            } {
1682                cx.spawn(|worktree, mut cx| async move {
1683                    if let Err(error) = rpc
1684                        .request(proto::UpdateBuffer {
1685                            worktree_id: remote_id,
1686                            buffer_id,
1687                            operations: vec![(&operation).into()],
1688                        })
1689                        .await
1690                    {
1691                        worktree.update(&mut cx, |worktree, _| {
1692                            log::error!("error sending buffer operation: {}", error);
1693                            match worktree {
1694                                Worktree::Local(t) => &mut t.queued_operations,
1695                                Worktree::Remote(t) => &mut t.queued_operations,
1696                            }
1697                            .push((buffer_id, operation));
1698                        });
1699                    }
1700                })
1701                .detach();
1702            }
1703        });
1704    }
1705
1706    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
1707        self.worktree.update(cx, |worktree, cx| {
1708            if let Worktree::Remote(worktree) = worktree {
1709                let worktree_id = worktree.remote_id;
1710                let rpc = worktree.rpc.clone();
1711                cx.background()
1712                    .spawn(async move {
1713                        if let Err(error) = rpc
1714                            .send(proto::CloseBuffer {
1715                                worktree_id,
1716                                buffer_id,
1717                            })
1718                            .await
1719                        {
1720                            log::error!("error closing remote buffer: {}", error);
1721                        }
1722                    })
1723                    .detach();
1724            }
1725        });
1726    }
1727
1728    /// Returns this file's path relative to the root of its worktree.
1729    pub fn path(&self) -> Arc<Path> {
1730        self.path.clone()
1731    }
1732
1733    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
1734        self.worktree.read(cx).abs_path.join(&self.path)
1735    }
1736
1737    pub fn select_language(&self, cx: &AppContext) -> Option<Arc<Language>> {
1738        let worktree = self.worktree.read(cx);
1739        let mut full_path = PathBuf::new();
1740        full_path.push(worktree.root_name());
1741        full_path.push(&self.path);
1742        let languages = match self.worktree.read(cx) {
1743            Worktree::Local(worktree) => &worktree.languages,
1744            Worktree::Remote(worktree) => &worktree.languages,
1745        };
1746        languages.select_language(&full_path).cloned()
1747    }
1748
1749    /// Returns the last component of this handle's absolute path. If this handle refers to the root
1750    /// of its worktree, then this method will return the name of the worktree itself.
1751    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
1752        self.path
1753            .file_name()
1754            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
1755            .map(Into::into)
1756    }
1757
1758    pub fn is_deleted(&self) -> bool {
1759        self.entry_id.is_none()
1760    }
1761
1762    pub fn exists(&self) -> bool {
1763        !self.is_deleted()
1764    }
1765
1766    pub fn save(
1767        &self,
1768        buffer_id: u64,
1769        text: Rope,
1770        version: time::Global,
1771        cx: &mut MutableAppContext,
1772    ) -> Task<Result<(time::Global, SystemTime)>> {
1773        self.worktree.update(cx, |worktree, cx| match worktree {
1774            Worktree::Local(worktree) => {
1775                let rpc = worktree.rpc.clone();
1776                let worktree_id = *worktree.remote_id.borrow();
1777                let save = worktree.save(self.path.clone(), text, cx);
1778                cx.background().spawn(async move {
1779                    let entry = save.await?;
1780                    if let Some(worktree_id) = worktree_id {
1781                        rpc.send(proto::BufferSaved {
1782                            worktree_id,
1783                            buffer_id,
1784                            version: (&version).into(),
1785                            mtime: Some(entry.mtime.into()),
1786                        })
1787                        .await?;
1788                    }
1789                    Ok((version, entry.mtime))
1790                })
1791            }
1792            Worktree::Remote(worktree) => {
1793                let rpc = worktree.rpc.clone();
1794                let worktree_id = worktree.remote_id;
1795                cx.foreground().spawn(async move {
1796                    let response = rpc
1797                        .request(proto::SaveBuffer {
1798                            worktree_id,
1799                            buffer_id,
1800                        })
1801                        .await?;
1802                    let version = response.version.try_into()?;
1803                    let mtime = response
1804                        .mtime
1805                        .ok_or_else(|| anyhow!("missing mtime"))?
1806                        .into();
1807                    Ok((version, mtime))
1808                })
1809            }
1810        })
1811    }
1812
1813    pub fn worktree_id(&self) -> usize {
1814        self.worktree.id()
1815    }
1816
1817    pub fn entry_id(&self) -> (usize, Arc<Path>) {
1818        (self.worktree.id(), self.path.clone())
1819    }
1820}
1821
/// A single file or directory tracked by a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Identifier allocated from the shared `next_entry_id` counter in `Entry::new`.
    pub id: usize,
    /// Whether this entry is a file or a (possibly pending) directory.
    pub kind: EntryKind,
    /// Path of the entry; also used as its key in the path-ordered tree.
    pub path: Arc<Path>,
    /// Inode number copied from the file-system metadata.
    pub inode: u64,
    /// Modification time copied from the file-system metadata.
    pub mtime: SystemTime,
    /// Symlink flag copied from the file-system metadata.
    pub is_symlink: bool,
    /// Whether the entry is ignored; `Entry::new` initializes this to `false` and the
    /// scanner updates it from the applicable ignore stack.
    pub is_ignored: bool,
}
1832
/// The kind of file-system object an `Entry` represents.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// Kind given to every directory by `Entry::new`.
    /// NOTE(review): presumably replaced by `Dir` once the directory's children have been
    /// scanned — confirm against `populate_dir`.
    PendingDir,
    /// A directory.
    Dir,
    /// A file, carrying the character bag derived from the root's bag and the file's path.
    File(CharBag),
}
1839
1840impl Entry {
1841    fn new(
1842        path: Arc<Path>,
1843        metadata: &fs::Metadata,
1844        next_entry_id: &AtomicUsize,
1845        root_char_bag: CharBag,
1846    ) -> Self {
1847        Self {
1848            id: next_entry_id.fetch_add(1, SeqCst),
1849            kind: if metadata.is_dir {
1850                EntryKind::PendingDir
1851            } else {
1852                EntryKind::File(char_bag_for_path(root_char_bag, &path))
1853            },
1854            path,
1855            inode: metadata.inode,
1856            mtime: metadata.mtime,
1857            is_symlink: metadata.is_symlink,
1858            is_ignored: false,
1859        }
1860    }
1861
1862    pub fn is_dir(&self) -> bool {
1863        matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1864    }
1865
1866    pub fn is_file(&self) -> bool {
1867        matches!(self.kind, EntryKind::File(_))
1868    }
1869}
1870
1871impl sum_tree::Item for Entry {
1872    type Summary = EntrySummary;
1873
1874    fn summary(&self) -> Self::Summary {
1875        let file_count;
1876        let visible_file_count;
1877        if self.is_file() {
1878            file_count = 1;
1879            if self.is_ignored {
1880                visible_file_count = 0;
1881            } else {
1882                visible_file_count = 1;
1883            }
1884        } else {
1885            file_count = 0;
1886            visible_file_count = 0;
1887        }
1888
1889        EntrySummary {
1890            max_path: self.path.clone(),
1891            file_count,
1892            visible_file_count,
1893        }
1894    }
1895}
1896
1897impl sum_tree::KeyedItem for Entry {
1898    type Key = PathKey;
1899
1900    fn key(&self) -> Self::Key {
1901        PathKey(self.path.clone())
1902    }
1903}
1904
/// Summary of a run of `Entry` items in the path-ordered tree.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    /// Path of the last entry folded into this summary.
    max_path: Arc<Path>,
    /// Number of file entries covered by this summary.
    file_count: usize,
    /// Number of non-ignored file entries covered by this summary.
    visible_file_count: usize,
}
1911
1912impl Default for EntrySummary {
1913    fn default() -> Self {
1914        Self {
1915            max_path: Arc::from(Path::new("")),
1916            file_count: 0,
1917            visible_file_count: 0,
1918        }
1919    }
1920}
1921
1922impl sum_tree::Summary for EntrySummary {
1923    type Context = ();
1924
1925    fn add_summary(&mut self, rhs: &Self, _: &()) {
1926        self.max_path = rhs.max_path.clone();
1927        self.file_count += rhs.file_count;
1928        self.visible_file_count += rhs.visible_file_count;
1929    }
1930}
1931
/// An item keyed by entry id that records the path belonging to that id.
/// NOTE(review): presumably the item type of the snapshot's `entries_by_id` tree — confirm.
#[derive(Clone, Debug)]
struct PathEntry {
    /// Entry id; this is the item's key.
    id: usize,
    /// Path associated with the entry id.
    path: Arc<Path>,
    /// A scan id associated with the entry; its exact meaning is not visible in this part
    /// of the file.
    scan_id: usize,
}
1938
1939impl sum_tree::Item for PathEntry {
1940    type Summary = PathEntrySummary;
1941
1942    fn summary(&self) -> Self::Summary {
1943        PathEntrySummary { max_id: self.id }
1944    }
1945}
1946
1947impl sum_tree::KeyedItem for PathEntry {
1948    type Key = usize;
1949
1950    fn key(&self) -> Self::Key {
1951        self.id
1952    }
1953}
1954
/// Summary of a run of `PathEntry` items.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    /// Id of the last item folded into this summary.
    max_id: usize,
}
1959
1960impl sum_tree::Summary for PathEntrySummary {
1961    type Context = ();
1962
1963    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
1964        self.max_id = summary.max_id;
1965    }
1966}
1967
1968impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
1969    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
1970        *self = summary.max_id;
1971    }
1972}
1973
/// Key of an `Entry` in the path-ordered tree: the entry's path, ordered as paths are.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1976
1977impl Default for PathKey {
1978    fn default() -> Self {
1979        Self(Path::new("").into())
1980    }
1981}
1982
1983impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
1984    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1985        self.0 = summary.max_path.clone();
1986    }
1987}
1988
/// A seek target in the path-ordered entry tree.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    /// The position of exactly this path.
    Exact(&'a Path),
    /// A position that sorts after the given path and after every path that starts with it.
    Successor(&'a Path),
}

impl<'a> Ord for PathSearch<'a> {
    /// Compares a search target against an `Exact` position.
    ///
    /// `Exact` vs. `Exact` is a plain path comparison. A `Successor(p)` is greater than any
    /// `Exact` path that starts with `p` and otherwise compares like `p` itself.
    ///
    /// # Panics
    ///
    /// Panics when `other` is a `Successor`; those comparisons are not implemented.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        match (*self, *other) {
            (Self::Exact(lhs), Self::Exact(rhs)) => lhs.cmp(rhs),
            (Self::Successor(prefix), Self::Exact(candidate))
                if candidate.starts_with(prefix) =>
            {
                cmp::Ordering::Greater
            }
            (Self::Successor(prefix), Self::Exact(candidate)) => prefix.cmp(candidate),
            _ => unreachable!("not sure we need the other two cases"),
        }
    }
}

impl<'a> PartialOrd for PathSearch<'a> {
    /// Always `Some`, delegating to `Ord::cmp` (and sharing its panics).
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}

impl<'a> Default for PathSearch<'a> {
    /// The default target is the exact empty path.
    fn default() -> Self {
        Self::Exact(Path::new(""))
    }
}
2022
2023impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
2024    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2025        *self = Self::Exact(summary.max_path.as_ref());
2026    }
2027}
2028
/// Seek dimension counting all file entries (ignored or not) before a position.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);
2031
2032impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
2033    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2034        self.0 += summary.file_count;
2035    }
2036}
2037
/// Seek dimension counting only the non-ignored file entries before a position.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);
2040
2041impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
2042    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2043        self.0 += summary.visible_file_count;
2044    }
2045}
2046
/// Scans the worktree's directory tree and applies file-system events to the shared snapshot.
struct BackgroundScanner {
    /// File-system abstraction used for directory listings, metadata and canonicalization.
    fs: Arc<dyn Fs>,
    /// The snapshot being maintained, shared behind a mutex.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel on which `ScanState` changes are reported.
    notify: Sender<ScanState>,
    /// Background executor on which scan jobs are run in parallel.
    executor: Arc<executor::Background>,
}
2053
2054impl BackgroundScanner {
2055    fn new(
2056        snapshot: Arc<Mutex<Snapshot>>,
2057        notify: Sender<ScanState>,
2058        fs: Arc<dyn Fs>,
2059        executor: Arc<executor::Background>,
2060    ) -> Self {
2061        Self {
2062            fs,
2063            snapshot,
2064            notify,
2065            executor,
2066        }
2067    }
2068
2069    fn abs_path(&self) -> Arc<Path> {
2070        self.snapshot.lock().abs_path.clone()
2071    }
2072
2073    fn snapshot(&self) -> Snapshot {
2074        self.snapshot.lock().clone()
2075    }
2076
2077    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
2078        if self.notify.send(ScanState::Scanning).await.is_err() {
2079            return;
2080        }
2081
2082        if let Err(err) = self.scan_dirs().await {
2083            if self
2084                .notify
2085                .send(ScanState::Err(Arc::new(err)))
2086                .await
2087                .is_err()
2088            {
2089                return;
2090            }
2091        }
2092
2093        if self.notify.send(ScanState::Idle).await.is_err() {
2094            return;
2095        }
2096
2097        futures::pin_mut!(events_rx);
2098        while let Some(events) = events_rx.next().await {
2099            if self.notify.send(ScanState::Scanning).await.is_err() {
2100                break;
2101            }
2102
2103            if !self.process_events(events).await {
2104                break;
2105            }
2106
2107            if self.notify.send(ScanState::Idle).await.is_err() {
2108                break;
2109            }
2110        }
2111    }
2112
2113    async fn scan_dirs(&mut self) -> Result<()> {
2114        let root_char_bag;
2115        let next_entry_id;
2116        let is_dir;
2117        {
2118            let snapshot = self.snapshot.lock();
2119            root_char_bag = snapshot.root_char_bag;
2120            next_entry_id = snapshot.next_entry_id.clone();
2121            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
2122        };
2123
2124        if is_dir {
2125            let path: Arc<Path> = Arc::from(Path::new(""));
2126            let abs_path = self.abs_path();
2127            let (tx, rx) = channel::unbounded();
2128            tx.send(ScanJob {
2129                abs_path: abs_path.to_path_buf(),
2130                path,
2131                ignore_stack: IgnoreStack::none(),
2132                scan_queue: tx.clone(),
2133            })
2134            .await
2135            .unwrap();
2136            drop(tx);
2137
2138            self.executor
2139                .scoped(|scope| {
2140                    for _ in 0..self.executor.num_cpus() {
2141                        scope.spawn(async {
2142                            while let Ok(job) = rx.recv().await {
2143                                if let Err(err) = self
2144                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2145                                    .await
2146                                {
2147                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
2148                                }
2149                            }
2150                        });
2151                    }
2152                })
2153                .await;
2154        }
2155
2156        Ok(())
2157    }
2158
2159    async fn scan_dir(
2160        &self,
2161        root_char_bag: CharBag,
2162        next_entry_id: Arc<AtomicUsize>,
2163        job: &ScanJob,
2164    ) -> Result<()> {
2165        let mut new_entries: Vec<Entry> = Vec::new();
2166        let mut new_jobs: Vec<ScanJob> = Vec::new();
2167        let mut ignore_stack = job.ignore_stack.clone();
2168        let mut new_ignore = None;
2169
2170        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
2171        while let Some(child_abs_path) = child_paths.next().await {
2172            let child_abs_path = match child_abs_path {
2173                Ok(child_abs_path) => child_abs_path,
2174                Err(error) => {
2175                    log::error!("error processing entry {:?}", error);
2176                    continue;
2177                }
2178            };
2179            let child_name = child_abs_path.file_name().unwrap();
2180            let child_path: Arc<Path> = job.path.join(child_name).into();
2181            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
2182                Some(metadata) => metadata,
2183                None => continue,
2184            };
2185
2186            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
2187            if child_name == *GITIGNORE {
2188                let (ignore, err) = Gitignore::new(&child_abs_path);
2189                if let Some(err) = err {
2190                    log::error!("error in ignore file {:?} - {:?}", child_name, err);
2191                }
2192                let ignore = Arc::new(ignore);
2193                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2194                new_ignore = Some(ignore);
2195
2196                // Update ignore status of any child entries we've already processed to reflect the
2197                // ignore file in the current directory. Because `.gitignore` starts with a `.`,
2198                // there should rarely be too numerous. Update the ignore stack associated with any
2199                // new jobs as well.
2200                let mut new_jobs = new_jobs.iter_mut();
2201                for entry in &mut new_entries {
2202                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2203                    if entry.is_dir() {
2204                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
2205                            IgnoreStack::all()
2206                        } else {
2207                            ignore_stack.clone()
2208                        };
2209                    }
2210                }
2211            }
2212
2213            let mut child_entry = Entry::new(
2214                child_path.clone(),
2215                &child_metadata,
2216                &next_entry_id,
2217                root_char_bag,
2218            );
2219
2220            if child_metadata.is_dir {
2221                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
2222                child_entry.is_ignored = is_ignored;
2223                new_entries.push(child_entry);
2224                new_jobs.push(ScanJob {
2225                    abs_path: child_abs_path,
2226                    path: child_path,
2227                    ignore_stack: if is_ignored {
2228                        IgnoreStack::all()
2229                    } else {
2230                        ignore_stack.clone()
2231                    },
2232                    scan_queue: job.scan_queue.clone(),
2233                });
2234            } else {
2235                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
2236                new_entries.push(child_entry);
2237            };
2238        }
2239
2240        self.snapshot
2241            .lock()
2242            .populate_dir(job.path.clone(), new_entries, new_ignore);
2243        for new_job in new_jobs {
2244            job.scan_queue.send(new_job).await.unwrap();
2245        }
2246
2247        Ok(())
2248    }
2249
    /// Applies one batch of file-system events to the snapshot.
    ///
    /// Works on a private clone of the snapshot with an incremented `scan_id`: every affected
    /// path is first removed, then re-inserted from fresh metadata if it still exists. The
    /// updated clone replaces the shared snapshot, after which directories touched by the
    /// batch are rescanned in parallel and ignore statuses are refreshed.
    ///
    /// Returns `false` if the worktree's root path can no longer be canonicalized (the
    /// caller, `run`, stops its loop in that case) and `true` otherwise.
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // After sorting by path, drop every event whose path starts with the path of the
        // event retained before it.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: remove every affected path from the working snapshot.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-insert the paths that still exist and queue directories for a scan.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // No metadata for the path: it stays removed from the snapshot.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        // Publish the updated clone; the scan jobs below write to the shared snapshot.
        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        // Dropping the original sender lets the queue close once all jobs are processed.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }
2351
    /// Re-evaluates ignore statuses after a batch of events.
    ///
    /// Ignores whose `.gitignore` entry no longer exists are dropped from both the local
    /// clone and the shared snapshot. For every ignore touched during the current scan whose
    /// directory still exists, the ignore status of all entries beneath that directory is
    /// recomputed in parallel.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // An ignore stamped with the current scan id was touched by this batch.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip the directories nested under `parent_path`: the job queued for it
            // already descends into them.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        // Dropping the original sender lets the queue close once all jobs are processed.
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }
2408
2409    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
2410        let mut ignore_stack = job.ignore_stack;
2411        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
2412            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2413        }
2414
2415        let mut edits = Vec::new();
2416        for mut entry in snapshot.child_entries(&job.path).cloned() {
2417            let was_ignored = entry.is_ignored;
2418            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2419            if entry.is_dir() {
2420                let child_ignore_stack = if entry.is_ignored {
2421                    IgnoreStack::all()
2422                } else {
2423                    ignore_stack.clone()
2424                };
2425                job.ignore_queue
2426                    .send(UpdateIgnoreStatusJob {
2427                        path: entry.path.clone(),
2428                        ignore_stack: child_ignore_stack,
2429                        ignore_queue: job.ignore_queue.clone(),
2430                    })
2431                    .await
2432                    .unwrap();
2433            }
2434
2435            if entry.is_ignored != was_ignored {
2436                edits.push(Edit::Insert(entry));
2437            }
2438        }
2439        self.snapshot.lock().entries_by_path.edit(edits, &());
2440    }
2441}
2442
2443async fn refresh_entry(
2444    fs: &dyn Fs,
2445    snapshot: &Mutex<Snapshot>,
2446    path: Arc<Path>,
2447    abs_path: &Path,
2448) -> Result<Entry> {
2449    let root_char_bag;
2450    let next_entry_id;
2451    {
2452        let snapshot = snapshot.lock();
2453        root_char_bag = snapshot.root_char_bag;
2454        next_entry_id = snapshot.next_entry_id.clone();
2455    }
2456    let entry = Entry::new(
2457        path,
2458        &fs.metadata(abs_path)
2459            .await?
2460            .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2461        &next_entry_id,
2462        root_char_bag,
2463    );
2464    Ok(snapshot.lock().insert_entry(entry))
2465}
2466
2467fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2468    let mut result = root_char_bag;
2469    result.extend(
2470        path.to_string_lossy()
2471            .chars()
2472            .map(|c| c.to_ascii_lowercase()),
2473    );
2474    result
2475}
2476
/// A request to scan one directory.
struct ScanJob {
    /// Absolute path of the directory on disk.
    abs_path: PathBuf,
    /// Path of the directory inside the worktree (empty for the root).
    path: Arc<Path>,
    /// Ignore stack that applies to the directory's children.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue onto which jobs for the subdirectories found by the scan are pushed.
    scan_queue: Sender<ScanJob>,
}
2483
/// A request to recompute the ignore status of the children of one directory.
struct UpdateIgnoreStatusJob {
    /// Path of the directory whose children are re-evaluated.
    path: Arc<Path>,
    /// Ignore stack that applies to `path`, before the directory's own ignore is appended.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue onto which follow-up jobs for child directories are pushed.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2489
/// Extension methods for a handle to a `Worktree` model.
pub trait WorktreeHandle {
    /// Returns a future tied to the lifetime of `cx` that is used to flush file-system
    /// events. Only available in test builds.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2497
2498impl WorktreeHandle for ModelHandle<Worktree> {
2499    // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that
2500    // occurred before the worktree was constructed. These events can cause the worktree to perfrom
2501    // extra directory scans, and emit extra scan-state notifications.
2502    //
2503    // This function mutates the worktree's directory and waits for those mutations to be picked up,
2504    // to ensure that all redundant FS events have already been processed.
2505    #[cfg(test)]
2506    fn flush_fs_events<'a>(
2507        &self,
2508        cx: &'a gpui::TestAppContext,
2509    ) -> futures::future::LocalBoxFuture<'a, ()> {
2510        use smol::future::FutureExt;
2511
2512        let filename = "fs-event-sentinel";
2513        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
2514        let tree = self.clone();
2515        async move {
2516            std::fs::write(root_path.join(filename), "").unwrap();
2517            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
2518                .await;
2519
2520            std::fs::remove_file(root_path.join(filename)).unwrap();
2521            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
2522                .await;
2523
2524            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2525                .await;
2526        }
2527        .boxed_local()
2528    }
2529}
2530
/// Iterator over the file entries of a snapshot.
pub enum FileIter<'a> {
    /// Iterates over every file, positioned by `FileCount`.
    All(Cursor<'a, Entry, FileCount, ()>),
    /// Iterates over non-ignored files only, positioned by `VisibleFileCount`.
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2535
2536impl<'a> FileIter<'a> {
2537    fn all(snapshot: &'a Snapshot, start: usize) -> Self {
2538        let mut cursor = snapshot.entries_by_path.cursor();
2539        cursor.seek(&FileCount(start), Bias::Right, &());
2540        Self::All(cursor)
2541    }
2542
2543    fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
2544        let mut cursor = snapshot.entries_by_path.cursor();
2545        cursor.seek(&VisibleFileCount(start), Bias::Right, &());
2546        Self::Visible(cursor)
2547    }
2548
2549    fn next_internal(&mut self) {
2550        match self {
2551            Self::All(cursor) => {
2552                let ix = *cursor.seek_start();
2553                cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
2554            }
2555            Self::Visible(cursor) => {
2556                let ix = *cursor.seek_start();
2557                cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
2558            }
2559        }
2560    }
2561
2562    fn item(&self) -> Option<&'a Entry> {
2563        match self {
2564            Self::All(cursor) => cursor.item(),
2565            Self::Visible(cursor) => cursor.item(),
2566        }
2567    }
2568}
2569
2570impl<'a> Iterator for FileIter<'a> {
2571    type Item = &'a Entry;
2572
2573    fn next(&mut self) -> Option<Self::Item> {
2574        if let Some(entry) = self.item() {
2575            self.next_internal();
2576            Some(entry)
2577        } else {
2578            None
2579        }
2580    }
2581}
2582
/// An iterator over entries of a snapshot located beneath a given parent path.
struct ChildEntriesIter<'a> {
    /// Path that every yielded entry's path must start with.
    parent_path: &'a Path,
    /// Cursor over the snapshot's path-ordered entries, seeked using `PathSearch` targets.
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2587
2588impl<'a> ChildEntriesIter<'a> {
2589    fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2590        let mut cursor = snapshot.entries_by_path.cursor();
2591        cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2592        Self {
2593            parent_path,
2594            cursor,
2595        }
2596    }
2597}
2598
2599impl<'a> Iterator for ChildEntriesIter<'a> {
2600    type Item = &'a Entry;
2601
2602    fn next(&mut self) -> Option<Self::Item> {
2603        if let Some(item) = self.cursor.item() {
2604            if item.path.starts_with(self.parent_path) {
2605                self.cursor
2606                    .seek_forward(&PathSearch::Successor(&item.path), Bias::Left, &());
2607                Some(item)
2608            } else {
2609                None
2610            }
2611        } else {
2612            None
2613        }
2614    }
2615}
2616
2617impl<'a> From<&'a Entry> for proto::Entry {
2618    fn from(entry: &'a Entry) -> Self {
2619        Self {
2620            id: entry.id as u64,
2621            is_dir: entry.is_dir(),
2622            path: entry.path.to_string_lossy().to_string(),
2623            inode: entry.inode,
2624            mtime: Some(entry.mtime.into()),
2625            is_symlink: entry.is_symlink,
2626            is_ignored: entry.is_ignored,
2627        }
2628    }
2629}
2630
2631impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2632    type Error = anyhow::Error;
2633
2634    fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2635        if let Some(mtime) = entry.mtime {
2636            let kind = if entry.is_dir {
2637                EntryKind::Dir
2638            } else {
2639                let mut char_bag = root_char_bag.clone();
2640                char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2641                EntryKind::File(char_bag)
2642            };
2643            let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2644            Ok(Entry {
2645                id: entry.id as usize,
2646                kind,
2647                path: path.clone(),
2648                inode: entry.inode,
2649                mtime: mtime.into(),
2650                is_symlink: entry.is_symlink,
2651                is_ignored: entry.is_ignored,
2652            })
2653        } else {
2654            Err(anyhow!(
2655                "missing mtime in remote worktree entry {:?}",
2656                entry.path
2657            ))
2658        }
2659    }
2660}
2661
2662#[cfg(test)]
2663mod tests {
2664    use super::*;
2665    use crate::fs::FakeFs;
2666    use crate::test::*;
2667    use anyhow::Result;
2668    use fs::RealFs;
2669    use rand::prelude::*;
2670    use serde_json::json;
2671    use std::time::UNIX_EPOCH;
2672    use std::{env, fmt::Write, os::unix, time::SystemTime};
2673
2674    #[gpui::test]
2675    async fn test_populate_and_search(cx: gpui::TestAppContext) {
2676        let dir = temp_tree(json!({
2677            "root": {
2678                "apple": "",
2679                "banana": {
2680                    "carrot": {
2681                        "date": "",
2682                        "endive": "",
2683                    }
2684                },
2685                "fennel": {
2686                    "grape": "",
2687                }
2688            }
2689        }));
2690
2691        let root_link_path = dir.path().join("root_link");
2692        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
2693        unix::fs::symlink(
2694            &dir.path().join("root/fennel"),
2695            &dir.path().join("root/finnochio"),
2696        )
2697        .unwrap();
2698
2699        let tree = Worktree::open_local(
2700            rpc::Client::new(),
2701            root_link_path,
2702            Arc::new(RealFs),
2703            Default::default(),
2704            &mut cx.to_async(),
2705        )
2706        .await
2707        .unwrap();
2708
2709        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2710            .await;
2711        let snapshots = [cx.read(|cx| {
2712            let tree = tree.read(cx);
2713            assert_eq!(tree.file_count(), 5);
2714            assert_eq!(
2715                tree.inode_for_path("fennel/grape"),
2716                tree.inode_for_path("finnochio/grape")
2717            );
2718            tree.snapshot()
2719        })];
2720        let cancel_flag = Default::default();
2721        let results = cx
2722            .read(|cx| {
2723                match_paths(
2724                    &snapshots,
2725                    "bna",
2726                    false,
2727                    false,
2728                    10,
2729                    &cancel_flag,
2730                    cx.background().clone(),
2731                )
2732            })
2733            .await;
2734        assert_eq!(
2735            results
2736                .into_iter()
2737                .map(|result| result.path)
2738                .collect::<Vec<Arc<Path>>>(),
2739            vec![
2740                PathBuf::from("banana/carrot/date").into(),
2741                PathBuf::from("banana/carrot/endive").into(),
2742            ]
2743        );
2744    }
2745
2746    #[gpui::test]
2747    async fn test_search_worktree_without_files(cx: gpui::TestAppContext) {
2748        let dir = temp_tree(json!({
2749            "root": {
2750                "dir1": {},
2751                "dir2": {
2752                    "dir3": {}
2753                }
2754            }
2755        }));
2756        let tree = Worktree::open_local(
2757            rpc::Client::new(),
2758            dir.path(),
2759            Arc::new(RealFs),
2760            Default::default(),
2761            &mut cx.to_async(),
2762        )
2763        .await
2764        .unwrap();
2765
2766        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2767            .await;
2768        let snapshots = [cx.read(|cx| {
2769            let tree = tree.read(cx);
2770            assert_eq!(tree.file_count(), 0);
2771            tree.snapshot()
2772        })];
2773        let cancel_flag = Default::default();
2774        let results = cx
2775            .read(|cx| {
2776                match_paths(
2777                    &snapshots,
2778                    "dir",
2779                    false,
2780                    false,
2781                    10,
2782                    &cancel_flag,
2783                    cx.background().clone(),
2784                )
2785            })
2786            .await;
2787        assert_eq!(
2788            results
2789                .into_iter()
2790                .map(|result| result.path)
2791                .collect::<Vec<Arc<Path>>>(),
2792            vec![]
2793        );
2794    }
2795
2796    #[gpui::test]
2797    async fn test_save_file(mut cx: gpui::TestAppContext) {
2798        let dir = temp_tree(json!({
2799            "file1": "the old contents",
2800        }));
2801        let tree = Worktree::open_local(
2802            rpc::Client::new(),
2803            dir.path(),
2804            Arc::new(RealFs),
2805            Default::default(),
2806            &mut cx.to_async(),
2807        )
2808        .await
2809        .unwrap();
2810        let buffer = tree
2811            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
2812            .await
2813            .unwrap();
2814        let save = buffer.update(&mut cx, |buffer, cx| {
2815            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
2816            buffer.save(cx).unwrap()
2817        });
2818        save.await.unwrap();
2819
2820        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
2821        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
2822    }
2823
2824    #[gpui::test]
2825    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
2826        let dir = temp_tree(json!({
2827            "file1": "the old contents",
2828        }));
2829        let file_path = dir.path().join("file1");
2830
2831        let tree = Worktree::open_local(
2832            rpc::Client::new(),
2833            file_path.clone(),
2834            Arc::new(RealFs),
2835            Default::default(),
2836            &mut cx.to_async(),
2837        )
2838        .await
2839        .unwrap();
2840        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2841            .await;
2842        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
2843
2844        let buffer = tree
2845            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
2846            .await
2847            .unwrap();
2848        let save = buffer.update(&mut cx, |buffer, cx| {
2849            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
2850            buffer.save(cx).unwrap()
2851        });
2852        save.await.unwrap();
2853
2854        let new_text = std::fs::read_to_string(file_path).unwrap();
2855        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
2856    }
2857
    // Renames and deletes files/directories on disk underneath an open local worktree, then
    // checks that (a) the worktree's paths and entry ids follow the renames, (b) open buffers
    // track their files' new paths / deleted state, and (c) a remote copy of the worktree
    // becomes consistent after applying an update built from the local snapshot.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let user_id = 5;
        let mut client = rpc::Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
        let tree = Worktree::open_local(
            client,
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Helper: open a buffer for a path in the worktree, panicking on failure.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        // Helper: look up the entry id currently associated with a path.
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        // Record the ids before mutating the file system, to verify they survive renames.
        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree.update(&mut cx, |tree, cx| {
            tree.as_local().unwrap().share_request(cx)
        });
        // The fake server answers the worktree's `OpenWorktree` request with the id used above.
        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 1 },
            )
            .await;

        let remote = Worktree::remote(
            proto::JoinWorktreeResponse {
                worktree: share_request.await.unwrap().worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        // Events are flushed both before and after so that the assertions below observe
        // exactly these mutations.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Renamed entries keep the ids they had before the rename.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths; the deleted file's buffer
            // keeps its old path.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
3023
    // Checks that entries' `is_ignored` flags reflect the root `.gitignore`, both after the
    // initial scan and for files created afterwards, and that `.git` itself is marked ignored.
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            rpc::Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        // State after the initial scan.
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Create one new file in each directory and wait for the worktree to pick them up.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            assert_eq!(dot_git.is_ignored, true);
        });
    }
3070
3071    #[gpui::test]
3072    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
3073        let user_id = 100;
3074        let mut client = rpc::Client::new();
3075        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3076
3077        let fs = Arc::new(FakeFs::new());
3078        fs.insert_tree(
3079            "/path",
3080            json!({
3081                "to": {
3082                    "the-dir": {
3083                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
3084                        "a.txt": "a-contents",
3085                    },
3086                },
3087            }),
3088        )
3089        .await;
3090
3091        let worktree = Worktree::open_local(
3092            client.clone(),
3093            "/path/to/the-dir".as_ref(),
3094            fs,
3095            Default::default(),
3096            &mut cx.to_async(),
3097        )
3098        .await
3099        .unwrap();
3100
3101        {
3102            let cx = cx.to_async();
3103            client.authenticate_and_connect(&cx).await.unwrap();
3104        }
3105
3106        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3107        assert_eq!(
3108            open_worktree.payload,
3109            proto::OpenWorktree {
3110                root_name: "the-dir".to_string(),
3111                collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
3112            }
3113        );
3114
3115        server
3116            .respond(
3117                open_worktree.receipt(),
3118                proto::OpenWorktreeResponse { worktree_id: 5 },
3119            )
3120            .await;
3121        let remote_id = worktree
3122            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
3123            .await;
3124        assert_eq!(remote_id, Some(5));
3125
3126        cx.update(move |_| drop(worktree));
3127        server.receive::<proto::CloseWorktree>().await.unwrap();
3128    }
3129
    // Randomized test of the background scanner: applies random file-system mutations,
    // delivers the resulting events in random batches, checks snapshot invariants after every
    // delivery, and finally compares the result against a fresh scan of the same directory.
    //
    // The `OPERATIONS` and `INITIAL_ENTRIES` environment variables override the defaults
    // (40 and 20 respectively).
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);

        // Populate the directory; an insertion probability of 1.0 means every call creates
        // a new file or directory.
        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
        for _ in 0..initial_entries {
            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
        }
        log::info!("Generated initial tree");

        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let fs = Arc::new(RealFs);
        let next_entry_id = Arc::new(AtomicUsize::new(0));
        // A snapshot containing only the root entry; it is cloned for the first scanner and
        // later reused as the starting point of the comparison scanner.
        let mut initial_snapshot = Snapshot {
            id: 0,
            scan_id: 0,
            abs_path: root_dir.path().into(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            ignores: Default::default(),
            root_name: Default::default(),
            root_char_bag: Default::default(),
            next_entry_id: next_entry_id.clone(),
        };
        initial_snapshot.insert_entry(Entry::new(
            Path::new("").into(),
            &smol::block_on(fs.metadata(root_dir.path()))
                .unwrap()
                .unwrap(),
            &next_entry_id,
            Default::default(),
        ));
        let mut scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot.clone())),
            notify_tx,
            fs.clone(),
            Arc::new(gpui::executor::Background::new()),
        );
        smol::block_on(scanner.scan_dirs()).unwrap();
        scanner.snapshot().check_invariants();

        // Interleave mutations with deliveries of a random-length prefix of the pending events.
        // NOTE(review): the loop stops once the counter reaches 1, so `operations - 1`
        // mutations are applied — confirm whether that is intended.
        let mut events = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
            if !events.is_empty() && rng.gen_bool(0.4) {
                let len = rng.gen_range(0..=events.len());
                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                log::info!("Delivering events: {:#?}", to_deliver);
                smol::block_on(scanner.process_events(to_deliver));
                scanner.snapshot().check_invariants();
            } else {
                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                mutations_len -= 1;
            }
        }
        // Deliver whatever events are still pending.
        log::info!("Quiescing: {:#?}", events);
        smol::block_on(scanner.process_events(events));
        scanner.snapshot().check_invariants();

        // A scanner that starts from the initial snapshot and scans the final directory state
        // from scratch must agree with the scanner that processed the events incrementally.
        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let mut new_scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot)),
            notify_tx,
            scanner.fs.clone(),
            scanner.executor.clone(),
        );
        smol::block_on(new_scanner.scan_dirs()).unwrap();
        assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
    }
3205
    // Applies one random mutation to the directory tree rooted at `root_path` and returns
    // synthetic FS events for the paths that were touched.
    //
    // With probability `insertion_probability` (or always, when the tree contains nothing but
    // the root directory) a new file or directory is created. Otherwise, with probability 0.05
    // a `.gitignore` is written into a random directory; failing that, a random existing file
    // or non-root directory is either renamed or deleted.
    //
    // Returns an error if a file-system operation or a `strip_prefix` call propagated with `?`
    // fails; several other operations panic via `unwrap` instead.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        // `dirs` always starts with the root itself (see `read_dir_recursive`).
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        // Records an event for `path`, using the current Unix time in seconds as the event id.
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            // Insertion: create a randomly named file or directory inside a random directory.
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Write a `.gitignore` into a random directory, listing a random subset of the
            // files and directories beneath it (as paths relative to that directory).
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            // `subdirs` includes `ignore_dir_path` itself, so it is never empty and the
            // exclusive range below is valid; at most `subdirs.len() - 1` dirs are chosen.
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick the victim: one candidate file and one candidate non-root directory are
            // drawn (either may be absent), then one of the available candidates is chosen.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // The destination's parent must not lie inside the path being moved.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                // Occasionally replace the chosen directory itself instead of moving into it
                // (never when that directory is an ancestor of `old_path`).
                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                // Deleting a directory: list its contents first so that an event can be
                // recorded for every file and directory that disappears with it.
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3328
3329    fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3330        let child_entries = std::fs::read_dir(&path).unwrap();
3331        let mut dirs = vec![path];
3332        let mut files = Vec::new();
3333        for child_entry in child_entries {
3334            let child_path = child_entry.unwrap().path();
3335            if child_path.is_dir() {
3336                let (child_dirs, child_files) = read_dir_recursive(child_path);
3337                dirs.extend(child_dirs);
3338                files.extend(child_files);
3339            } else {
3340                files.push(child_path);
3341            }
3342        }
3343        (dirs, files)
3344    }
3345
3346    fn gen_name(rng: &mut impl Rng) -> String {
3347        (0..6)
3348            .map(|_| rng.sample(rand::distributions::Alphanumeric))
3349            .map(char::from)
3350            .collect()
3351    }
3352
3353    impl Snapshot {
3354        fn check_invariants(&self) {
3355            let mut files = self.files(0);
3356            let mut visible_files = self.visible_files(0);
3357            for entry in self.entries_by_path.cursor::<(), ()>() {
3358                if entry.is_file() {
3359                    assert_eq!(files.next().unwrap().inode, entry.inode);
3360                    if !entry.is_ignored {
3361                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
3362                    }
3363                }
3364            }
3365            assert!(files.next().is_none());
3366            assert!(visible_files.next().is_none());
3367
3368            let mut bfs_paths = Vec::new();
3369            let mut stack = vec![Path::new("")];
3370            while let Some(path) = stack.pop() {
3371                bfs_paths.push(path);
3372                let ix = stack.len();
3373                for child_entry in self.child_entries(path) {
3374                    stack.insert(ix, &child_entry.path);
3375                }
3376            }
3377
3378            let dfs_paths = self
3379                .entries_by_path
3380                .cursor::<(), ()>()
3381                .map(|e| e.path.as_ref())
3382                .collect::<Vec<_>>();
3383            assert_eq!(bfs_paths, dfs_paths);
3384
3385            for (ignore_parent_path, _) in &self.ignores {
3386                assert!(self.entry_for_path(ignore_parent_path).is_some());
3387                assert!(self
3388                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
3389                    .is_some());
3390            }
3391        }
3392
3393        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
3394            let mut paths = Vec::new();
3395            for entry in self.entries_by_path.cursor::<(), ()>() {
3396                paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
3397            }
3398            paths.sort_by(|a, b| a.0.cmp(&b.0));
3399            paths
3400        }
3401    }
3402}