training_data_uploader.rs

  1use std::collections::hash_map;
  2
  3use cloud_llm_client::{PredictEditsEvent, PredictEditsGitInfo, SerializedJson};
  4use collections::{HashMap, HashSet};
  5use fs::MTime;
  6use gpui::{AppContext as _, Context, Entity, EntityId, Task, WeakEntity};
  7use language::{Buffer, BufferEvent};
  8use project::{
  9    Project, ProjectEntryId, ProjectPath,
 10    buffer_store::{BufferStore, BufferStoreEvent},
 11    git_store::{GitStore, GitStoreEvent, Repository, RepositoryId},
 12    worktree_store::{WorktreeStore, WorktreeStoreEvent},
 13};
 14use uuid::Uuid;
 15
 16use crate::license_detection::LicenseDetectionWatcher;
 17
// TODO:
//
// * Avoid subscribing to every buffer in the project.
//
// * The `MoveCursor` event is currently emitted only for edit prediction requests.
 23
 24pub struct TrainingDataUploader {
 25    projects: HashMap<EntityId, Entity<ZetaProject>>,
 26    _upload_task: Task<()>,
 27}
 28
 29struct ZetaProject {
 30    project: WeakEntity<Project>,
 31    repositories: HashMap<RepositoryId, Entity<ZetaRepository>>,
 32    buffers_changed: HashSet<WeakEntity<Buffer>>,
 33    project_entries_changed: HashSet<ProjectEntryId>,
 34}
 35
 36struct ZetaRepository {
 37    unsent_events: Vec<SerializedJson<PredictEditsEvent>>,
 38    pending_event: Option<PredictEditsEvent>,
 39    last_snapshot: Option<ZetaRepositorySnapshot>,
 40    license_watcher: LicenseDetectionWatcher,
 41}
 42
 43struct ZetaRepositorySnapshot {
 44    request_id: Uuid,
 45    git_info: PredictEditsGitInfo,
 46    buffers: HashMap<ProjectEntryId, ZetaBufferSnapshot>,
 47    files: HashMap<ProjectEntryId, ZetaFileSnapshot>,
 48}
 49
 50struct ZetaBufferSnapshot {
 51    path: ProjectPath,
 52    text: String,
 53    buffer: WeakEntity<Buffer>,
 54    version: clock::Global,
 55}
 56
 57struct ZetaFileSnapshot {
 58    path: ProjectPath,
 59    text: String,
 60    mtime: MTime,
 61}
 62
 63impl TrainingDataUploader {
 64    pub fn new(cx: &mut Context<Self>) -> Self {
 65        let _upload_task = cx.spawn(|this, cx| {
 66            loop {
 67                todo!();
 68            }
 69        });
 70        Self {
 71            projects: HashMap::default(),
 72            _upload_task,
 73        }
 74    }
 75
 76    fn register(&mut self, project: &Entity<Project>, path: ProjectPath, cx: &mut Context<Self>) {
 77        let project_entity_id = project.entity_id();
 78
 79        let zeta_project = match self.projects.entry(project_entity_id) {
 80            hash_map::Entry::Vacant(entry) => {
 81                let zeta_project = cx.new(|cx| ZetaProject::new(project, cx));
 82                cx.observe_release(project, move |this, project, cx| {
 83                    this.projects.remove(&project_entity_id);
 84                });
 85                entry.insert(zeta_project)
 86            }
 87            hash_map::Entry::Occupied(entry) => entry.into_mut(),
 88        };
 89
 90        // todo!
 91        // zeta_project.update(|zeta_project, cx| zeta_project.register(path, cx));
 92    }
 93}
 94
 95impl ZetaProject {
 96    pub fn new(project: &Entity<Project>, cx: &mut Context<Self>) -> Self {
 97        cx.subscribe(&project, Self::handle_project_event).detach();
 98        cx.subscribe(
 99            &project.read(cx).git_store().clone(),
100            Self::handle_git_store_event,
101        )
102        .detach();
103        cx.subscribe(
104            &project.read(cx).worktree_store(),
105            Self::handle_worktree_store_event,
106        )
107        .detach();
108
109        let buffer_store = project.read(cx).buffer_store().clone();
110        for buffer in buffer_store.read(cx).buffers().collect::<Vec<_>>() {
111            Self::register_buffer(&buffer, cx);
112        }
113        cx.subscribe(&buffer_store, Self::handle_buffer_store_event)
114            .detach();
115
116        Self {
117            project: project.downgrade(),
118            repositories: HashMap::default(),
119            buffers_changed: HashSet::default(),
120            project_entries_changed: HashSet::default(),
121        }
122    }
123
124    fn handle_git_store_event(
125        &mut self,
126        _git_store: Entity<GitStore>,
127        event: &GitStoreEvent,
128        cx: &mut Context<Self>,
129    ) {
130        use GitStoreEvent::*;
131        match event {
132            RepositoryRemoved(repository_id) => {
133                self.repositories.remove(&repository_id);
134            }
135            RepositoryAdded(repository_id) => {
136                self.repositories
137                    .insert(*repository_id, cx.new(|cx| ZetaRepository::new(cx)));
138            }
139            RepositoryUpdated(repository_id, event, is_active) => {}
140            ActiveRepositoryChanged { .. }
141            | IndexWriteError { .. }
142            | JobsUpdated
143            | ConflictsUpdated => {}
144        }
145    }
146
147    fn handle_worktree_store_event(
148        &mut self,
149        _worktree_store: Entity<WorktreeStore>,
150        event: &WorktreeStoreEvent,
151        cx: &mut Context<Self>,
152    ) {
153        use WorktreeStoreEvent::*;
154        match event {
155            WorktreeAdded(worktree) => {}
156            WorktreeRemoved(worktree_entity_id, worktree_id) => {}
157            WorktreeUpdatedEntries(worktree_id, updated_entries_set) => {
158                for (path, entry_id, _path_change) in updated_entries_set.iter() {
159                    self.project_entries_changed.insert(*entry_id);
160                }
161            }
162            WorktreeUpdatedGitRepositories(worktree_id, updated_git_repositories) => {}
163            WorktreeDeletedEntry(worktree_id, project_entry_id) => {}
164            WorktreeReleased { .. } | WorktreeOrderChanged | WorktreeUpdateSent { .. } => {}
165        }
166    }
167
168    fn handle_buffer_store_event(
169        &mut self,
170        _buffer_store: Entity<BufferStore>,
171        event: &BufferStoreEvent,
172        cx: &mut Context<Self>,
173    ) {
174        use BufferStoreEvent::*;
175        match event {
176            BufferAdded(buffer) => Self::register_buffer(buffer, cx),
177            BufferOpened { .. }
178            | BufferChangedFilePath { .. }
179            | BufferDropped { .. }
180            | SharedBufferClosed { .. } => {}
181        }
182    }
183
184    fn register_buffer(buffer: &Entity<Buffer>, cx: &mut Context<Self>) {
185        cx.subscribe(buffer, Self::handle_buffer_event);
186    }
187
188    fn handle_buffer_event(
189        &mut self,
190        buffer: Entity<Buffer>,
191        event: &BufferEvent,
192        _cx: &mut Context<Self>,
193    ) {
194        match event {
195            BufferEvent::Edited => {
196                self.buffers_changed.insert(buffer.downgrade());
197            }
198            _ => {}
199        }
200    }
201
202    fn handle_project_event(
203        &mut self,
204        _project: Entity<Project>,
205        event: &project::Event,
206        cx: &mut Context<Self>,
207    ) {
208        match event {
209            project::Event::ActiveEntryChanged(entry_id) => {
210                todo!()
211            }
212            _ => {}
213        }
214    }
215}
216
217impl ZetaRepository {
218    pub fn new(cx: &mut Context<Self>) -> Self {
219        Self {
220            unsent_events: Vec::new(),
221            pending_event: None,
222            last_snapshot: None,
223            license_watcher: LicenseDetectionWatcher::new(cx),
224        }
225    }
226}