1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "sync"
8
9 "github.com/pkg/errors"
10
11 "github.com/MichaelMure/git-bug/entities/identity"
12 "github.com/MichaelMure/git-bug/entity"
13 "github.com/MichaelMure/git-bug/repository"
14)
15
16type Excerpt interface {
17 Id() entity.Id
18 setId(id entity.Id)
19}
20
21type CacheEntity interface {
22 Id() entity.Id
23 NeedCommit() bool
24}
25
26type getUserIdentityFunc func() (*IdentityCache, error)
27
// Actions exposes a number of action functions on entities, to give upper layers
// (the cache) a way to normalize interactions.
//
// Note: ideally this wouldn't exist. The cache layer would assume that everything
// is an entity/dag and directly use the functions from that package, but right
// now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads the single entity with the given id.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams every entity of this type over a channel; an
	// element carrying a non-nil Err reports a read failure.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes the entity with the given id from the repository.
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// MergeAll merges the entities of the given remote, using mergeAuthor as the
	// author of the merge, and streams one result per processed entity.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
37
38var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
39
// SubCache is the cache for one type of entity. It keeps an excerpt of every
// known entity in memory (written to disk by write, read back by Load), plus a
// set of fully loaded entities whose usage is tracked in an LRU so that the
// least recently used ones can be evicted once more than maxLoaded are loaded.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repository the entities are read from; also provides local storage and the index
	repo repository.ClockedRepo
	// called on demand to obtain the resolvers used when reading entities
	resolvers func() entity.Resolvers

	// provides the author used by MergeAll
	getUserIdentity getUserIdentityFunc
	// wraps a loaded entity; entityUpdated is the callback to trigger when its excerpt changes
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// builds the excerpt of a loaded entity
	makeExcerpt func(CacheT) ExcerptT
	// produces the texts fed to the search index for a loaded entity
	makeIndexData func(CacheT) []string
	// entity-type specific read / remove / merge operations
	actions Actions[EntityT]

	// name of the entity type; also the name of its search index
	typename string
	// prefix of the on-disk cache file name (namespace + "-file")
	namespace string
	// cache file format version; Load rejects any other version
	version uint
	// maximum number of loaded entities before evictIfNeeded starts evicting
	maxLoaded int

	// mu guards excerpts, cached and lru (and maxLoaded as read by evictIfNeeded)
	mu sync.RWMutex
	// excerpt of every known entity, keyed by id
	excerpts map[entity.Id]ExcerptT
	// fully loaded entities, keyed by id
	cached map[entity.Id]CacheT
	// usage order of the loaded entities, used to pick eviction victims
	lru *lruIdCache
}
60
61func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
62 repo repository.ClockedRepo,
63 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
64 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
65 makeExcerpt func(CacheT) ExcerptT,
66 makeIndexData func(CacheT) []string,
67 actions Actions[EntityT],
68 typename, namespace string,
69 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
70 return &SubCache[EntityT, ExcerptT, CacheT]{
71 repo: repo,
72 resolvers: resolvers,
73 getUserIdentity: getUserIdentity,
74 makeCached: makeCached,
75 makeExcerpt: makeExcerpt,
76 makeIndexData: makeIndexData,
77 actions: actions,
78 typename: typename,
79 namespace: namespace,
80 version: version,
81 maxLoaded: maxLoaded,
82 excerpts: make(map[entity.Id]ExcerptT),
83 cached: make(map[entity.Id]CacheT),
84 lru: newLRUIdCache(),
85 }
86}
87
88func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
89 return sc.typename
90}
91
// Load tries to read the excerpts of this sub-cache back from the on-disk cache
// file (namespace + "-file").
//
// It fails if the file can't be opened or decoded, if it was written with a
// different format version, or if the number of excerpts doesn't match the
// number of documents in the search index (a cheap coherency heuristic).
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	f, err := sc.repo.LocalStorage().Open(sc.namespace + "-file")
	if err != nil {
		return err
	}
	// The file is only read; a Close failure carries nothing actionable, but the
	// handle must not leak.
	defer func() { _ = f.Close() }()

	decoder := gob.NewDecoder(f)

	aux := struct {
		Version  uint
		Excerpts map[entity.Id]ExcerptT
	}{}

	err = decoder.Decode(&aux)
	if err != nil {
		return err
	}

	if aux.Version != sc.version {
		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
	}

	// gob does not transmit a nil map, so Excerpts can still be nil here; never
	// install a nil map, later writes into sc.excerpts would panic.
	if aux.Excerpts == nil {
		aux.Excerpts = make(map[entity.Id]ExcerptT)
	}

	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
	// so we fix it here, which doubles as enforcing coherency.
	for id, excerpt := range aux.Excerpts {
		excerpt.setId(id)
	}

	sc.excerpts = aux.Excerpts

	index, err := sc.repo.GetIndex(sc.typename)
	if err != nil {
		return err
	}

	// simple heuristic to detect a mismatch between the index and the entities
	count, err := index.DocCount()
	if err != nil {
		return err
	}
	if count != uint64(len(sc.excerpts)) {
		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
	}

	return nil
}
142
// write serializes the format version and the excerpts of this sub-cache into
// the on-disk cache file (namespace + "-file").
//
// Everything is encoded into memory before the file is created, so an encoding
// failure never touches the existing file.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	var data bytes.Buffer

	aux := struct {
		Version  uint
		Excerpts map[entity.Id]ExcerptT
	}{
		Version:  sc.version,
		Excerpts: sc.excerpts,
	}

	encoder := gob.NewEncoder(&data)
	if err := encoder.Encode(aux); err != nil {
		return err
	}

	f, err := sc.repo.LocalStorage().Create(sc.namespace + "-file")
	if err != nil {
		return err
	}

	if _, err := f.Write(data.Bytes()); err != nil {
		// release the handle; the write error is the one worth reporting
		_ = f.Close()
		return err
	}

	return f.Close()
}
177
178func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
179 sc.excerpts = make(map[entity.Id]ExcerptT)
180
181 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
182
183 index, err := sc.repo.GetIndex(sc.typename)
184 if err != nil {
185 return err
186 }
187
188 // wipe the index just to be sure
189 err = index.Clear()
190 if err != nil {
191 return err
192 }
193
194 indexer, indexEnd := index.IndexBatch()
195
196 for e := range allEntities {
197 if e.Err != nil {
198 return e.Err
199 }
200
201 cached := sc.makeCached(e.Entity, sc.entityUpdated)
202 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
203 // might as well keep them in memory
204 sc.cached[e.Entity.Id()] = cached
205
206 indexData := sc.makeIndexData(cached)
207 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
208 return err
209 }
210 }
211
212 err = indexEnd()
213 if err != nil {
214 return err
215 }
216
217 err = sc.write()
218 if err != nil {
219 return err
220 }
221
222 return nil
223}
224
225func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
226 sc.maxLoaded = size
227 sc.evictIfNeeded()
228}
229
230func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
231 sc.mu.Lock()
232 defer sc.mu.Unlock()
233 sc.excerpts = nil
234 sc.cached = make(map[entity.Id]CacheT)
235 return nil
236}
237
238// AllIds return all known bug ids
239func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
240 sc.mu.RLock()
241 defer sc.mu.RUnlock()
242
243 result := make([]entity.Id, len(sc.excerpts))
244
245 i := 0
246 for _, excerpt := range sc.excerpts {
247 result[i] = excerpt.Id()
248 i++
249 }
250
251 return result
252}
253
254// Resolve retrieve an entity matching the exact given id
255func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
256 sc.mu.RLock()
257 cached, ok := sc.cached[id]
258 if ok {
259 sc.lru.Get(id)
260 sc.mu.RUnlock()
261 return cached, nil
262 }
263 sc.mu.RUnlock()
264
265 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
266 if err != nil {
267 return *new(CacheT), err
268 }
269
270 cached = sc.makeCached(e, sc.entityUpdated)
271
272 sc.mu.Lock()
273 sc.cached[id] = cached
274 sc.lru.Add(id)
275 sc.mu.Unlock()
276
277 sc.evictIfNeeded()
278
279 return cached, nil
280}
281
282// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
283// entity match.
284func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
285 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
286 return excerpt.Id().HasPrefix(prefix)
287 })
288}
289
290func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
291 id, err := sc.resolveMatcher(f)
292 if err != nil {
293 return *new(CacheT), err
294 }
295 return sc.Resolve(id)
296}
297
298// ResolveExcerpt retrieve an Excerpt matching the exact given id
299func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
300 sc.mu.RLock()
301 defer sc.mu.RUnlock()
302
303 excerpt, ok := sc.excerpts[id]
304 if !ok {
305 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
306 }
307
308 return excerpt, nil
309}
310
311// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
312// entity match.
313func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
314 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
315 return excerpt.Id().HasPrefix(prefix)
316 })
317}
318
319func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
320 id, err := sc.resolveMatcher(f)
321 if err != nil {
322 return *new(ExcerptT), err
323 }
324 return sc.ResolveExcerpt(id)
325}
326
327func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
328 sc.mu.RLock()
329 defer sc.mu.RUnlock()
330
331 // preallocate but empty
332 matching := make([]entity.Id, 0, 5)
333
334 for _, excerpt := range sc.excerpts {
335 if f(excerpt) {
336 matching = append(matching, excerpt.Id())
337 }
338 }
339
340 if len(matching) > 1 {
341 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
342 }
343
344 if len(matching) == 0 {
345 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
346 }
347
348 return matching[0], nil
349}
350
// add registers a new entity in the cache and the LRU. It then writes the
// entity's excerpt and index entry through entityUpdated. It fails if an
// entity with the same id is already loaded.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
	id := e.Id()

	sc.mu.Lock()
	if _, has := sc.cached[id]; has {
		sc.mu.Unlock()
		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", id)
	}

	cached := sc.makeCached(e, sc.entityUpdated)
	sc.cached[id] = cached
	sc.lru.Add(id)
	sc.mu.Unlock()

	// Force the write of the excerpt *before* any eviction. With a small
	// maxLoaded the new entity itself could otherwise be evicted first, and
	// entityUpdated would then fail because it is no longer loaded.
	err := sc.entityUpdated(id)

	sc.evictIfNeeded()

	if err != nil {
		return *new(CacheT), err
	}
	return cached, nil
}
373
374func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
375 e, err := sc.ResolvePrefix(prefix)
376 if err != nil {
377 return err
378 }
379
380 sc.mu.Lock()
381
382 err = sc.actions.Remove(sc.repo, e.Id())
383 if err != nil {
384 sc.mu.Unlock()
385 return err
386 }
387
388 delete(sc.cached, e.Id())
389 delete(sc.excerpts, e.Id())
390 sc.lru.Remove(e.Id())
391
392 sc.mu.Unlock()
393
394 return sc.write()
395}
396
397func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
398 out := make(chan entity.MergeResult)
399
400 // Intercept merge results to update the cache properly
401 go func() {
402 defer close(out)
403
404 author, err := sc.getUserIdentity()
405 if err != nil {
406 out <- entity.NewMergeError(err, "")
407 return
408 }
409
410 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
411 for result := range results {
412 out <- result
413
414 if result.Err != nil {
415 continue
416 }
417
418 switch result.Status {
419 case entity.MergeStatusNew, entity.MergeStatusUpdated:
420 e := result.Entity.(EntityT)
421 cached := sc.makeCached(e, sc.entityUpdated)
422
423 sc.mu.Lock()
424 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
425 // might as well keep them in memory
426 sc.cached[result.Id] = cached
427 sc.mu.Unlock()
428 }
429 }
430
431 err = sc.write()
432 if err != nil {
433 out <- entity.NewMergeError(err, "")
434 return
435 }
436 }()
437
438 return out
439
440}
441
442func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
443 return sc.namespace
444}
445
446// entityUpdated is a callback to trigger when the excerpt of an entity changed
447func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
448 sc.mu.Lock()
449 e, ok := sc.cached[id]
450 if !ok {
451 sc.mu.Unlock()
452
453 // if the bug is not loaded at this point, it means it was loaded before
454 // but got evicted. Which means we potentially have multiple copies in
455 // memory and thus concurrent write.
456 // Failing immediately here is the simple and safe solution to avoid
457 // complicated data loss.
458 return errors.New("entity missing from cache")
459 }
460 sc.lru.Get(id)
461 // sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
462 sc.excerpts[id] = sc.makeExcerpt(e)
463 sc.mu.Unlock()
464
465 index, err := sc.repo.GetIndex(sc.typename)
466 if err != nil {
467 return err
468 }
469
470 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
471 if err != nil {
472 return err
473 }
474
475 return sc.write()
476}
477
478// evictIfNeeded will evict an entity from the cache if needed
479func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
480 sc.mu.Lock()
481 defer sc.mu.Unlock()
482 if sc.lru.Len() <= sc.maxLoaded {
483 return
484 }
485
486 for _, id := range sc.lru.GetOldestToNewest() {
487 b := sc.cached[id]
488 if b.NeedCommit() {
489 continue
490 }
491
492 // TODO
493 // b.Lock()
494 sc.lru.Remove(id)
495 delete(sc.cached, id)
496
497 if sc.lru.Len() <= sc.maxLoaded {
498 return
499 }
500 }
501}