1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/git-bug/git-bug/entities/identity"
13 "github.com/git-bug/git-bug/entity"
14 "github.com/git-bug/git-bug/repository"
15)
16
17type Excerpt interface {
18 Id() entity.Id
19 setId(id entity.Id)
20}
21
22type CacheEntity interface {
23 Id() entity.Id
24 NeedCommit() bool
25 Lock()
26}
27
28type getUserIdentityFunc func() (*IdentityCache, error)
29
30// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
31// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
32// functions from this package, but right now identities are not using that framework.
33type Actions[EntityT entity.Interface] struct {
34 ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
35 ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
36 Remove func(repo repository.ClockedRepo, id entity.Id) error
37 RemoveAll func(repo repository.ClockedRepo) error
38 MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
39}
40
41var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
42
43type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
44 repo repository.ClockedRepo
45 resolvers func() entity.Resolvers
46
47 getUserIdentity getUserIdentityFunc
48 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
49 makeExcerpt func(CacheT) ExcerptT
50 makeIndexData func(CacheT) []string
51 actions Actions[EntityT]
52
53 typename string
54 namespace string
55 version uint
56 maxLoaded int
57
58 mu sync.RWMutex
59 excerpts map[entity.Id]ExcerptT
60 cached map[entity.Id]CacheT
61 lru lruIdCache
62
63 muObservers sync.RWMutex
64 observers map[Observer]string // observer --> repo name
65}
66
67func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
68 repo repository.ClockedRepo,
69 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
70 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
71 makeExcerpt func(CacheT) ExcerptT,
72 makeIndexData func(CacheT) []string,
73 actions Actions[EntityT],
74 typename, namespace string,
75 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
76 return &SubCache[EntityT, ExcerptT, CacheT]{
77 repo: repo,
78 resolvers: resolvers,
79 getUserIdentity: getUserIdentity,
80 makeCached: makeCached,
81 makeExcerpt: makeExcerpt,
82 makeIndexData: makeIndexData,
83 actions: actions,
84 typename: typename,
85 namespace: namespace,
86 version: version,
87 maxLoaded: maxLoaded,
88 excerpts: make(map[entity.Id]ExcerptT),
89 cached: make(map[entity.Id]CacheT),
90 lru: newLRUIdCache(),
91 }
92}
93
94func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
95 return sc.typename
96}
97
98// Load will try to read from the disk the entity cache file
99func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
100 sc.mu.Lock()
101 defer sc.mu.Unlock()
102
103 f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
104 if err != nil {
105 return err
106 }
107
108 aux := struct {
109 Version uint
110 Excerpts map[entity.Id]ExcerptT
111 }{}
112
113 decoder := gob.NewDecoder(f)
114 err = decoder.Decode(&aux)
115 if err != nil {
116 _ = f.Close()
117 return err
118 }
119
120 err = f.Close()
121 if err != nil {
122 return err
123 }
124
125 if aux.Version != sc.version {
126 return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
127 }
128
129 // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
130 // so we fix it here, which doubles as enforcing coherency.
131 for id, excerpt := range aux.Excerpts {
132 excerpt.setId(id)
133 }
134
135 sc.excerpts = aux.Excerpts
136
137 index, err := sc.repo.GetIndex(sc.namespace)
138 if err != nil {
139 return err
140 }
141
142 // simple heuristic to detect a mismatch between the index and the entities
143 count, err := index.DocCount()
144 if err != nil {
145 return err
146 }
147 if count != uint64(len(sc.excerpts)) {
148 return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
149 }
150
151 // TODO: find a way to check lamport clocks
152
153 return nil
154}
155
156// Write will serialize on disk the entity cache file
157func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
158 sc.mu.RLock()
159 defer sc.mu.RUnlock()
160
161 var data bytes.Buffer
162
163 aux := struct {
164 Version uint
165 Excerpts map[entity.Id]ExcerptT
166 }{
167 Version: sc.version,
168 Excerpts: sc.excerpts,
169 }
170
171 encoder := gob.NewEncoder(&data)
172
173 err := encoder.Encode(aux)
174 if err != nil {
175 return err
176 }
177
178 f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
179 if err != nil {
180 return err
181 }
182
183 _, err = f.Write(data.Bytes())
184 if err != nil {
185 _ = f.Close()
186 return err
187 }
188
189 return f.Close()
190}
191
192func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
193 // value chosen experimentally as giving the fasted indexing, while
194 // not driving the cache size on disk too high.
195 //
196 // | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
197 // |:----------:|:-------------:|:------------:|:--------:|
198 // | 10 | 24 | 84 | 1,59 |
199 // | 30 | 26 | 84 | 1,388 |
200 // | 50 | 26 | 84 | 1,44 |
201 // | 60 | 26 | 80 | 1,377 |
202 // | 68 | 27 | 80 | 1,385 |
203 // | 75 | 26 | 84 | 1,32 |
204 // | 80 | 26 | 80 | 1,37 |
205 // | 85 | 27 | 80 | 1,317 |
206 // | 100 | 26 | 80 | 1,455 |
207 // | 150 | 26 | 80 | 2,066 |
208 // | 200 | 28 | 80 | 2,885 |
209 // | 250 | 30 | 72 | 3,555 |
210 // | 300 | 31 | 72 | 4,787 |
211 // | 500 | 23 | 72 | 5,4 |
212 const maxBatchCount = 75
213
214 out := make(chan BuildEvent)
215
216 go func() {
217 defer close(out)
218
219 out <- BuildEvent{
220 Typename: sc.typename,
221 Event: BuildEventStarted,
222 }
223
224 sc.excerpts = make(map[entity.Id]ExcerptT)
225
226 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
227
228 index, err := sc.repo.GetIndex(sc.namespace)
229 if err != nil {
230 out <- BuildEvent{
231 Typename: sc.typename,
232 Err: err,
233 }
234 return
235 }
236
237 // wipe the index just to be sure
238 err = index.Clear()
239 if err != nil {
240 out <- BuildEvent{
241 Typename: sc.typename,
242 Err: err,
243 }
244 return
245 }
246
247 indexer, indexEnd := index.IndexBatch()
248 var batchCount int
249
250 for e := range allEntities {
251 if e.Err != nil {
252 out <- BuildEvent{
253 Typename: sc.typename,
254 Err: e.Err,
255 }
256 return
257 }
258
259 cached := sc.makeCached(e.Entity, sc.entityUpdated)
260 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
261 // might as well keep them in memory
262 sc.cached[e.Entity.Id()] = cached
263
264 indexData := sc.makeIndexData(cached)
265 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
266 out <- BuildEvent{
267 Typename: sc.typename,
268 Err: err,
269 }
270 return
271 }
272
273 batchCount++
274 if batchCount >= maxBatchCount {
275 err = indexEnd()
276 if err != nil {
277 out <- BuildEvent{
278 Typename: sc.typename,
279 Err: err,
280 }
281 return
282 }
283
284 indexer, indexEnd = index.IndexBatch()
285 batchCount = 0
286 }
287
288 out <- BuildEvent{
289 Typename: sc.typename,
290 Event: BuildEventProgress,
291 Progress: e.CurrentEntity,
292 Total: e.TotalEntities,
293 }
294 }
295
296 if batchCount > 0 {
297 err = indexEnd()
298 if err != nil {
299 out <- BuildEvent{
300 Typename: sc.typename,
301 Err: err,
302 }
303 return
304 }
305 }
306
307 err = sc.write()
308 if err != nil {
309 out <- BuildEvent{
310 Typename: sc.typename,
311 Err: err,
312 }
313 return
314 }
315
316 out <- BuildEvent{
317 Typename: sc.typename,
318 Event: BuildEventFinished,
319 }
320 }()
321
322 return out
323}
324
325func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
326 sc.maxLoaded = size
327 sc.evictIfNeeded()
328}
329
330func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
331 sc.mu.Lock()
332 defer sc.mu.Unlock()
333 sc.excerpts = nil
334 sc.cached = make(map[entity.Id]CacheT)
335 return nil
336}
337
338func (sc *SubCache[EntityT, ExcerptT, CacheT]) RegisterObserver(repoName string, observer Observer) {
339 sc.muObservers.Lock()
340 defer sc.muObservers.Unlock()
341 if sc.observers == nil {
342 sc.observers = make(map[Observer]string)
343 }
344 sc.observers[observer] = repoName
345}
346
347func (sc *SubCache[EntityT, ExcerptT, CacheT]) UnregisterObserver(observer Observer) {
348 sc.muObservers.Lock()
349 defer sc.muObservers.Unlock()
350 delete(sc.observers, observer)
351}
352
353// AllIds return all known bug ids
354func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
355 sc.mu.RLock()
356 defer sc.mu.RUnlock()
357
358 result := make([]entity.Id, len(sc.excerpts))
359
360 i := 0
361 for _, excerpt := range sc.excerpts {
362 result[i] = excerpt.Id()
363 i++
364 }
365
366 return result
367}
368
369// Resolve retrieve an entity matching the exact given id
370func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
371 sc.mu.RLock()
372 cached, ok := sc.cached[id]
373 if ok {
374 sc.lru.Get(id)
375 sc.mu.RUnlock()
376 return cached, nil
377 }
378 sc.mu.RUnlock()
379
380 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
381 if err != nil {
382 return *new(CacheT), err
383 }
384
385 cached = sc.makeCached(e, sc.entityUpdated)
386
387 sc.mu.Lock()
388 sc.cached[id] = cached
389 sc.lru.Add(id)
390 sc.mu.Unlock()
391
392 sc.evictIfNeeded()
393
394 return cached, nil
395}
396
397// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
398// entities match.
399func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
400 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
401 return excerpt.Id().HasPrefix(prefix)
402 })
403}
404
405func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
406 id, err := sc.resolveMatcher(f)
407 if err != nil {
408 return *new(CacheT), err
409 }
410 return sc.Resolve(id)
411}
412
413// ResolveExcerpt retrieves an Excerpt matching the exact given id
414func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
415 sc.mu.RLock()
416 defer sc.mu.RUnlock()
417
418 excerpt, ok := sc.excerpts[id]
419 if !ok {
420 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
421 }
422
423 return excerpt, nil
424}
425
426// ResolveExcerptPrefix retrieves an Excerpt matching an id prefix. It fails if multiple
427// entities match.
428func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
429 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
430 return excerpt.Id().HasPrefix(prefix)
431 })
432}
433
434// ResolveExcerptMatcher retrieves an Excerpt selected by the given matcher function.
435func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
436 id, err := sc.resolveMatcher(f)
437 if err != nil {
438 return *new(ExcerptT), err
439 }
440 return sc.ResolveExcerpt(id)
441}
442
443func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
444 sc.mu.RLock()
445 defer sc.mu.RUnlock()
446
447 // preallocate but empty
448 matching := make([]entity.Id, 0, 5)
449
450 for _, excerpt := range sc.excerpts {
451 if f(excerpt) {
452 matching = append(matching, excerpt.Id())
453 }
454 }
455
456 if len(matching) > 1 {
457 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
458 }
459
460 if len(matching) == 0 {
461 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
462 }
463
464 return matching[0], nil
465}
466
467func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
468 sc.mu.Lock()
469 if _, has := sc.cached[e.Id()]; has {
470 sc.mu.Unlock()
471 return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
472 }
473
474 cached := sc.makeCached(e, sc.entityUpdated)
475 sc.cached[e.Id()] = cached
476 sc.lru.Add(e.Id())
477 sc.mu.Unlock()
478
479 sc.evictIfNeeded()
480
481 // force the write of the excerpt
482 err := sc.updateExcerptAndIndex(e.Id())
483 if err != nil {
484 return *new(CacheT), err
485 }
486
487 // defer to notify after the release of the mutex
488 defer sc.notifyObservers(EntityEventCreated, e.Id())
489
490 return cached, nil
491}
492
493func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
494 e, err := sc.ResolvePrefix(prefix)
495 if err != nil {
496 return err
497 }
498
499 sc.mu.Lock()
500
501 err = sc.actions.Remove(sc.repo, e.Id())
502 if err != nil {
503 sc.mu.Unlock()
504 return err
505 }
506
507 delete(sc.cached, e.Id())
508 delete(sc.excerpts, e.Id())
509 sc.lru.Remove(e.Id())
510
511 index, err := sc.repo.GetIndex(sc.namespace)
512 if err != nil {
513 sc.mu.Unlock()
514 return err
515 }
516
517 err = index.Remove(e.Id().String())
518 sc.mu.Unlock()
519 if err != nil {
520 return err
521 }
522
523 // defer to notify after the release of the mutex
524 defer sc.notifyObservers(EntityEventRemoved, e.Id())
525
526 return sc.write()
527}
528
529func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
530 sc.mu.Lock()
531
532 err := sc.actions.RemoveAll(sc.repo)
533 if err != nil {
534 sc.mu.Unlock()
535 return err
536 }
537
538 ids := make(map[entity.Id]struct{})
539
540 for id, _ := range sc.cached {
541 delete(sc.cached, id)
542 sc.lru.Remove(id)
543 ids[id] = struct{}{}
544 }
545 for id, _ := range sc.excerpts {
546 delete(sc.excerpts, id)
547 ids[id] = struct{}{}
548 }
549
550 index, err := sc.repo.GetIndex(sc.namespace)
551 if err != nil {
552 sc.mu.Unlock()
553 return err
554 }
555
556 err = index.Clear()
557 sc.mu.Unlock()
558 if err != nil {
559 return err
560 }
561
562 // defer to notify after the release of the mutex
563 defer func() {
564 for id := range ids {
565 sc.notifyObservers(EntityEventRemoved, id)
566 }
567 }()
568
569 return sc.write()
570}
571
572func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
573 out := make(chan entity.MergeResult)
574
575 // Intercept merge results to update the cache properly
576 go func() {
577 defer close(out)
578
579 author, err := sc.getUserIdentity()
580 if err != nil {
581 out <- entity.NewMergeError(err, "")
582 return
583 }
584
585 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
586 for result := range results {
587 out <- result
588
589 if result.Err != nil {
590 continue
591 }
592
593 switch result.Status {
594 case entity.MergeStatusNew:
595 e := result.Entity.(EntityT)
596 cached := sc.makeCached(e, sc.entityUpdated)
597
598 sc.mu.Lock()
599 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
600 // might as well keep them in memory
601 sc.cached[result.Id] = cached
602 sc.mu.Unlock()
603 sc.notifyObservers(EntityEventCreated, result.Id)
604
605 case entity.MergeStatusUpdated:
606 // TODO: can that result in multiple copy of the same entity?
607 e := result.Entity.(EntityT)
608 cached := sc.makeCached(e, sc.entityUpdated)
609
610 sc.mu.Lock()
611 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
612 // might as well keep them in memory
613 sc.cached[result.Id] = cached
614 sc.mu.Unlock()
615 sc.notifyObservers(EntityEventUpdated, result.Id)
616 }
617 }
618
619 err = sc.write()
620 if err != nil {
621 out <- entity.NewMergeError(err, "")
622 return
623 }
624 }()
625
626 return out
627
628}
629
630// GetNamespace expose the namespace in git where entities are located.
631func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
632 return sc.namespace
633}
634
635// entityUpdated is a callback to trigger when the excerpt of an entity changed
636func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
637 sc.notifyObservers(EntityEventUpdated, id)
638 return sc.updateExcerptAndIndex(id)
639}
640
641// notifyObservers notifies all the observers when something happening for an entity
642func (sc *SubCache[EntityT, ExcerptT, CacheT]) notifyObservers(event EntityEventType, id entity.Id) {
643 sc.muObservers.RLock()
644 for observer, repoName := range sc.observers {
645 observer.EntityEvent(event, repoName, sc.typename, id)
646 }
647 sc.muObservers.RUnlock()
648}
649
650func (sc *SubCache[EntityT, ExcerptT, CacheT]) updateExcerptAndIndex(id entity.Id) error {
651 sc.mu.Lock()
652 e, ok := sc.cached[id]
653 if !ok {
654 sc.mu.Unlock()
655
656 // if the bug is not loaded at this point, it means it was loaded before
657 // but got evicted. Which means we potentially have multiple copies in
658 // memory and thus concurrent write.
659 // Failing immediately here is the simple and safe solution to avoid
660 // complicated data loss.
661 return errors.New("entity missing from cache")
662 }
663 sc.lru.Get(id)
664 sc.excerpts[id] = sc.makeExcerpt(e)
665 sc.mu.Unlock()
666
667 index, err := sc.repo.GetIndex(sc.namespace)
668 if err != nil {
669 return err
670 }
671
672 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
673 if err != nil {
674 return err
675 }
676
677 return sc.write()
678}
679
680// evictIfNeeded will evict an entity from the cache if needed
681func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
682 sc.mu.Lock()
683 defer sc.mu.Unlock()
684 if sc.lru.Len() <= sc.maxLoaded {
685 return
686 }
687
688 for _, id := range sc.lru.GetOldestToNewest() {
689 b := sc.cached[id]
690 if b.NeedCommit() {
691 continue
692 }
693
694 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
695 // if something tries to do it anyway, it will lock the program and make it obvious.
696 b.Lock()
697
698 sc.lru.Remove(id)
699 delete(sc.cached, id)
700
701 if sc.lru.Len() <= sc.maxLoaded {
702 return
703 }
704 }
705}