1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entity"
13 "github.com/MichaelMure/git-bug/repository"
14)
15
// Excerpt is the constraint for the lightweight summaries of entities that
// SubCache keeps in memory and serializes in its cache file.
type Excerpt interface {
	// Id returns the id of the entity this excerpt summarizes.
	Id() entity.Id
	// setId assigns the id of the excerpt. SubCache.Load calls it on every
	// decoded excerpt, using the key under which the excerpt was stored.
	setId(id entity.Id)
}
20
// CacheEntity is the constraint for the fully loaded, cached form of an
// entity held by SubCache.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit reports whether the entity must be kept loaded: SubCache skips
	// entities for which it returns true when evicting.
	NeedCommit() bool
	// Lock is called by SubCache on an entity right before evicting it.
	Lock()
}
26
// getUserIdentityFunc returns the identity of the current user. SubCache uses
// it to obtain the author passed to the merge action in MergeAll.
type getUserIdentityFunc func() (*IdentityCache, error)
28
// Actions exposes a set of action functions on entities, giving the upper layer (the cache) a way to normalize
// its interactions with them.
// Note: ideally this wouldn't exist: the cache layer would assume that everything is an entity/dag and directly
// use the functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads the single entity with the given id.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams entities, or read errors, over the returned channel.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes the entity with the given id from the repository.
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll deletes all the entities of this type from the repository.
	RemoveAll func(repo repository.ClockedRepo) error
	// MergeAll merges the entities coming from the given remote and streams one result per merge.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor entity.Interface) <-chan entity.MergeResult
}
39
// Compile-time check that *SubCache satisfies the cacheMgmt interface.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
41
// SubCache is a generic cache for one type of entity. It keeps an excerpt for
// every known entity, a bounded set of fully loaded entities, and maintains the
// on-disk cache file and the search index for its namespace.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repo gives access to the entities, the local storage and the search index.
	repo repository.ClockedRepo
	// resolvers returns the resolvers handed to the read and merge actions.
	resolvers func() entity.Resolvers

	// getUserIdentity returns the identity used as merge author in MergeAll.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps a raw entity into its cached form; entityUpdated is the
	// callback to trigger when the entity changed.
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt of a cached entity.
	makeExcerpt func(CacheT) ExcerptT
	// makeIndexData builds the strings stored in the search index for a cached entity.
	makeIndexData func(CacheT) []string
	// actions holds the repository-level operations for this entity type.
	actions Actions[EntityT]

	// typename is reported in build events and in not-found/multiple-match errors.
	typename string
	// namespace names both the cache file (under "cache") and the search index.
	namespace string
	// version is the cache file format version; Load rejects any other value.
	version uint
	// maxLoaded is the number of lru-tracked entities above which eviction starts.
	maxLoaded int

	// mu is intended to guard excerpts, cached and lru.
	mu sync.RWMutex
	// excerpts holds one excerpt per known entity.
	excerpts map[entity.Id]ExcerptT
	// cached holds the entities that are currently fully loaded.
	cached map[entity.Id]CacheT
	// lru tracks ids of loaded entities, from oldest to newest use, to pick
	// eviction candidates.
	lru *lruIdCache
}
62
63func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
64 repo repository.ClockedRepo,
65 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
66 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
67 makeExcerpt func(CacheT) ExcerptT,
68 makeIndexData func(CacheT) []string,
69 actions Actions[EntityT],
70 typename, namespace string,
71 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
72 return &SubCache[EntityT, ExcerptT, CacheT]{
73 repo: repo,
74 resolvers: resolvers,
75 getUserIdentity: getUserIdentity,
76 makeCached: makeCached,
77 makeExcerpt: makeExcerpt,
78 makeIndexData: makeIndexData,
79 actions: actions,
80 typename: typename,
81 namespace: namespace,
82 version: version,
83 maxLoaded: maxLoaded,
84 excerpts: make(map[entity.Id]ExcerptT),
85 cached: make(map[entity.Id]CacheT),
86 lru: newLRUIdCache(),
87 }
88}
89
// Typename returns the name of the entity type handled by this SubCache, as
// given at construction.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
93
94// Load will try to read from the disk the entity cache file
95func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
96 sc.mu.Lock()
97 defer sc.mu.Unlock()
98
99 f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
100 if err != nil {
101 return err
102 }
103
104 aux := struct {
105 Version uint
106 Excerpts map[entity.Id]ExcerptT
107 }{}
108
109 decoder := gob.NewDecoder(f)
110 err = decoder.Decode(&aux)
111 if err != nil {
112 _ = f.Close()
113 return err
114 }
115
116 err = f.Close()
117 if err != nil {
118 return err
119 }
120
121 if aux.Version != sc.version {
122 return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
123 }
124
125 // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
126 // so we fix it here, which doubles as enforcing coherency.
127 for id, excerpt := range aux.Excerpts {
128 excerpt.setId(id)
129 }
130
131 sc.excerpts = aux.Excerpts
132
133 index, err := sc.repo.GetIndex(sc.namespace)
134 if err != nil {
135 return err
136 }
137
138 // simple heuristic to detect a mismatch between the index and the entities
139 count, err := index.DocCount()
140 if err != nil {
141 return err
142 }
143 if count != uint64(len(sc.excerpts)) {
144 return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
145 }
146
147 // TODO: find a way to check lamport clocks
148
149 return nil
150}
151
152// Write will serialize on disk the entity cache file
153func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
154 sc.mu.RLock()
155 defer sc.mu.RUnlock()
156
157 var data bytes.Buffer
158
159 aux := struct {
160 Version uint
161 Excerpts map[entity.Id]ExcerptT
162 }{
163 Version: sc.version,
164 Excerpts: sc.excerpts,
165 }
166
167 encoder := gob.NewEncoder(&data)
168
169 err := encoder.Encode(aux)
170 if err != nil {
171 return err
172 }
173
174 f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
175 if err != nil {
176 return err
177 }
178
179 _, err = f.Write(data.Bytes())
180 if err != nil {
181 _ = f.Close()
182 return err
183 }
184
185 return f.Close()
186}
187
188func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
189 // value chosen experimentally as giving the fasted indexing, while
190 // not driving the cache size on disk too high.
191 //
192 // | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
193 // |:----------:|:-------------:|:------------:|:--------:|
194 // | 10 | 24 | 84 | 1,59 |
195 // | 30 | 26 | 84 | 1,388 |
196 // | 50 | 26 | 84 | 1,44 |
197 // | 60 | 26 | 80 | 1,377 |
198 // | 68 | 27 | 80 | 1,385 |
199 // | 75 | 26 | 84 | 1,32 |
200 // | 80 | 26 | 80 | 1,37 |
201 // | 85 | 27 | 80 | 1,317 |
202 // | 100 | 26 | 80 | 1,455 |
203 // | 150 | 26 | 80 | 2,066 |
204 // | 200 | 28 | 80 | 2,885 |
205 // | 250 | 30 | 72 | 3,555 |
206 // | 300 | 31 | 72 | 4,787 |
207 // | 500 | 23 | 72 | 5,4 |
208 const maxBatchCount = 75
209
210 out := make(chan BuildEvent)
211
212 go func() {
213 defer close(out)
214
215 out <- BuildEvent{
216 Typename: sc.typename,
217 Event: BuildEventStarted,
218 }
219
220 sc.excerpts = make(map[entity.Id]ExcerptT)
221
222 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
223
224 index, err := sc.repo.GetIndex(sc.namespace)
225 if err != nil {
226 out <- BuildEvent{
227 Typename: sc.typename,
228 Err: err,
229 }
230 return
231 }
232
233 // wipe the index just to be sure
234 err = index.Clear()
235 if err != nil {
236 out <- BuildEvent{
237 Typename: sc.typename,
238 Err: err,
239 }
240 return
241 }
242
243 indexer, indexEnd := index.IndexBatch()
244 var batchCount int
245
246 for e := range allEntities {
247 if e.Err != nil {
248 out <- BuildEvent{
249 Typename: sc.typename,
250 Err: e.Err,
251 }
252 return
253 }
254
255 cached := sc.makeCached(e.Entity, sc.entityUpdated)
256 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
257 // might as well keep them in memory
258 sc.cached[e.Entity.Id()] = cached
259
260 indexData := sc.makeIndexData(cached)
261 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
262 out <- BuildEvent{
263 Typename: sc.typename,
264 Err: err,
265 }
266 return
267 }
268
269 batchCount++
270 if batchCount >= maxBatchCount {
271 err = indexEnd()
272 if err != nil {
273 out <- BuildEvent{
274 Typename: sc.typename,
275 Err: err,
276 }
277 return
278 }
279
280 indexer, indexEnd = index.IndexBatch()
281 batchCount = 0
282 }
283
284 out <- BuildEvent{
285 Typename: sc.typename,
286 Event: BuildEventProgress,
287 Progress: e.CurrentEntity,
288 Total: e.TotalEntities,
289 }
290 }
291
292 if batchCount > 0 {
293 err = indexEnd()
294 if err != nil {
295 out <- BuildEvent{
296 Typename: sc.typename,
297 Err: err,
298 }
299 return
300 }
301 }
302
303 err = sc.write()
304 if err != nil {
305 out <- BuildEvent{
306 Typename: sc.typename,
307 Err: err,
308 }
309 return
310 }
311
312 out <- BuildEvent{
313 Typename: sc.typename,
314 Event: BuildEventFinished,
315 }
316 }()
317
318 return out
319}
320
321func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
322 sc.maxLoaded = size
323 sc.evictIfNeeded()
324}
325
326func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
327 sc.mu.Lock()
328 defer sc.mu.Unlock()
329 sc.excerpts = nil
330 sc.cached = make(map[entity.Id]CacheT)
331 return nil
332}
333
334// AllIds return all known bug ids
335func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
336 sc.mu.RLock()
337 defer sc.mu.RUnlock()
338
339 result := make([]entity.Id, len(sc.excerpts))
340
341 i := 0
342 for _, excerpt := range sc.excerpts {
343 result[i] = excerpt.Id()
344 i++
345 }
346
347 return result
348}
349
350// Resolve retrieve an entity matching the exact given id
351func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
352 sc.mu.RLock()
353 cached, ok := sc.cached[id]
354 if ok {
355 sc.lru.Get(id)
356 sc.mu.RUnlock()
357 return cached, nil
358 }
359 sc.mu.RUnlock()
360
361 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
362 if err != nil {
363 return *new(CacheT), err
364 }
365
366 cached = sc.makeCached(e, sc.entityUpdated)
367
368 sc.mu.Lock()
369 sc.cached[id] = cached
370 sc.lru.Add(id)
371 sc.mu.Unlock()
372
373 sc.evictIfNeeded()
374
375 return cached, nil
376}
377
378// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
379// entity match.
380func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
381 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
382 return excerpt.Id().HasPrefix(prefix)
383 })
384}
385
386func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
387 id, err := sc.resolveMatcher(f)
388 if err != nil {
389 return *new(CacheT), err
390 }
391 return sc.Resolve(id)
392}
393
394// ResolveExcerpt retrieve an Excerpt matching the exact given id
395func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
396 sc.mu.RLock()
397 defer sc.mu.RUnlock()
398
399 excerpt, ok := sc.excerpts[id]
400 if !ok {
401 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
402 }
403
404 return excerpt, nil
405}
406
407// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
408// entity match.
409func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
410 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
411 return excerpt.Id().HasPrefix(prefix)
412 })
413}
414
415func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
416 id, err := sc.resolveMatcher(f)
417 if err != nil {
418 return *new(ExcerptT), err
419 }
420 return sc.ResolveExcerpt(id)
421}
422
423func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
424 sc.mu.RLock()
425 defer sc.mu.RUnlock()
426
427 // preallocate but empty
428 matching := make([]entity.Id, 0, 5)
429
430 for _, excerpt := range sc.excerpts {
431 if f(excerpt) {
432 matching = append(matching, excerpt.Id())
433 }
434 }
435
436 if len(matching) > 1 {
437 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
438 }
439
440 if len(matching) == 0 {
441 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
442 }
443
444 return matching[0], nil
445}
446
447func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
448 sc.mu.Lock()
449 if _, has := sc.cached[e.Id()]; has {
450 sc.mu.Unlock()
451 return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
452 }
453
454 cached := sc.makeCached(e, sc.entityUpdated)
455 sc.cached[e.Id()] = cached
456 sc.lru.Add(e.Id())
457 sc.mu.Unlock()
458
459 sc.evictIfNeeded()
460
461 // force the write of the excerpt
462 err := sc.entityUpdated(e.Id())
463 if err != nil {
464 return *new(CacheT), err
465 }
466
467 return cached, nil
468}
469
470func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
471 e, err := sc.ResolvePrefix(prefix)
472 if err != nil {
473 return err
474 }
475
476 sc.mu.Lock()
477
478 err = sc.actions.Remove(sc.repo, e.Id())
479 if err != nil {
480 sc.mu.Unlock()
481 return err
482 }
483
484 delete(sc.cached, e.Id())
485 delete(sc.excerpts, e.Id())
486 sc.lru.Remove(e.Id())
487
488 index, err := sc.repo.GetIndex(sc.namespace)
489 if err != nil {
490 sc.mu.Unlock()
491 return err
492 }
493
494 err = index.Remove(e.Id().String())
495 sc.mu.Unlock()
496 if err != nil {
497 return err
498 }
499
500 return sc.write()
501}
502
503func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
504 sc.mu.Lock()
505
506 err := sc.actions.RemoveAll(sc.repo)
507 if err != nil {
508 sc.mu.Unlock()
509 return err
510 }
511
512 for id, _ := range sc.cached {
513 delete(sc.cached, id)
514 sc.lru.Remove(id)
515 }
516 for id, _ := range sc.excerpts {
517 delete(sc.excerpts, id)
518 }
519
520 index, err := sc.repo.GetIndex(sc.namespace)
521 if err != nil {
522 sc.mu.Unlock()
523 return err
524 }
525
526 err = index.Clear()
527 sc.mu.Unlock()
528 if err != nil {
529 return err
530 }
531
532 return sc.write()
533}
534
535func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
536 out := make(chan entity.MergeResult)
537
538 // Intercept merge results to update the cache properly
539 go func() {
540 defer close(out)
541
542 author, err := sc.getUserIdentity()
543 if err != nil {
544 out <- entity.NewMergeError(err, "")
545 return
546 }
547
548 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
549 for result := range results {
550 out <- result
551
552 if result.Err != nil {
553 continue
554 }
555
556 switch result.Status {
557 case entity.MergeStatusNew, entity.MergeStatusUpdated:
558 e := result.Entity.(EntityT)
559 cached := sc.makeCached(e, sc.entityUpdated)
560
561 sc.mu.Lock()
562 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
563 // might as well keep them in memory
564 sc.cached[result.Id] = cached
565 sc.mu.Unlock()
566 }
567 }
568
569 err = sc.write()
570 if err != nil {
571 out <- entity.NewMergeError(err, "")
572 return
573 }
574 }()
575
576 return out
577
578}
579
// GetNamespace returns the namespace of this SubCache, which names both its
// cache file and its search index.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
583
584// entityUpdated is a callback to trigger when the excerpt of an entity changed
585func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
586 sc.mu.Lock()
587 e, ok := sc.cached[id]
588 if !ok {
589 sc.mu.Unlock()
590
591 // if the bug is not loaded at this point, it means it was loaded before
592 // but got evicted. Which means we potentially have multiple copies in
593 // memory and thus concurrent write.
594 // Failing immediately here is the simple and safe solution to avoid
595 // complicated data loss.
596 return errors.New("entity missing from cache")
597 }
598 sc.lru.Get(id)
599 sc.excerpts[id] = sc.makeExcerpt(e)
600 sc.mu.Unlock()
601
602 index, err := sc.repo.GetIndex(sc.namespace)
603 if err != nil {
604 return err
605 }
606
607 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
608 if err != nil {
609 return err
610 }
611
612 return sc.write()
613}
614
615// evictIfNeeded will evict an entity from the cache if needed
616func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
617 sc.mu.Lock()
618 defer sc.mu.Unlock()
619 if sc.lru.Len() <= sc.maxLoaded {
620 return
621 }
622
623 for _, id := range sc.lru.GetOldestToNewest() {
624 b := sc.cached[id]
625 if b.NeedCommit() {
626 continue
627 }
628
629 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
630 // if something try to do it anyway, it will lock the program and make it obvious.
631 b.Lock()
632
633 sc.lru.Remove(id)
634 delete(sc.cached, id)
635
636 if sc.lru.Len() <= sc.maxLoaded {
637 return
638 }
639 }
640}