1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entities/identity"
13 "github.com/MichaelMure/git-bug/entity"
14 "github.com/MichaelMure/git-bug/repository"
15)
16
// Excerpt is the constraint for the lightweight entity summaries that a
// SubCache keeps for every known entity and serializes in its cache file.
type Excerpt interface {
	// Id returns the id of the entity described by this excerpt.
	Id() entity.Id
	// setId assigns the id of the excerpt. SubCache.Load calls it after
	// decoding the cache file, because the id is not serialized within the
	// excerpt itself.
	setId(id entity.Id)
}
21
// CacheEntity is the constraint for the fully loaded entities that a SubCache
// holds in memory.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit is checked during eviction: entities for which it returns
	// true are never evicted (presumably they hold uncommitted changes).
	NeedCommit() bool
	// Lock is called on an entity right before it is evicted from memory, so
	// that any later manipulation of the evicted copy blocks visibly.
	Lock()
}
27
// getUserIdentityFunc returns an IdentityCache (presumably the current user's
// identity); SubCache.MergeAll uses the result as the merge author.
type getUserIdentityFunc func() (*IdentityCache, error)
29
// Actions exposes a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads a single entity by id. SubCache.Resolve calls it
	// when the entity is not already loaded in memory.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams entities over a channel. SubCache.Build
	// consumes it to rebuild the excerpts and the index.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove is called by SubCache.Remove with the id of the entity to remove.
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll is called by SubCache.RemoveAll before the cache is emptied.
	RemoveAll func(repo repository.ClockedRepo) error
	// MergeAll is called by SubCache.MergeAll with the remote name and the
	// merge author; it streams one MergeResult per processed entity.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
40
// Compile-time check that *SubCache satisfies the cacheMgmt interface.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
42
// SubCache is a generic cache for one type of entity. It keeps an excerpt for
// every known entity, holds a subset of fully loaded entities in memory, and
// serializes the excerpts in a cache file.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repo gives access to the local storage, the index and the entities.
	repo repository.ClockedRepo
	// resolvers provides the resolvers handed to the entity actions.
	resolvers func() entity.Resolvers

	// getUserIdentity provides the identity used as author in MergeAll.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps an entity into its cached form; it is given the
	// callback to trigger when the entity changed.
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt derives the excerpt of a cached entity.
	makeExcerpt func(CacheT) ExcerptT
	// makeIndexData derives the data stored in the index for a cached entity.
	makeIndexData func(CacheT) []string
	// actions holds the entity-specific read/remove/merge functions.
	actions Actions[EntityT]

	// typename is reported in build events and in not-found/multiple-match errors.
	typename string
	// namespace names both the cache file (under "cache") and the index.
	namespace string
	// version is the cache file format version; Load rejects any other version.
	version uint
	// maxLoaded is the LRU size above which evictIfNeeded evicts entities.
	maxLoaded int

	// mu guards excerpts, cached and lru in most methods.
	mu sync.RWMutex
	// excerpts holds one excerpt per known entity, keyed by id.
	excerpts map[entity.Id]ExcerptT
	// cached holds the entities currently loaded in memory, keyed by id.
	cached map[entity.Id]CacheT
	// lru orders loaded entity ids from oldest to newest for eviction.
	lru *lruIdCache
}
63
64func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
65 repo repository.ClockedRepo,
66 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
67 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
68 makeExcerpt func(CacheT) ExcerptT,
69 makeIndexData func(CacheT) []string,
70 actions Actions[EntityT],
71 typename, namespace string,
72 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
73 return &SubCache[EntityT, ExcerptT, CacheT]{
74 repo: repo,
75 resolvers: resolvers,
76 getUserIdentity: getUserIdentity,
77 makeCached: makeCached,
78 makeExcerpt: makeExcerpt,
79 makeIndexData: makeIndexData,
80 actions: actions,
81 typename: typename,
82 namespace: namespace,
83 version: version,
84 maxLoaded: maxLoaded,
85 excerpts: make(map[entity.Id]ExcerptT),
86 cached: make(map[entity.Id]CacheT),
87 lru: newLRUIdCache(),
88 }
89}
90
// Typename returns the type name of the entities handled by this SubCache, as
// reported in build events and in not-found/multiple-match errors.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
94
95// Load will try to read from the disk the entity cache file
96func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
97 sc.mu.Lock()
98 defer sc.mu.Unlock()
99
100 f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
101 if err != nil {
102 return err
103 }
104
105 aux := struct {
106 Version uint
107 Excerpts map[entity.Id]ExcerptT
108 }{}
109
110 decoder := gob.NewDecoder(f)
111 err = decoder.Decode(&aux)
112 if err != nil {
113 _ = f.Close()
114 return err
115 }
116
117 err = f.Close()
118 if err != nil {
119 return err
120 }
121
122 if aux.Version != sc.version {
123 return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
124 }
125
126 // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
127 // so we fix it here, which doubles as enforcing coherency.
128 for id, excerpt := range aux.Excerpts {
129 excerpt.setId(id)
130 }
131
132 sc.excerpts = aux.Excerpts
133
134 index, err := sc.repo.GetIndex(sc.namespace)
135 if err != nil {
136 return err
137 }
138
139 // simple heuristic to detect a mismatch between the index and the entities
140 count, err := index.DocCount()
141 if err != nil {
142 return err
143 }
144 if count != uint64(len(sc.excerpts)) {
145 return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
146 }
147
148 // TODO: find a way to check lamport clocks
149
150 return nil
151}
152
153// Write will serialize on disk the entity cache file
154func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
155 sc.mu.RLock()
156 defer sc.mu.RUnlock()
157
158 var data bytes.Buffer
159
160 aux := struct {
161 Version uint
162 Excerpts map[entity.Id]ExcerptT
163 }{
164 Version: sc.version,
165 Excerpts: sc.excerpts,
166 }
167
168 encoder := gob.NewEncoder(&data)
169
170 err := encoder.Encode(aux)
171 if err != nil {
172 return err
173 }
174
175 f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
176 if err != nil {
177 return err
178 }
179
180 _, err = f.Write(data.Bytes())
181 if err != nil {
182 _ = f.Close()
183 return err
184 }
185
186 return f.Close()
187}
188
189func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
190 // value chosen experimentally as giving the fasted indexing, while
191 // not driving the cache size on disk too high.
192 //
193 // | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
194 // |:----------:|:-------------:|:------------:|:--------:|
195 // | 10 | 24 | 84 | 1,59 |
196 // | 30 | 26 | 84 | 1,388 |
197 // | 50 | 26 | 84 | 1,44 |
198 // | 60 | 26 | 80 | 1,377 |
199 // | 68 | 27 | 80 | 1,385 |
200 // | 75 | 26 | 84 | 1,32 |
201 // | 80 | 26 | 80 | 1,37 |
202 // | 85 | 27 | 80 | 1,317 |
203 // | 100 | 26 | 80 | 1,455 |
204 // | 150 | 26 | 80 | 2,066 |
205 // | 200 | 28 | 80 | 2,885 |
206 // | 250 | 30 | 72 | 3,555 |
207 // | 300 | 31 | 72 | 4,787 |
208 // | 500 | 23 | 72 | 5,4 |
209 const maxBatchCount = 75
210
211 out := make(chan BuildEvent)
212
213 go func() {
214 defer close(out)
215
216 out <- BuildEvent{
217 Typename: sc.typename,
218 Event: BuildEventStarted,
219 }
220
221 sc.excerpts = make(map[entity.Id]ExcerptT)
222
223 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
224
225 index, err := sc.repo.GetIndex(sc.namespace)
226 if err != nil {
227 out <- BuildEvent{
228 Typename: sc.typename,
229 Err: err,
230 }
231 return
232 }
233
234 // wipe the index just to be sure
235 err = index.Clear()
236 if err != nil {
237 out <- BuildEvent{
238 Typename: sc.typename,
239 Err: err,
240 }
241 return
242 }
243
244 indexer, indexEnd := index.IndexBatch()
245 var batchCount int
246
247 for e := range allEntities {
248 if e.Err != nil {
249 out <- BuildEvent{
250 Typename: sc.typename,
251 Err: e.Err,
252 }
253 return
254 }
255
256 cached := sc.makeCached(e.Entity, sc.entityUpdated)
257 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
258 // might as well keep them in memory
259 sc.cached[e.Entity.Id()] = cached
260
261 indexData := sc.makeIndexData(cached)
262 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
263 out <- BuildEvent{
264 Typename: sc.typename,
265 Err: err,
266 }
267 return
268 }
269
270 batchCount++
271 if batchCount >= maxBatchCount {
272 err = indexEnd()
273 if err != nil {
274 out <- BuildEvent{
275 Typename: sc.typename,
276 Err: err,
277 }
278 return
279 }
280
281 indexer, indexEnd = index.IndexBatch()
282 batchCount = 0
283 }
284
285 out <- BuildEvent{
286 Typename: sc.typename,
287 Event: BuildEventProgress,
288 Progress: e.CurrentEntity,
289 Total: e.TotalEntities,
290 }
291 }
292
293 if batchCount > 0 {
294 err = indexEnd()
295 if err != nil {
296 out <- BuildEvent{
297 Typename: sc.typename,
298 Err: err,
299 }
300 return
301 }
302 }
303
304 err = sc.write()
305 if err != nil {
306 out <- BuildEvent{
307 Typename: sc.typename,
308 Err: err,
309 }
310 return
311 }
312
313 out <- BuildEvent{
314 Typename: sc.typename,
315 Event: BuildEventFinished,
316 }
317 }()
318
319 return out
320}
321
322func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
323 sc.maxLoaded = size
324 sc.evictIfNeeded()
325}
326
327func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
328 sc.mu.Lock()
329 defer sc.mu.Unlock()
330 sc.excerpts = nil
331 sc.cached = make(map[entity.Id]CacheT)
332 return nil
333}
334
335// AllIds return all known bug ids
336func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
337 sc.mu.RLock()
338 defer sc.mu.RUnlock()
339
340 result := make([]entity.Id, len(sc.excerpts))
341
342 i := 0
343 for _, excerpt := range sc.excerpts {
344 result[i] = excerpt.Id()
345 i++
346 }
347
348 return result
349}
350
351// Resolve retrieve an entity matching the exact given id
352func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
353 sc.mu.RLock()
354 cached, ok := sc.cached[id]
355 if ok {
356 sc.lru.Get(id)
357 sc.mu.RUnlock()
358 return cached, nil
359 }
360 sc.mu.RUnlock()
361
362 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
363 if err != nil {
364 return *new(CacheT), err
365 }
366
367 cached = sc.makeCached(e, sc.entityUpdated)
368
369 sc.mu.Lock()
370 sc.cached[id] = cached
371 sc.lru.Add(id)
372 sc.mu.Unlock()
373
374 sc.evictIfNeeded()
375
376 return cached, nil
377}
378
379// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
380// entity match.
381func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
382 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
383 return excerpt.Id().HasPrefix(prefix)
384 })
385}
386
387func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
388 id, err := sc.resolveMatcher(f)
389 if err != nil {
390 return *new(CacheT), err
391 }
392 return sc.Resolve(id)
393}
394
395// ResolveExcerpt retrieve an Excerpt matching the exact given id
396func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
397 sc.mu.RLock()
398 defer sc.mu.RUnlock()
399
400 excerpt, ok := sc.excerpts[id]
401 if !ok {
402 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
403 }
404
405 return excerpt, nil
406}
407
408// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
409// entity match.
410func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
411 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
412 return excerpt.Id().HasPrefix(prefix)
413 })
414}
415
416func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
417 id, err := sc.resolveMatcher(f)
418 if err != nil {
419 return *new(ExcerptT), err
420 }
421 return sc.ResolveExcerpt(id)
422}
423
424func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
425 sc.mu.RLock()
426 defer sc.mu.RUnlock()
427
428 // preallocate but empty
429 matching := make([]entity.Id, 0, 5)
430
431 for _, excerpt := range sc.excerpts {
432 if f(excerpt) {
433 matching = append(matching, excerpt.Id())
434 }
435 }
436
437 if len(matching) > 1 {
438 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
439 }
440
441 if len(matching) == 0 {
442 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
443 }
444
445 return matching[0], nil
446}
447
448func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
449 sc.mu.Lock()
450 if _, has := sc.cached[e.Id()]; has {
451 sc.mu.Unlock()
452 return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
453 }
454
455 cached := sc.makeCached(e, sc.entityUpdated)
456 sc.cached[e.Id()] = cached
457 sc.lru.Add(e.Id())
458 sc.mu.Unlock()
459
460 sc.evictIfNeeded()
461
462 // force the write of the excerpt
463 err := sc.entityUpdated(e.Id())
464 if err != nil {
465 return *new(CacheT), err
466 }
467
468 return cached, nil
469}
470
471func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
472 e, err := sc.ResolvePrefix(prefix)
473 if err != nil {
474 return err
475 }
476
477 sc.mu.Lock()
478
479 err = sc.actions.Remove(sc.repo, e.Id())
480 if err != nil {
481 sc.mu.Unlock()
482 return err
483 }
484
485 delete(sc.cached, e.Id())
486 delete(sc.excerpts, e.Id())
487 sc.lru.Remove(e.Id())
488
489 index, err := sc.repo.GetIndex(sc.namespace)
490 if err != nil {
491 sc.mu.Unlock()
492 return err
493 }
494
495 err = index.Remove(e.Id().String())
496 sc.mu.Unlock()
497 if err != nil {
498 return err
499 }
500
501 return sc.write()
502}
503
504func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
505 sc.mu.Lock()
506
507 err := sc.actions.RemoveAll(sc.repo)
508 if err != nil {
509 sc.mu.Unlock()
510 return err
511 }
512
513 for id, _ := range sc.cached {
514 delete(sc.cached, id)
515 sc.lru.Remove(id)
516 }
517 for id, _ := range sc.excerpts {
518 delete(sc.excerpts, id)
519 }
520
521 index, err := sc.repo.GetIndex(sc.namespace)
522 if err != nil {
523 sc.mu.Unlock()
524 return err
525 }
526
527 err = index.Clear()
528 sc.mu.Unlock()
529 if err != nil {
530 return err
531 }
532
533 return sc.write()
534}
535
536func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
537 out := make(chan entity.MergeResult)
538
539 // Intercept merge results to update the cache properly
540 go func() {
541 defer close(out)
542
543 author, err := sc.getUserIdentity()
544 if err != nil {
545 out <- entity.NewMergeError(err, "")
546 return
547 }
548
549 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
550 for result := range results {
551 out <- result
552
553 if result.Err != nil {
554 continue
555 }
556
557 switch result.Status {
558 case entity.MergeStatusNew, entity.MergeStatusUpdated:
559 e := result.Entity.(EntityT)
560 cached := sc.makeCached(e, sc.entityUpdated)
561
562 sc.mu.Lock()
563 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
564 // might as well keep them in memory
565 sc.cached[result.Id] = cached
566 sc.mu.Unlock()
567 }
568 }
569
570 err = sc.write()
571 if err != nil {
572 out <- entity.NewMergeError(err, "")
573 return
574 }
575 }()
576
577 return out
578
579}
580
// GetNamespace returns the namespace of this SubCache, which names both its
// cache file (under "cache") and its index.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
584
585// entityUpdated is a callback to trigger when the excerpt of an entity changed
586func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
587 sc.mu.Lock()
588 e, ok := sc.cached[id]
589 if !ok {
590 sc.mu.Unlock()
591
592 // if the bug is not loaded at this point, it means it was loaded before
593 // but got evicted. Which means we potentially have multiple copies in
594 // memory and thus concurrent write.
595 // Failing immediately here is the simple and safe solution to avoid
596 // complicated data loss.
597 return errors.New("entity missing from cache")
598 }
599 sc.lru.Get(id)
600 // sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
601 sc.excerpts[id] = sc.makeExcerpt(e)
602 sc.mu.Unlock()
603
604 index, err := sc.repo.GetIndex(sc.namespace)
605 if err != nil {
606 return err
607 }
608
609 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
610 if err != nil {
611 return err
612 }
613
614 return sc.write()
615}
616
617// evictIfNeeded will evict an entity from the cache if needed
618func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
619 sc.mu.Lock()
620 defer sc.mu.Unlock()
621 if sc.lru.Len() <= sc.maxLoaded {
622 return
623 }
624
625 for _, id := range sc.lru.GetOldestToNewest() {
626 b := sc.cached[id]
627 if b.NeedCommit() {
628 continue
629 }
630
631 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
632 // if something try to do it anyway, it will lock the program and make it obvious.
633 b.Lock()
634
635 sc.lru.Remove(id)
636 delete(sc.cached, id)
637
638 if sc.lru.Len() <= sc.maxLoaded {
639 return
640 }
641 }
642}