subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/git-bug/git-bug/entities/identity"
 13	"github.com/git-bug/git-bug/entity"
 14	"github.com/git-bug/git-bug/repository"
 15)
 16
// Excerpt is the constraint for the per-entity summary that a SubCache keeps
// in its excerpts map and serializes into the cache file.
type Excerpt interface {
	// Id returns the id of the entity this excerpt belongs to.
	Id() entity.Id
	// setId assigns the id of the excerpt. SubCache.Load calls it on every
	// decoded excerpt, because the id is not serialized in the excerpt itself.
	setId(id entity.Id)
}
 21
// CacheEntity is the constraint for the loaded form of an entity that a
// SubCache keeps in its cached map.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit is consulted during eviction: entities for which it returns
	// true are never evicted (presumably because they hold uncommitted
	// changes; confirm against the implementations).
	NeedCommit() bool
	// Lock is called by the SubCache on an entity right before evicting it, so
	// that any later manipulation of the evicted copy blocks instead of
	// silently diverging.
	Lock()
}
 27
// getUserIdentityFunc returns the identity to use as the author of merges
// (see SubCache.MergeAll), or an error if it cannot be obtained.
type getUserIdentityFunc func() (*IdentityCache, error)
 29
// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads a single entity by id; used by SubCache.Resolve on a cache miss.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams every entity of this type; used by SubCache.Build.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes a single entity from the repository; used by SubCache.Remove.
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll deletes every entity of this type from the repository; used by SubCache.RemoveAll.
	RemoveAll func(repo repository.ClockedRepo) error
	// MergeAll merges the entities coming from the given remote and streams one result per entity; used by SubCache.MergeAll.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
 40
 41var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
 42
// SubCache provides caching management for one type of Entity.
//
// It maintains two in-memory maps: excerpts, which is persisted in the cache
// file, and cached, which holds loaded entities and is bounded through an LRU
// of ids.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	repo      repository.ClockedRepo
	resolvers func() entity.Resolvers

	// getUserIdentity provides the author used when merging.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps an entity into its cached form; entityUpdated is the
	// callback handed to that cached form.
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt of a cached entity.
	makeExcerpt func(CacheT) ExcerptT
	// makeIndexData builds the data handed to the index for a cached entity.
	makeIndexData func(CacheT) []string
	actions       Actions[EntityT]

	// typename labels build events and not-found/multiple-match errors.
	typename string
	// namespace is both the name of the cache file (under "cache") and the name of the index.
	namespace string
	// version is the cache file format version, checked by Load and written by write.
	version uint
	// maxLoaded is the number of LRU-tracked entities above which evictIfNeeded starts evicting.
	maxLoaded int

	// mu is held by the methods of SubCache when they access excerpts, cached and lru.
	mu       sync.RWMutex
	excerpts map[entity.Id]ExcerptT
	cached   map[entity.Id]CacheT
	lru      lruIdCache
}
 64
 65func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 66	repo repository.ClockedRepo,
 67	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 68	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 69	makeExcerpt func(CacheT) ExcerptT,
 70	makeIndexData func(CacheT) []string,
 71	actions Actions[EntityT],
 72	typename, namespace string,
 73	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 74	return &SubCache[EntityT, ExcerptT, CacheT]{
 75		repo:            repo,
 76		resolvers:       resolvers,
 77		getUserIdentity: getUserIdentity,
 78		makeCached:      makeCached,
 79		makeExcerpt:     makeExcerpt,
 80		makeIndexData:   makeIndexData,
 81		actions:         actions,
 82		typename:        typename,
 83		namespace:       namespace,
 84		version:         version,
 85		maxLoaded:       maxLoaded,
 86		excerpts:        make(map[entity.Id]ExcerptT),
 87		cached:          make(map[entity.Id]CacheT),
 88		lru:             newLRUIdCache(),
 89	}
 90}
 91
// Typename returns the typename this SubCache was created with. The same value
// labels the build events and the not-found/multiple-match errors it produces.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
 95
 96// Load will try to read from the disk the entity cache file
 97func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 98	sc.mu.Lock()
 99	defer sc.mu.Unlock()
100
101	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
102	if err != nil {
103		return err
104	}
105
106	aux := struct {
107		Version  uint
108		Excerpts map[entity.Id]ExcerptT
109	}{}
110
111	decoder := gob.NewDecoder(f)
112	err = decoder.Decode(&aux)
113	if err != nil {
114		_ = f.Close()
115		return err
116	}
117
118	err = f.Close()
119	if err != nil {
120		return err
121	}
122
123	if aux.Version != sc.version {
124		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
125	}
126
127	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
128	// so we fix it here, which doubles as enforcing coherency.
129	for id, excerpt := range aux.Excerpts {
130		excerpt.setId(id)
131	}
132
133	sc.excerpts = aux.Excerpts
134
135	index, err := sc.repo.GetIndex(sc.namespace)
136	if err != nil {
137		return err
138	}
139
140	// simple heuristic to detect a mismatch between the index and the entities
141	count, err := index.DocCount()
142	if err != nil {
143		return err
144	}
145	if count != uint64(len(sc.excerpts)) {
146		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
147	}
148
149	// TODO: find a way to check lamport clocks
150
151	return nil
152}
153
154// Write will serialize on disk the entity cache file
155func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
156	sc.mu.RLock()
157	defer sc.mu.RUnlock()
158
159	var data bytes.Buffer
160
161	aux := struct {
162		Version  uint
163		Excerpts map[entity.Id]ExcerptT
164	}{
165		Version:  sc.version,
166		Excerpts: sc.excerpts,
167	}
168
169	encoder := gob.NewEncoder(&data)
170
171	err := encoder.Encode(aux)
172	if err != nil {
173		return err
174	}
175
176	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
177	if err != nil {
178		return err
179	}
180
181	_, err = f.Write(data.Bytes())
182	if err != nil {
183		_ = f.Close()
184		return err
185	}
186
187	return f.Close()
188}
189
190func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
191	// value chosen experimentally as giving the fasted indexing, while
192	// not driving the cache size on disk too high.
193	//
194	// | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
195	// |:----------:|:-------------:|:------------:|:--------:|
196	// |     10     |      24       |      84      |   1,59   |
197	// |     30     |      26       |      84      |  1,388   |
198	// |     50     |      26       |      84      |   1,44   |
199	// |     60     |      26       |      80      |  1,377   |
200	// |     68     |      27       |      80      |  1,385   |
201	// |     75     |      26       |      84      |   1,32   |
202	// |     80     |      26       |      80      |   1,37   |
203	// |     85     |      27       |      80      |  1,317   |
204	// |    100     |      26       |      80      |  1,455   |
205	// |    150     |      26       |      80      |  2,066   |
206	// |    200     |      28       |      80      |  2,885   |
207	// |    250     |      30       |      72      |  3,555   |
208	// |    300     |      31       |      72      |  4,787   |
209	// |    500     |      23       |      72      |   5,4    |
210	const maxBatchCount = 75
211
212	out := make(chan BuildEvent)
213
214	go func() {
215		defer close(out)
216
217		out <- BuildEvent{
218			Typename: sc.typename,
219			Event:    BuildEventStarted,
220		}
221
222		sc.excerpts = make(map[entity.Id]ExcerptT)
223
224		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
225
226		index, err := sc.repo.GetIndex(sc.namespace)
227		if err != nil {
228			out <- BuildEvent{
229				Typename: sc.typename,
230				Err:      err,
231			}
232			return
233		}
234
235		// wipe the index just to be sure
236		err = index.Clear()
237		if err != nil {
238			out <- BuildEvent{
239				Typename: sc.typename,
240				Err:      err,
241			}
242			return
243		}
244
245		indexer, indexEnd := index.IndexBatch()
246		var batchCount int
247
248		for e := range allEntities {
249			if e.Err != nil {
250				out <- BuildEvent{
251					Typename: sc.typename,
252					Err:      e.Err,
253				}
254				return
255			}
256
257			cached := sc.makeCached(e.Entity, sc.entityUpdated)
258			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
259			// might as well keep them in memory
260			sc.cached[e.Entity.Id()] = cached
261
262			indexData := sc.makeIndexData(cached)
263			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
264				out <- BuildEvent{
265					Typename: sc.typename,
266					Err:      err,
267				}
268				return
269			}
270
271			batchCount++
272			if batchCount >= maxBatchCount {
273				err = indexEnd()
274				if err != nil {
275					out <- BuildEvent{
276						Typename: sc.typename,
277						Err:      err,
278					}
279					return
280				}
281
282				indexer, indexEnd = index.IndexBatch()
283				batchCount = 0
284			}
285
286			out <- BuildEvent{
287				Typename: sc.typename,
288				Event:    BuildEventProgress,
289				Progress: e.CurrentEntity,
290				Total:    e.TotalEntities,
291			}
292		}
293
294		if batchCount > 0 {
295			err = indexEnd()
296			if err != nil {
297				out <- BuildEvent{
298					Typename: sc.typename,
299					Err:      err,
300				}
301				return
302			}
303		}
304
305		err = sc.write()
306		if err != nil {
307			out <- BuildEvent{
308				Typename: sc.typename,
309				Err:      err,
310			}
311			return
312		}
313
314		out <- BuildEvent{
315			Typename: sc.typename,
316			Event:    BuildEventFinished,
317		}
318	}()
319
320	return out
321}
322
323func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
324	sc.maxLoaded = size
325	sc.evictIfNeeded()
326}
327
328func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
329	sc.mu.Lock()
330	defer sc.mu.Unlock()
331	sc.excerpts = nil
332	sc.cached = make(map[entity.Id]CacheT)
333	return nil
334}
335
336// AllIds return all known bug ids
337func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
338	sc.mu.RLock()
339	defer sc.mu.RUnlock()
340
341	result := make([]entity.Id, len(sc.excerpts))
342
343	i := 0
344	for _, excerpt := range sc.excerpts {
345		result[i] = excerpt.Id()
346		i++
347	}
348
349	return result
350}
351
352// Resolve retrieves an entity matching the exact given id
353func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
354	sc.mu.RLock()
355	cached, ok := sc.cached[id]
356	if ok {
357		sc.lru.Get(id)
358		sc.mu.RUnlock()
359		return cached, nil
360	}
361	sc.mu.RUnlock()
362
363	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
364	if err != nil {
365		return *new(CacheT), err
366	}
367
368	cached = sc.makeCached(e, sc.entityUpdated)
369
370	sc.mu.Lock()
371	sc.cached[id] = cached
372	sc.lru.Add(id)
373	sc.mu.Unlock()
374
375	sc.evictIfNeeded()
376
377	return cached, nil
378}
379
380// ResolvePrefix retrieves an entity matching an id prefix. It fails if multiple entities match.
381func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
382	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
383		return excerpt.Id().HasPrefix(prefix)
384	})
385}
386
387// ResolveMatcher retrieves an entity matching the given matched. It fails if multiple entities match.
388func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
389	id, err := sc.resolveMatcher(f)
390	if err != nil {
391		return *new(CacheT), err
392	}
393	return sc.Resolve(id)
394}
395
396// ResolveExcerpt retrieve an Excerpt matching the exact given id
397func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
398	sc.mu.RLock()
399	defer sc.mu.RUnlock()
400
401	excerpt, ok := sc.excerpts[id]
402	if !ok {
403		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
404	}
405
406	return excerpt, nil
407}
408
409// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple entities match.
410func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
411	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
412		return excerpt.Id().HasPrefix(prefix)
413	})
414}
415
416// ResolveExcerptMatcher retrieve an Excerpt matching a given matcher. It fails if multiple entities match.
417func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
418	id, err := sc.resolveMatcher(f)
419	if err != nil {
420		return *new(ExcerptT), err
421	}
422	return sc.ResolveExcerpt(id)
423}
424
425// QueryExcerptMatcher finds all the Excerpt matching the given matcher.
426func (sc *SubCache[EntityT, ExcerptT, CacheT]) QueryExcerptMatcher(f func(ExcerptT) bool) ([]ExcerptT, error) {
427	ids, err := sc.queryMatcher(f)
428	if err != nil {
429		return nil, err
430	}
431	res := make([]ExcerptT, len(ids))
432	for i, id := range ids {
433		res[i], err = sc.ResolveExcerpt(id)
434		if err != nil {
435			return nil, err
436		}
437	}
438	return res, nil
439}
440
441// resolveMatcher finds the id of the entity matching the given matcher. It fails if multiple entities match.
442func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
443	sc.mu.RLock()
444	defer sc.mu.RUnlock()
445
446	// preallocate but empty
447	matching := make([]entity.Id, 0, 5)
448
449	for _, excerpt := range sc.excerpts {
450		if f(excerpt) {
451			matching = append(matching, excerpt.Id())
452		}
453	}
454
455	if len(matching) > 1 {
456		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
457	}
458
459	if len(matching) == 0 {
460		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
461	}
462
463	return matching[0], nil
464}
465
466// queryMatcher find the ids of all the entities matching the given matcher.
467func (sc *SubCache[EntityT, ExcerptT, CacheT]) queryMatcher(f func(ExcerptT) bool) ([]entity.Id, error) {
468	// TODO: this might use some pagination, or better: a go1.23 iterator?
469
470	sc.mu.RLock()
471	defer sc.mu.RUnlock()
472
473	// preallocate but empty
474	matching := make([]entity.Id, 0, 5)
475
476	for _, excerpt := range sc.excerpts {
477		if f(excerpt) {
478			matching = append(matching, excerpt.Id())
479		}
480	}
481
482	return matching, nil
483}
484
485func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
486	sc.mu.Lock()
487	if _, has := sc.cached[e.Id()]; has {
488		sc.mu.Unlock()
489		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
490	}
491
492	cached := sc.makeCached(e, sc.entityUpdated)
493	sc.cached[e.Id()] = cached
494	sc.lru.Add(e.Id())
495	sc.mu.Unlock()
496
497	sc.evictIfNeeded()
498
499	// force the write of the excerpt
500	err := sc.entityUpdated(e.Id())
501	if err != nil {
502		return *new(CacheT), err
503	}
504
505	return cached, nil
506}
507
508func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
509	e, err := sc.ResolvePrefix(prefix)
510	if err != nil {
511		return err
512	}
513
514	sc.mu.Lock()
515
516	err = sc.actions.Remove(sc.repo, e.Id())
517	if err != nil {
518		sc.mu.Unlock()
519		return err
520	}
521
522	delete(sc.cached, e.Id())
523	delete(sc.excerpts, e.Id())
524	sc.lru.Remove(e.Id())
525
526	index, err := sc.repo.GetIndex(sc.namespace)
527	if err != nil {
528		sc.mu.Unlock()
529		return err
530	}
531
532	err = index.Remove(e.Id().String())
533	sc.mu.Unlock()
534	if err != nil {
535		return err
536	}
537
538	return sc.write()
539}
540
541func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
542	sc.mu.Lock()
543
544	err := sc.actions.RemoveAll(sc.repo)
545	if err != nil {
546		sc.mu.Unlock()
547		return err
548	}
549
550	for id, _ := range sc.cached {
551		delete(sc.cached, id)
552		sc.lru.Remove(id)
553	}
554	for id, _ := range sc.excerpts {
555		delete(sc.excerpts, id)
556	}
557
558	index, err := sc.repo.GetIndex(sc.namespace)
559	if err != nil {
560		sc.mu.Unlock()
561		return err
562	}
563
564	err = index.Clear()
565	sc.mu.Unlock()
566	if err != nil {
567		return err
568	}
569
570	return sc.write()
571}
572
573func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
574	out := make(chan entity.MergeResult)
575
576	// Intercept merge results to update the cache properly
577	go func() {
578		defer close(out)
579
580		author, err := sc.getUserIdentity()
581		if err != nil {
582			out <- entity.NewMergeError(err, "")
583			return
584		}
585
586		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
587		for result := range results {
588			out <- result
589
590			if result.Err != nil {
591				continue
592			}
593
594			switch result.Status {
595			case entity.MergeStatusNew, entity.MergeStatusUpdated:
596				e := result.Entity.(EntityT)
597				cached := sc.makeCached(e, sc.entityUpdated)
598
599				sc.mu.Lock()
600				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
601				// might as well keep them in memory
602				sc.cached[result.Id] = cached
603				sc.mu.Unlock()
604			}
605		}
606
607		err = sc.write()
608		if err != nil {
609			out <- entity.NewMergeError(err, "")
610			return
611		}
612	}()
613
614	return out
615
616}
617
// GetNamespace returns the namespace this SubCache was created with. The same
// value names its cache file (under "cache") and its index.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
621
622// entityUpdated is a callback to trigger when the excerpt of an entity changed
623func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
624	sc.mu.Lock()
625	e, ok := sc.cached[id]
626	if !ok {
627		sc.mu.Unlock()
628
629		// if the bug is not loaded at this point, it means it was loaded before
630		// but got evicted. Which means we potentially have multiple copies in
631		// memory and thus concurrent write.
632		// Failing immediately here is the simple and safe solution to avoid
633		// complicated data loss.
634		return errors.New("entity missing from cache")
635	}
636	sc.lru.Get(id)
637	// sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
638	sc.excerpts[id] = sc.makeExcerpt(e)
639	sc.mu.Unlock()
640
641	index, err := sc.repo.GetIndex(sc.namespace)
642	if err != nil {
643		return err
644	}
645
646	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
647	if err != nil {
648		return err
649	}
650
651	return sc.write()
652}
653
654// evictIfNeeded will evict an entity from the cache if needed
655func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
656	sc.mu.Lock()
657	defer sc.mu.Unlock()
658	if sc.lru.Len() <= sc.maxLoaded {
659		return
660	}
661
662	for _, id := range sc.lru.GetOldestToNewest() {
663		b := sc.cached[id]
664		if b.NeedCommit() {
665			continue
666		}
667
668		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
669		// if something tries to do it anyway, it will lock the program and make it obvious.
670		b.Lock()
671
672		sc.lru.Remove(id)
673		delete(sc.cached, id)
674
675		if sc.lru.Len() <= sc.maxLoaded {
676			return
677		}
678	}
679}