1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/git-bug/git-bug/entities/identity"
13 "github.com/git-bug/git-bug/entity"
14 "github.com/git-bug/git-bug/repository"
15)
16
// Excerpt is the minimal contract a cached excerpt must fulfil to be managed
// by a SubCache.
type Excerpt interface {
	// Id returns the id carried by the excerpt.
	Id() entity.Id
	// setId assigns the id of the excerpt. SubCache.Load calls it after
	// decoding the cache file, because the id is not serialized within the
	// excerpt itself.
	setId(id entity.Id)
}
21
// CacheEntity is the contract the in-memory (loaded) form of an entity must
// fulfil to be managed by a SubCache.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit is consulted during eviction: an entity for which it returns
	// true is never evicted.
	// NOTE(review): presumably reports pending, not yet committed changes — confirm.
	NeedCommit() bool
	// Lock is called on an entity right before it gets evicted, so that any
	// later attempt to manipulate the evicted instance blocks.
	Lock()
}
27
// getUserIdentityFunc returns the user identity, as an IdentityCache.
// SubCache.MergeAll uses the returned identity as the merge author.
type getUserIdentityFunc func() (*IdentityCache, error)
29
// Actions exposes a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist; the cache layer would assume that everything is an entity/dag and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads the entity with the given id. Used by SubCache.Resolve.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams the entities of the repo. Used by SubCache.Build.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove removes the entity with the given id. Used by SubCache.Remove.
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll removes all the entities. Used by SubCache.RemoveAll.
	RemoveAll func(repo repository.ClockedRepo) error
	// MergeAll merges the entities coming from the given remote and streams one
	// result per entity. Used by SubCache.MergeAll.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
40
// Compile-time check that *SubCache satisfies the cacheMgmt interface.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
42
// SubCache provides caching management for one type of Entity.
//
// It keeps an excerpt for every known entity, plus a bounded set of fully
// loaded entities.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repo gives access to the local storage (cache file) and the search index.
	repo repository.ClockedRepo
	// resolvers returns the resolvers handed over to the entity actions.
	resolvers func() entity.Resolvers

	// getUserIdentity returns the identity used as author when merging.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps an entity into its cached form; entityUpdated is the
	// callback given to the cached entity (SubCache passes its own
	// entityUpdated method).
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt of a cached entity.
	makeExcerpt func(CacheT) ExcerptT
	// makeIndexData builds the data handed over to the search index for a cached entity.
	makeIndexData func(CacheT) []string
	// actions holds the entity-type specific read/remove/merge functions.
	actions Actions[EntityT]

	// typename is reported in build events and in not-found/multiple-match errors.
	typename string
	// namespace names both the cache file (under "cache/") and the search index.
	namespace string
	// version is the cache file format version; Load rejects any other version.
	version uint
	// maxLoaded is the number of loaded entities above which eviction kicks in.
	maxLoaded int

	// mu guards the fields below.
	mu sync.RWMutex
	// excerpts holds the excerpt of every known entity, by id.
	excerpts map[entity.Id]ExcerptT
	// cached holds the entities currently loaded in memory, by id.
	cached map[entity.Id]CacheT
	// lru tracks the usage order of loaded entities, to select eviction candidates.
	lru lruIdCache
}
64
65func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
66 repo repository.ClockedRepo,
67 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
68 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
69 makeExcerpt func(CacheT) ExcerptT,
70 makeIndexData func(CacheT) []string,
71 actions Actions[EntityT],
72 typename, namespace string,
73 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
74 return &SubCache[EntityT, ExcerptT, CacheT]{
75 repo: repo,
76 resolvers: resolvers,
77 getUserIdentity: getUserIdentity,
78 makeCached: makeCached,
79 makeExcerpt: makeExcerpt,
80 makeIndexData: makeIndexData,
81 actions: actions,
82 typename: typename,
83 namespace: namespace,
84 version: version,
85 maxLoaded: maxLoaded,
86 excerpts: make(map[entity.Id]ExcerptT),
87 cached: make(map[entity.Id]CacheT),
88 lru: newLRUIdCache(),
89 }
90}
91
// Typename returns the type name this SubCache was created with.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
95
96// Load will try to read from the disk the entity cache file
97func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
98 sc.mu.Lock()
99 defer sc.mu.Unlock()
100
101 f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
102 if err != nil {
103 return err
104 }
105
106 aux := struct {
107 Version uint
108 Excerpts map[entity.Id]ExcerptT
109 }{}
110
111 decoder := gob.NewDecoder(f)
112 err = decoder.Decode(&aux)
113 if err != nil {
114 _ = f.Close()
115 return err
116 }
117
118 err = f.Close()
119 if err != nil {
120 return err
121 }
122
123 if aux.Version != sc.version {
124 return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
125 }
126
127 // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
128 // so we fix it here, which doubles as enforcing coherency.
129 for id, excerpt := range aux.Excerpts {
130 excerpt.setId(id)
131 }
132
133 sc.excerpts = aux.Excerpts
134
135 index, err := sc.repo.GetIndex(sc.namespace)
136 if err != nil {
137 return err
138 }
139
140 // simple heuristic to detect a mismatch between the index and the entities
141 count, err := index.DocCount()
142 if err != nil {
143 return err
144 }
145 if count != uint64(len(sc.excerpts)) {
146 return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
147 }
148
149 // TODO: find a way to check lamport clocks
150
151 return nil
152}
153
154// Write will serialize on disk the entity cache file
155func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
156 sc.mu.RLock()
157 defer sc.mu.RUnlock()
158
159 var data bytes.Buffer
160
161 aux := struct {
162 Version uint
163 Excerpts map[entity.Id]ExcerptT
164 }{
165 Version: sc.version,
166 Excerpts: sc.excerpts,
167 }
168
169 encoder := gob.NewEncoder(&data)
170
171 err := encoder.Encode(aux)
172 if err != nil {
173 return err
174 }
175
176 f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
177 if err != nil {
178 return err
179 }
180
181 _, err = f.Write(data.Bytes())
182 if err != nil {
183 _ = f.Close()
184 return err
185 }
186
187 return f.Close()
188}
189
190func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
191 // value chosen experimentally as giving the fasted indexing, while
192 // not driving the cache size on disk too high.
193 //
194 // | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
195 // |:----------:|:-------------:|:------------:|:--------:|
196 // | 10 | 24 | 84 | 1,59 |
197 // | 30 | 26 | 84 | 1,388 |
198 // | 50 | 26 | 84 | 1,44 |
199 // | 60 | 26 | 80 | 1,377 |
200 // | 68 | 27 | 80 | 1,385 |
201 // | 75 | 26 | 84 | 1,32 |
202 // | 80 | 26 | 80 | 1,37 |
203 // | 85 | 27 | 80 | 1,317 |
204 // | 100 | 26 | 80 | 1,455 |
205 // | 150 | 26 | 80 | 2,066 |
206 // | 200 | 28 | 80 | 2,885 |
207 // | 250 | 30 | 72 | 3,555 |
208 // | 300 | 31 | 72 | 4,787 |
209 // | 500 | 23 | 72 | 5,4 |
210 const maxBatchCount = 75
211
212 out := make(chan BuildEvent)
213
214 go func() {
215 defer close(out)
216
217 out <- BuildEvent{
218 Typename: sc.typename,
219 Event: BuildEventStarted,
220 }
221
222 sc.excerpts = make(map[entity.Id]ExcerptT)
223
224 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
225
226 index, err := sc.repo.GetIndex(sc.namespace)
227 if err != nil {
228 out <- BuildEvent{
229 Typename: sc.typename,
230 Err: err,
231 }
232 return
233 }
234
235 // wipe the index just to be sure
236 err = index.Clear()
237 if err != nil {
238 out <- BuildEvent{
239 Typename: sc.typename,
240 Err: err,
241 }
242 return
243 }
244
245 indexer, indexEnd := index.IndexBatch()
246 var batchCount int
247
248 for e := range allEntities {
249 if e.Err != nil {
250 out <- BuildEvent{
251 Typename: sc.typename,
252 Err: e.Err,
253 }
254 return
255 }
256
257 cached := sc.makeCached(e.Entity, sc.entityUpdated)
258 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
259 // might as well keep them in memory
260 sc.cached[e.Entity.Id()] = cached
261
262 indexData := sc.makeIndexData(cached)
263 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
264 out <- BuildEvent{
265 Typename: sc.typename,
266 Err: err,
267 }
268 return
269 }
270
271 batchCount++
272 if batchCount >= maxBatchCount {
273 err = indexEnd()
274 if err != nil {
275 out <- BuildEvent{
276 Typename: sc.typename,
277 Err: err,
278 }
279 return
280 }
281
282 indexer, indexEnd = index.IndexBatch()
283 batchCount = 0
284 }
285
286 out <- BuildEvent{
287 Typename: sc.typename,
288 Event: BuildEventProgress,
289 Progress: e.CurrentEntity,
290 Total: e.TotalEntities,
291 }
292 }
293
294 if batchCount > 0 {
295 err = indexEnd()
296 if err != nil {
297 out <- BuildEvent{
298 Typename: sc.typename,
299 Err: err,
300 }
301 return
302 }
303 }
304
305 err = sc.write()
306 if err != nil {
307 out <- BuildEvent{
308 Typename: sc.typename,
309 Err: err,
310 }
311 return
312 }
313
314 out <- BuildEvent{
315 Typename: sc.typename,
316 Event: BuildEventFinished,
317 }
318 }()
319
320 return out
321}
322
323func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
324 sc.maxLoaded = size
325 sc.evictIfNeeded()
326}
327
328func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
329 sc.mu.Lock()
330 defer sc.mu.Unlock()
331 sc.excerpts = nil
332 sc.cached = make(map[entity.Id]CacheT)
333 return nil
334}
335
336// AllIds return all known bug ids
337func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
338 sc.mu.RLock()
339 defer sc.mu.RUnlock()
340
341 result := make([]entity.Id, len(sc.excerpts))
342
343 i := 0
344 for _, excerpt := range sc.excerpts {
345 result[i] = excerpt.Id()
346 i++
347 }
348
349 return result
350}
351
352// Resolve retrieves an entity matching the exact given id
353func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
354 sc.mu.RLock()
355 cached, ok := sc.cached[id]
356 if ok {
357 sc.lru.Get(id)
358 sc.mu.RUnlock()
359 return cached, nil
360 }
361 sc.mu.RUnlock()
362
363 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
364 if err != nil {
365 return *new(CacheT), err
366 }
367
368 cached = sc.makeCached(e, sc.entityUpdated)
369
370 sc.mu.Lock()
371 sc.cached[id] = cached
372 sc.lru.Add(id)
373 sc.mu.Unlock()
374
375 sc.evictIfNeeded()
376
377 return cached, nil
378}
379
380// ResolvePrefix retrieves an entity matching an id prefix. It fails if multiple entities match.
381func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
382 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
383 return excerpt.Id().HasPrefix(prefix)
384 })
385}
386
387// ResolveMatcher retrieves an entity matching the given matched. It fails if multiple entities match.
388func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
389 id, err := sc.resolveMatcher(f)
390 if err != nil {
391 return *new(CacheT), err
392 }
393 return sc.Resolve(id)
394}
395
396// ResolveExcerpt retrieve an Excerpt matching the exact given id
397func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
398 sc.mu.RLock()
399 defer sc.mu.RUnlock()
400
401 excerpt, ok := sc.excerpts[id]
402 if !ok {
403 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
404 }
405
406 return excerpt, nil
407}
408
409// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple entities match.
410func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
411 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
412 return excerpt.Id().HasPrefix(prefix)
413 })
414}
415
416// ResolveExcerptMatcher retrieve an Excerpt matching a given matcher. It fails if multiple entities match.
417func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
418 id, err := sc.resolveMatcher(f)
419 if err != nil {
420 return *new(ExcerptT), err
421 }
422 return sc.ResolveExcerpt(id)
423}
424
425// QueryExcerptMatcher finds all the Excerpt matching the given matcher.
426func (sc *SubCache[EntityT, ExcerptT, CacheT]) QueryExcerptMatcher(f func(ExcerptT) bool) ([]ExcerptT, error) {
427 ids, err := sc.queryMatcher(f)
428 if err != nil {
429 return nil, err
430 }
431 res := make([]ExcerptT, len(ids))
432 for i, id := range ids {
433 res[i], err = sc.ResolveExcerpt(id)
434 if err != nil {
435 return nil, err
436 }
437 }
438 return res, nil
439}
440
441// resolveMatcher finds the id of the entity matching the given matcher. It fails if multiple entities match.
442func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
443 sc.mu.RLock()
444 defer sc.mu.RUnlock()
445
446 // preallocate but empty
447 matching := make([]entity.Id, 0, 5)
448
449 for _, excerpt := range sc.excerpts {
450 if f(excerpt) {
451 matching = append(matching, excerpt.Id())
452 }
453 }
454
455 if len(matching) > 1 {
456 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
457 }
458
459 if len(matching) == 0 {
460 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
461 }
462
463 return matching[0], nil
464}
465
466// queryMatcher find the ids of all the entities matching the given matcher.
467func (sc *SubCache[EntityT, ExcerptT, CacheT]) queryMatcher(f func(ExcerptT) bool) ([]entity.Id, error) {
468 // TODO: this might use some pagination, or better: a go1.23 iterator?
469
470 sc.mu.RLock()
471 defer sc.mu.RUnlock()
472
473 // preallocate but empty
474 matching := make([]entity.Id, 0, 5)
475
476 for _, excerpt := range sc.excerpts {
477 if f(excerpt) {
478 matching = append(matching, excerpt.Id())
479 }
480 }
481
482 return matching, nil
483}
484
485func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
486 sc.mu.Lock()
487 if _, has := sc.cached[e.Id()]; has {
488 sc.mu.Unlock()
489 return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
490 }
491
492 cached := sc.makeCached(e, sc.entityUpdated)
493 sc.cached[e.Id()] = cached
494 sc.lru.Add(e.Id())
495 sc.mu.Unlock()
496
497 sc.evictIfNeeded()
498
499 // force the write of the excerpt
500 err := sc.entityUpdated(e.Id())
501 if err != nil {
502 return *new(CacheT), err
503 }
504
505 return cached, nil
506}
507
508func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
509 e, err := sc.ResolvePrefix(prefix)
510 if err != nil {
511 return err
512 }
513
514 sc.mu.Lock()
515
516 err = sc.actions.Remove(sc.repo, e.Id())
517 if err != nil {
518 sc.mu.Unlock()
519 return err
520 }
521
522 delete(sc.cached, e.Id())
523 delete(sc.excerpts, e.Id())
524 sc.lru.Remove(e.Id())
525
526 index, err := sc.repo.GetIndex(sc.namespace)
527 if err != nil {
528 sc.mu.Unlock()
529 return err
530 }
531
532 err = index.Remove(e.Id().String())
533 sc.mu.Unlock()
534 if err != nil {
535 return err
536 }
537
538 return sc.write()
539}
540
541func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
542 sc.mu.Lock()
543
544 err := sc.actions.RemoveAll(sc.repo)
545 if err != nil {
546 sc.mu.Unlock()
547 return err
548 }
549
550 for id, _ := range sc.cached {
551 delete(sc.cached, id)
552 sc.lru.Remove(id)
553 }
554 for id, _ := range sc.excerpts {
555 delete(sc.excerpts, id)
556 }
557
558 index, err := sc.repo.GetIndex(sc.namespace)
559 if err != nil {
560 sc.mu.Unlock()
561 return err
562 }
563
564 err = index.Clear()
565 sc.mu.Unlock()
566 if err != nil {
567 return err
568 }
569
570 return sc.write()
571}
572
573func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
574 out := make(chan entity.MergeResult)
575
576 // Intercept merge results to update the cache properly
577 go func() {
578 defer close(out)
579
580 author, err := sc.getUserIdentity()
581 if err != nil {
582 out <- entity.NewMergeError(err, "")
583 return
584 }
585
586 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
587 for result := range results {
588 out <- result
589
590 if result.Err != nil {
591 continue
592 }
593
594 switch result.Status {
595 case entity.MergeStatusNew, entity.MergeStatusUpdated:
596 e := result.Entity.(EntityT)
597 cached := sc.makeCached(e, sc.entityUpdated)
598
599 sc.mu.Lock()
600 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
601 // might as well keep them in memory
602 sc.cached[result.Id] = cached
603 sc.mu.Unlock()
604 }
605 }
606
607 err = sc.write()
608 if err != nil {
609 out <- entity.NewMergeError(err, "")
610 return
611 }
612 }()
613
614 return out
615
616}
617
// GetNamespace returns the namespace this SubCache was created with. The
// namespace names both the cache file and the search index.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
621
622// entityUpdated is a callback to trigger when the excerpt of an entity changed
623func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
624 sc.mu.Lock()
625 e, ok := sc.cached[id]
626 if !ok {
627 sc.mu.Unlock()
628
629 // if the bug is not loaded at this point, it means it was loaded before
630 // but got evicted. Which means we potentially have multiple copies in
631 // memory and thus concurrent write.
632 // Failing immediately here is the simple and safe solution to avoid
633 // complicated data loss.
634 return errors.New("entity missing from cache")
635 }
636 sc.lru.Get(id)
637 // sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
638 sc.excerpts[id] = sc.makeExcerpt(e)
639 sc.mu.Unlock()
640
641 index, err := sc.repo.GetIndex(sc.namespace)
642 if err != nil {
643 return err
644 }
645
646 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
647 if err != nil {
648 return err
649 }
650
651 return sc.write()
652}
653
654// evictIfNeeded will evict an entity from the cache if needed
655func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
656 sc.mu.Lock()
657 defer sc.mu.Unlock()
658 if sc.lru.Len() <= sc.maxLoaded {
659 return
660 }
661
662 for _, id := range sc.lru.GetOldestToNewest() {
663 b := sc.cached[id]
664 if b.NeedCommit() {
665 continue
666 }
667
668 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
669 // if something tries to do it anyway, it will lock the program and make it obvious.
670 b.Lock()
671
672 sc.lru.Remove(id)
673 delete(sc.cached, id)
674
675 if sc.lru.Len() <= sc.maxLoaded {
676 return
677 }
678 }
679}