subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/git-bug/git-bug/entities/identity"
 13	"github.com/git-bug/git-bug/entity"
 14	"github.com/git-bug/git-bug/repository"
 15)
 16
// Excerpt is the constraint for the per-entity summaries that SubCache keeps
// in its excerpts map and serializes into the on-disk cache file.
type Excerpt interface {
	// Id returns the id of the entity this excerpt describes.
	Id() entity.Id
	// setId assigns the entity id on the excerpt. SubCache.Load calls it after
	// decoding, because the id is not serialized within the excerpt itself.
	setId(id entity.Id)
}
 21
// CacheEntity is the constraint for the fully loaded entities that SubCache
// keeps in its cached map.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit is consulted by SubCache.evictIfNeeded: an entity reporting
	// true is skipped and stays loaded.
	NeedCommit() bool
	// Lock is called by SubCache.evictIfNeeded on an entity right before it is
	// dropped from the cache, so that later manipulation of the evicted copy
	// blocks instead of going unnoticed.
	Lock()
}
 27
// getUserIdentityFunc returns the identity that SubCache.MergeAll passes to
// Actions.MergeAll as the merge author, or an error if it cannot be obtained.
type getUserIdentityFunc func() (*IdentityCache, error)
 29
// Actions exposes a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver loads a single entity by id; used by SubCache.Resolve on a cache miss.
	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams every entity; used by SubCache.Build to rebuild excerpts and index.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes a single entity from the repository; used by SubCache.Remove.
	Remove              func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll deletes all entities of this type from the repository; used by SubCache.RemoveAll.
	RemoveAll           func(repo repository.ClockedRepo) error
	// MergeAll merges entities from the given remote and streams one result per entity; used by SubCache.MergeAll.
	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
 40
// Compile-time check that *SubCache satisfies the cacheMgmt interface.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
 42
// SubCache is a generic cache for one type of entity. It holds an excerpt for
// every known entity, a set of fully loaded entities tracked by an LRU, and
// persists the excerpts to a cache file in the repository's local storage.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	repo      repository.ClockedRepo
	// resolvers is called each time entities are read or merged to obtain the resolvers to use.
	resolvers func() entity.Resolvers

	// getUserIdentity provides the author used when merging.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps a raw entity into its cached form; the callback it
	// receives is SubCache.entityUpdated.
	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt derives the excerpt stored for a cached entity.
	makeExcerpt     func(CacheT) ExcerptT
	// makeIndexData derives the strings handed to the search index for a cached entity.
	makeIndexData   func(CacheT) []string
	actions         Actions[EntityT]

	// typename is used in error values, build events and observer notifications.
	typename  string
	// namespace names both the cache file (under "cache/") and the index.
	namespace string
	// version is the cache file format version; Load rejects any other value.
	version   uint
	// maxLoaded is the LRU size above which evictIfNeeded starts evicting.
	maxLoaded int

	// mu is taken by most methods around accesses to excerpts, cached and lru.
	mu       sync.RWMutex
	excerpts map[entity.Id]ExcerptT
	cached   map[entity.Id]CacheT
	lru      lruIdCache

	// muObservers guards observers.
	muObservers sync.RWMutex
	observers   map[Observer]string // observer --> repo name
}
 66
 67func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 68	repo repository.ClockedRepo,
 69	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 70	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 71	makeExcerpt func(CacheT) ExcerptT,
 72	makeIndexData func(CacheT) []string,
 73	actions Actions[EntityT],
 74	typename, namespace string,
 75	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 76	return &SubCache[EntityT, ExcerptT, CacheT]{
 77		repo:            repo,
 78		resolvers:       resolvers,
 79		getUserIdentity: getUserIdentity,
 80		makeCached:      makeCached,
 81		makeExcerpt:     makeExcerpt,
 82		makeIndexData:   makeIndexData,
 83		actions:         actions,
 84		typename:        typename,
 85		namespace:       namespace,
 86		version:         version,
 87		maxLoaded:       maxLoaded,
 88		excerpts:        make(map[entity.Id]ExcerptT),
 89		cached:          make(map[entity.Id]CacheT),
 90		lru:             newLRUIdCache(),
 91	}
 92}
 93
// Typename returns the entity type name this SubCache was configured with.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
 97
 98// Load will try to read from the disk the entity cache file
 99func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
100	sc.mu.Lock()
101	defer sc.mu.Unlock()
102
103	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
104	if err != nil {
105		return err
106	}
107
108	aux := struct {
109		Version  uint
110		Excerpts map[entity.Id]ExcerptT
111	}{}
112
113	decoder := gob.NewDecoder(f)
114	err = decoder.Decode(&aux)
115	if err != nil {
116		_ = f.Close()
117		return err
118	}
119
120	err = f.Close()
121	if err != nil {
122		return err
123	}
124
125	if aux.Version != sc.version {
126		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
127	}
128
129	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
130	// so we fix it here, which doubles as enforcing coherency.
131	for id, excerpt := range aux.Excerpts {
132		excerpt.setId(id)
133	}
134
135	sc.excerpts = aux.Excerpts
136
137	index, err := sc.repo.GetIndex(sc.namespace)
138	if err != nil {
139		return err
140	}
141
142	// simple heuristic to detect a mismatch between the index and the entities
143	count, err := index.DocCount()
144	if err != nil {
145		return err
146	}
147	if count != uint64(len(sc.excerpts)) {
148		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
149	}
150
151	// TODO: find a way to check lamport clocks
152
153	return nil
154}
155
156// Write will serialize on disk the entity cache file
157func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
158	sc.mu.RLock()
159	defer sc.mu.RUnlock()
160
161	var data bytes.Buffer
162
163	aux := struct {
164		Version  uint
165		Excerpts map[entity.Id]ExcerptT
166	}{
167		Version:  sc.version,
168		Excerpts: sc.excerpts,
169	}
170
171	encoder := gob.NewEncoder(&data)
172
173	err := encoder.Encode(aux)
174	if err != nil {
175		return err
176	}
177
178	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
179	if err != nil {
180		return err
181	}
182
183	_, err = f.Write(data.Bytes())
184	if err != nil {
185		_ = f.Close()
186		return err
187	}
188
189	return f.Close()
190}
191
192func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
193	// value chosen experimentally as giving the fasted indexing, while
194	// not driving the cache size on disk too high.
195	//
196	// | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
197	// |:----------:|:-------------:|:------------:|:--------:|
198	// |     10     |      24       |      84      |   1,59   |
199	// |     30     |      26       |      84      |  1,388   |
200	// |     50     |      26       |      84      |   1,44   |
201	// |     60     |      26       |      80      |  1,377   |
202	// |     68     |      27       |      80      |  1,385   |
203	// |     75     |      26       |      84      |   1,32   |
204	// |     80     |      26       |      80      |   1,37   |
205	// |     85     |      27       |      80      |  1,317   |
206	// |    100     |      26       |      80      |  1,455   |
207	// |    150     |      26       |      80      |  2,066   |
208	// |    200     |      28       |      80      |  2,885   |
209	// |    250     |      30       |      72      |  3,555   |
210	// |    300     |      31       |      72      |  4,787   |
211	// |    500     |      23       |      72      |   5,4    |
212	const maxBatchCount = 75
213
214	out := make(chan BuildEvent)
215
216	go func() {
217		defer close(out)
218
219		out <- BuildEvent{
220			Typename: sc.typename,
221			Event:    BuildEventStarted,
222		}
223
224		sc.excerpts = make(map[entity.Id]ExcerptT)
225
226		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
227
228		index, err := sc.repo.GetIndex(sc.namespace)
229		if err != nil {
230			out <- BuildEvent{
231				Typename: sc.typename,
232				Err:      err,
233			}
234			return
235		}
236
237		// wipe the index just to be sure
238		err = index.Clear()
239		if err != nil {
240			out <- BuildEvent{
241				Typename: sc.typename,
242				Err:      err,
243			}
244			return
245		}
246
247		indexer, indexEnd := index.IndexBatch()
248		var batchCount int
249
250		for e := range allEntities {
251			if e.Err != nil {
252				out <- BuildEvent{
253					Typename: sc.typename,
254					Err:      e.Err,
255				}
256				return
257			}
258
259			cached := sc.makeCached(e.Entity, sc.entityUpdated)
260			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
261			// might as well keep them in memory
262			sc.cached[e.Entity.Id()] = cached
263
264			indexData := sc.makeIndexData(cached)
265			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
266				out <- BuildEvent{
267					Typename: sc.typename,
268					Err:      err,
269				}
270				return
271			}
272
273			batchCount++
274			if batchCount >= maxBatchCount {
275				err = indexEnd()
276				if err != nil {
277					out <- BuildEvent{
278						Typename: sc.typename,
279						Err:      err,
280					}
281					return
282				}
283
284				indexer, indexEnd = index.IndexBatch()
285				batchCount = 0
286			}
287
288			out <- BuildEvent{
289				Typename: sc.typename,
290				Event:    BuildEventProgress,
291				Progress: e.CurrentEntity,
292				Total:    e.TotalEntities,
293			}
294		}
295
296		if batchCount > 0 {
297			err = indexEnd()
298			if err != nil {
299				out <- BuildEvent{
300					Typename: sc.typename,
301					Err:      err,
302				}
303				return
304			}
305		}
306
307		err = sc.write()
308		if err != nil {
309			out <- BuildEvent{
310				Typename: sc.typename,
311				Err:      err,
312			}
313			return
314		}
315
316		out <- BuildEvent{
317			Typename: sc.typename,
318			Event:    BuildEventFinished,
319		}
320	}()
321
322	return out
323}
324
325func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
326	sc.maxLoaded = size
327	sc.evictIfNeeded()
328}
329
330func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
331	sc.mu.Lock()
332	defer sc.mu.Unlock()
333	sc.excerpts = nil
334	sc.cached = make(map[entity.Id]CacheT)
335	return nil
336}
337
338func (sc *SubCache[EntityT, ExcerptT, CacheT]) RegisterObserver(repoName string, observer Observer) {
339	sc.muObservers.Lock()
340	defer sc.muObservers.Unlock()
341	if sc.observers == nil {
342		sc.observers = make(map[Observer]string)
343	}
344	sc.observers[observer] = repoName
345}
346
// UnregisterObserver removes observer from the set of registered observers.
// It does nothing if the observer was never registered.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) UnregisterObserver(observer Observer) {
	sc.muObservers.Lock()
	defer sc.muObservers.Unlock()
	delete(sc.observers, observer)
}
352
353// AllIds return all known bug ids
354func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
355	sc.mu.RLock()
356	defer sc.mu.RUnlock()
357
358	result := make([]entity.Id, len(sc.excerpts))
359
360	i := 0
361	for _, excerpt := range sc.excerpts {
362		result[i] = excerpt.Id()
363		i++
364	}
365
366	return result
367}
368
369// Resolve retrieve an entity matching the exact given id
370func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
371	sc.mu.RLock()
372	cached, ok := sc.cached[id]
373	if ok {
374		sc.lru.Get(id)
375		sc.mu.RUnlock()
376		return cached, nil
377	}
378	sc.mu.RUnlock()
379
380	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
381	if err != nil {
382		return *new(CacheT), err
383	}
384
385	cached = sc.makeCached(e, sc.entityUpdated)
386
387	sc.mu.Lock()
388	sc.cached[id] = cached
389	sc.lru.Add(id)
390	sc.mu.Unlock()
391
392	sc.evictIfNeeded()
393
394	return cached, nil
395}
396
397// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
398// entities match.
399func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
400	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
401		return excerpt.Id().HasPrefix(prefix)
402	})
403}
404
405func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
406	id, err := sc.resolveMatcher(f)
407	if err != nil {
408		return *new(CacheT), err
409	}
410	return sc.Resolve(id)
411}
412
413// ResolveExcerpt retrieves an Excerpt matching the exact given id
414func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
415	sc.mu.RLock()
416	defer sc.mu.RUnlock()
417
418	excerpt, ok := sc.excerpts[id]
419	if !ok {
420		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
421	}
422
423	return excerpt, nil
424}
425
426// ResolveExcerptPrefix retrieves an Excerpt matching an id prefix. It fails if multiple
427// entities match.
428func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
429	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
430		return excerpt.Id().HasPrefix(prefix)
431	})
432}
433
434// ResolveExcerptMatcher retrieves an Excerpt selected by the given matcher function.
435func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
436	id, err := sc.resolveMatcher(f)
437	if err != nil {
438		return *new(ExcerptT), err
439	}
440	return sc.ResolveExcerpt(id)
441}
442
443func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
444	sc.mu.RLock()
445	defer sc.mu.RUnlock()
446
447	// preallocate but empty
448	matching := make([]entity.Id, 0, 5)
449
450	for _, excerpt := range sc.excerpts {
451		if f(excerpt) {
452			matching = append(matching, excerpt.Id())
453		}
454	}
455
456	if len(matching) > 1 {
457		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
458	}
459
460	if len(matching) == 0 {
461		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
462	}
463
464	return matching[0], nil
465}
466
467func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
468	sc.mu.Lock()
469	if _, has := sc.cached[e.Id()]; has {
470		sc.mu.Unlock()
471		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
472	}
473
474	cached := sc.makeCached(e, sc.entityUpdated)
475	sc.cached[e.Id()] = cached
476	sc.lru.Add(e.Id())
477	sc.mu.Unlock()
478
479	sc.evictIfNeeded()
480
481	// force the write of the excerpt
482	err := sc.updateExcerptAndIndex(e.Id())
483	if err != nil {
484		return *new(CacheT), err
485	}
486
487	// defer to notify after the release of the mutex
488	defer sc.notifyObservers(EntityEventCreated, e.Id())
489
490	return cached, nil
491}
492
493func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
494	e, err := sc.ResolvePrefix(prefix)
495	if err != nil {
496		return err
497	}
498
499	sc.mu.Lock()
500
501	err = sc.actions.Remove(sc.repo, e.Id())
502	if err != nil {
503		sc.mu.Unlock()
504		return err
505	}
506
507	delete(sc.cached, e.Id())
508	delete(sc.excerpts, e.Id())
509	sc.lru.Remove(e.Id())
510
511	index, err := sc.repo.GetIndex(sc.namespace)
512	if err != nil {
513		sc.mu.Unlock()
514		return err
515	}
516
517	err = index.Remove(e.Id().String())
518	sc.mu.Unlock()
519	if err != nil {
520		return err
521	}
522
523	// defer to notify after the release of the mutex
524	defer sc.notifyObservers(EntityEventRemoved, e.Id())
525
526	return sc.write()
527}
528
529func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
530	sc.mu.Lock()
531
532	err := sc.actions.RemoveAll(sc.repo)
533	if err != nil {
534		sc.mu.Unlock()
535		return err
536	}
537
538	ids := make(map[entity.Id]struct{})
539
540	for id, _ := range sc.cached {
541		delete(sc.cached, id)
542		sc.lru.Remove(id)
543		ids[id] = struct{}{}
544	}
545	for id, _ := range sc.excerpts {
546		delete(sc.excerpts, id)
547		ids[id] = struct{}{}
548	}
549
550	index, err := sc.repo.GetIndex(sc.namespace)
551	if err != nil {
552		sc.mu.Unlock()
553		return err
554	}
555
556	err = index.Clear()
557	sc.mu.Unlock()
558	if err != nil {
559		return err
560	}
561
562	// defer to notify after the release of the mutex
563	defer func() {
564		for id := range ids {
565			sc.notifyObservers(EntityEventRemoved, id)
566		}
567	}()
568
569	return sc.write()
570}
571
572func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
573	out := make(chan entity.MergeResult)
574
575	// Intercept merge results to update the cache properly
576	go func() {
577		defer close(out)
578
579		author, err := sc.getUserIdentity()
580		if err != nil {
581			out <- entity.NewMergeError(err, "")
582			return
583		}
584
585		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
586		for result := range results {
587			out <- result
588
589			if result.Err != nil {
590				continue
591			}
592
593			switch result.Status {
594			case entity.MergeStatusNew:
595				e := result.Entity.(EntityT)
596				cached := sc.makeCached(e, sc.entityUpdated)
597
598				sc.mu.Lock()
599				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
600				// might as well keep them in memory
601				sc.cached[result.Id] = cached
602				sc.mu.Unlock()
603				sc.notifyObservers(EntityEventCreated, result.Id)
604
605			case entity.MergeStatusUpdated:
606				// TODO: can that result in multiple copy of the same entity?
607				e := result.Entity.(EntityT)
608				cached := sc.makeCached(e, sc.entityUpdated)
609
610				sc.mu.Lock()
611				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
612				// might as well keep them in memory
613				sc.cached[result.Id] = cached
614				sc.mu.Unlock()
615				sc.notifyObservers(EntityEventUpdated, result.Id)
616			}
617		}
618
619		err = sc.write()
620		if err != nil {
621			out <- entity.NewMergeError(err, "")
622			return
623		}
624	}()
625
626	return out
627
628}
629
// GetNamespace exposes the namespace in git where entities are located.
// The same value names the cache file and the search index of this SubCache.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
634
635// entityUpdated is a callback to trigger when the excerpt of an entity changed
636func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
637	sc.notifyObservers(EntityEventUpdated, id)
638	return sc.updateExcerptAndIndex(id)
639}
640
641// notifyObservers notifies all the observers when something happening for an entity
642func (sc *SubCache[EntityT, ExcerptT, CacheT]) notifyObservers(event EntityEventType, id entity.Id) {
643	sc.muObservers.RLock()
644	for observer, repoName := range sc.observers {
645		observer.EntityEvent(event, repoName, sc.typename, id)
646	}
647	sc.muObservers.RUnlock()
648}
649
650func (sc *SubCache[EntityT, ExcerptT, CacheT]) updateExcerptAndIndex(id entity.Id) error {
651	sc.mu.Lock()
652	e, ok := sc.cached[id]
653	if !ok {
654		sc.mu.Unlock()
655
656		// if the bug is not loaded at this point, it means it was loaded before
657		// but got evicted. Which means we potentially have multiple copies in
658		// memory and thus concurrent write.
659		// Failing immediately here is the simple and safe solution to avoid
660		// complicated data loss.
661		return errors.New("entity missing from cache")
662	}
663	sc.lru.Get(id)
664	sc.excerpts[id] = sc.makeExcerpt(e)
665	sc.mu.Unlock()
666
667	index, err := sc.repo.GetIndex(sc.namespace)
668	if err != nil {
669		return err
670	}
671
672	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
673	if err != nil {
674		return err
675	}
676
677	return sc.write()
678}
679
680// evictIfNeeded will evict an entity from the cache if needed
681func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
682	sc.mu.Lock()
683	defer sc.mu.Unlock()
684	if sc.lru.Len() <= sc.maxLoaded {
685		return
686	}
687
688	for _, id := range sc.lru.GetOldestToNewest() {
689		b := sc.cached[id]
690		if b.NeedCommit() {
691			continue
692		}
693
694		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
695		// if something tries to do it anyway, it will lock the program and make it obvious.
696		b.Lock()
697
698		sc.lru.Remove(id)
699		delete(sc.cached, id)
700
701		if sc.lru.Len() <= sc.maxLoaded {
702			return
703		}
704	}
705}