subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/git-bug/git-bug/entities/identity"
 13	"github.com/git-bug/git-bug/entity"
 14	"github.com/git-bug/git-bug/repository"
 15)
 16
// Excerpt is the constraint for the per-entity summaries that a SubCache keeps
// in memory for every known entity and serializes into its cache file.
type Excerpt interface {
	// Id returns the id of the entity this excerpt belongs to.
	Id() entity.Id
	// setId assigns the id of the excerpt. SubCache.Load calls it after
	// decoding, because the id is not serialized within the excerpt itself.
	setId(id entity.Id)
}
 21
// CacheEntity is the constraint for the fully loaded entities that a SubCache
// keeps in memory.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit is consulted during eviction: an entity reporting true is
	// skipped and stays in memory.
	NeedCommit() bool
	// Lock is called on an entity right before it is evicted from the cache,
	// so that a later manipulation of the evicted copy blocks instead of going
	// unnoticed.
	Lock()
}
 27
// getUserIdentityFunc returns an IdentityCache or an error. SubCache.MergeAll
// calls it to obtain the author handed to the merge action.
type getUserIdentityFunc func() (*IdentityCache, error)
 29
// Actions exposes a number of action functions on entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads a single entity, selected by id.
	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams entities (or read errors) on the returned channel.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes a single entity, selected by id.
	Remove              func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll deletes all entities of this type.
	RemoveAll           func(repo repository.ClockedRepo) error
	// MergeAll merges entities from the given remote and streams one result per merge.
	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
 40
// Compile-time check that *SubCache satisfies the cacheMgmt interface.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
 42
// SubCache is the cache for one type of entity. It holds an excerpt for every
// known entity, keeps fully loaded entities in memory (tracked by an LRU and
// evicted by evictIfNeeded once more than maxLoaded are tracked), and persists
// the excerpts in a cache file named after the namespace.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	repo      repository.ClockedRepo
	// resolvers returns the resolvers handed to the read and merge actions.
	resolvers func() entity.Resolvers

	// getUserIdentity provides the author used by MergeAll.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps an entity into its cached form. The SubCache passes its
	// own entityUpdated method as the entityUpdated callback.
	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt derives the excerpt of a cached entity.
	makeExcerpt     func(CacheT) ExcerptT
	// makeIndexData derives the data sent to the search index for a cached entity.
	makeIndexData   func(CacheT) []string
	actions         Actions[EntityT]

	// typename identifies the entity type in build events, observer
	// notifications and not-found/multiple-match errors.
	typename  string
	// namespace names both the cache file (under "cache") and the search index.
	namespace string
	// version is the cache file format version; Load rejects any other value.
	version   uint
	// maxLoaded is the threshold above which evictIfNeeded starts evicting.
	maxLoaded int

	// mu guards excerpts, cached and lru.
	// NOTE(review): Build and SetCacheSize touch these fields without holding mu.
	mu       sync.RWMutex
	excerpts map[entity.Id]ExcerptT
	cached   map[entity.Id]CacheT
	lru      lruIdCache

	// muObservers guards observers.
	muObservers sync.RWMutex
	observers   map[Observer]struct{}
}
 66
 67func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 68	repo repository.ClockedRepo,
 69	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 70	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 71	makeExcerpt func(CacheT) ExcerptT,
 72	makeIndexData func(CacheT) []string,
 73	actions Actions[EntityT],
 74	typename, namespace string,
 75	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 76	return &SubCache[EntityT, ExcerptT, CacheT]{
 77		repo:            repo,
 78		resolvers:       resolvers,
 79		getUserIdentity: getUserIdentity,
 80		makeCached:      makeCached,
 81		makeExcerpt:     makeExcerpt,
 82		makeIndexData:   makeIndexData,
 83		actions:         actions,
 84		typename:        typename,
 85		namespace:       namespace,
 86		version:         version,
 87		maxLoaded:       maxLoaded,
 88		excerpts:        make(map[entity.Id]ExcerptT),
 89		cached:          make(map[entity.Id]CacheT),
 90		lru:             newLRUIdCache(),
 91	}
 92}
 93
 94func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
 95	return sc.typename
 96}
 97
 98// Load will try to read from the disk the entity cache file
 99func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
100	sc.mu.Lock()
101	defer sc.mu.Unlock()
102
103	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
104	if err != nil {
105		return err
106	}
107
108	aux := struct {
109		Version  uint
110		Excerpts map[entity.Id]ExcerptT
111	}{}
112
113	decoder := gob.NewDecoder(f)
114	err = decoder.Decode(&aux)
115	if err != nil {
116		_ = f.Close()
117		return err
118	}
119
120	err = f.Close()
121	if err != nil {
122		return err
123	}
124
125	if aux.Version != sc.version {
126		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
127	}
128
129	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
130	// so we fix it here, which doubles as enforcing coherency.
131	for id, excerpt := range aux.Excerpts {
132		excerpt.setId(id)
133	}
134
135	sc.excerpts = aux.Excerpts
136
137	index, err := sc.repo.GetIndex(sc.namespace)
138	if err != nil {
139		return err
140	}
141
142	// simple heuristic to detect a mismatch between the index and the entities
143	count, err := index.DocCount()
144	if err != nil {
145		return err
146	}
147	if count != uint64(len(sc.excerpts)) {
148		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
149	}
150
151	// TODO: find a way to check lamport clocks
152
153	return nil
154}
155
156// Write will serialize on disk the entity cache file
157func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
158	sc.mu.RLock()
159	defer sc.mu.RUnlock()
160
161	var data bytes.Buffer
162
163	aux := struct {
164		Version  uint
165		Excerpts map[entity.Id]ExcerptT
166	}{
167		Version:  sc.version,
168		Excerpts: sc.excerpts,
169	}
170
171	encoder := gob.NewEncoder(&data)
172
173	err := encoder.Encode(aux)
174	if err != nil {
175		return err
176	}
177
178	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
179	if err != nil {
180		return err
181	}
182
183	_, err = f.Write(data.Bytes())
184	if err != nil {
185		_ = f.Close()
186		return err
187	}
188
189	return f.Close()
190}
191
192func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
193	// value chosen experimentally as giving the fasted indexing, while
194	// not driving the cache size on disk too high.
195	//
196	// | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
197	// |:----------:|:-------------:|:------------:|:--------:|
198	// |     10     |      24       |      84      |   1,59   |
199	// |     30     |      26       |      84      |  1,388   |
200	// |     50     |      26       |      84      |   1,44   |
201	// |     60     |      26       |      80      |  1,377   |
202	// |     68     |      27       |      80      |  1,385   |
203	// |     75     |      26       |      84      |   1,32   |
204	// |     80     |      26       |      80      |   1,37   |
205	// |     85     |      27       |      80      |  1,317   |
206	// |    100     |      26       |      80      |  1,455   |
207	// |    150     |      26       |      80      |  2,066   |
208	// |    200     |      28       |      80      |  2,885   |
209	// |    250     |      30       |      72      |  3,555   |
210	// |    300     |      31       |      72      |  4,787   |
211	// |    500     |      23       |      72      |   5,4    |
212	const maxBatchCount = 75
213
214	out := make(chan BuildEvent)
215
216	go func() {
217		defer close(out)
218
219		out <- BuildEvent{
220			Typename: sc.typename,
221			Event:    BuildEventStarted,
222		}
223
224		sc.excerpts = make(map[entity.Id]ExcerptT)
225
226		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
227
228		index, err := sc.repo.GetIndex(sc.namespace)
229		if err != nil {
230			out <- BuildEvent{
231				Typename: sc.typename,
232				Err:      err,
233			}
234			return
235		}
236
237		// wipe the index just to be sure
238		err = index.Clear()
239		if err != nil {
240			out <- BuildEvent{
241				Typename: sc.typename,
242				Err:      err,
243			}
244			return
245		}
246
247		indexer, indexEnd := index.IndexBatch()
248		var batchCount int
249
250		for e := range allEntities {
251			if e.Err != nil {
252				out <- BuildEvent{
253					Typename: sc.typename,
254					Err:      e.Err,
255				}
256				return
257			}
258
259			cached := sc.makeCached(e.Entity, sc.entityUpdated)
260			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
261			// might as well keep them in memory
262			sc.cached[e.Entity.Id()] = cached
263
264			indexData := sc.makeIndexData(cached)
265			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
266				out <- BuildEvent{
267					Typename: sc.typename,
268					Err:      err,
269				}
270				return
271			}
272
273			batchCount++
274			if batchCount >= maxBatchCount {
275				err = indexEnd()
276				if err != nil {
277					out <- BuildEvent{
278						Typename: sc.typename,
279						Err:      err,
280					}
281					return
282				}
283
284				indexer, indexEnd = index.IndexBatch()
285				batchCount = 0
286			}
287
288			out <- BuildEvent{
289				Typename: sc.typename,
290				Event:    BuildEventProgress,
291				Progress: e.CurrentEntity,
292				Total:    e.TotalEntities,
293			}
294		}
295
296		if batchCount > 0 {
297			err = indexEnd()
298			if err != nil {
299				out <- BuildEvent{
300					Typename: sc.typename,
301					Err:      err,
302				}
303				return
304			}
305		}
306
307		err = sc.write()
308		if err != nil {
309			out <- BuildEvent{
310				Typename: sc.typename,
311				Err:      err,
312			}
313			return
314		}
315
316		out <- BuildEvent{
317			Typename: sc.typename,
318			Event:    BuildEventFinished,
319		}
320	}()
321
322	return out
323}
324
325func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
326	sc.maxLoaded = size
327	sc.evictIfNeeded()
328}
329
330func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
331	sc.mu.Lock()
332	defer sc.mu.Unlock()
333	sc.excerpts = nil
334	sc.cached = make(map[entity.Id]CacheT)
335	return nil
336}
337
338func (sc *SubCache[EntityT, ExcerptT, CacheT]) RegisterObserver(observer Observer) {
339	sc.muObservers.Lock()
340	defer sc.muObservers.Unlock()
341	sc.observers[observer] = struct{}{}
342}
343
344func (sc *SubCache[EntityT, ExcerptT, CacheT]) UnregisterObserver(observer Observer) {
345	sc.muObservers.Lock()
346	defer sc.muObservers.Unlock()
347	delete(sc.observers, observer)
348}
349
350// AllIds return all known bug ids
351func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
352	sc.mu.RLock()
353	defer sc.mu.RUnlock()
354
355	result := make([]entity.Id, len(sc.excerpts))
356
357	i := 0
358	for _, excerpt := range sc.excerpts {
359		result[i] = excerpt.Id()
360		i++
361	}
362
363	return result
364}
365
366// Resolve retrieve an entity matching the exact given id
367func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
368	sc.mu.RLock()
369	cached, ok := sc.cached[id]
370	if ok {
371		sc.lru.Get(id)
372		sc.mu.RUnlock()
373		return cached, nil
374	}
375	sc.mu.RUnlock()
376
377	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
378	if err != nil {
379		return *new(CacheT), err
380	}
381
382	cached = sc.makeCached(e, sc.entityUpdated)
383
384	sc.mu.Lock()
385	sc.cached[id] = cached
386	sc.lru.Add(id)
387	sc.mu.Unlock()
388
389	sc.evictIfNeeded()
390
391	return cached, nil
392}
393
394// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
395// entities match.
396func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
397	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
398		return excerpt.Id().HasPrefix(prefix)
399	})
400}
401
402func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
403	id, err := sc.resolveMatcher(f)
404	if err != nil {
405		return *new(CacheT), err
406	}
407	return sc.Resolve(id)
408}
409
410// ResolveExcerpt retrieve an Excerpt matching the exact given id
411func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
412	sc.mu.RLock()
413	defer sc.mu.RUnlock()
414
415	excerpt, ok := sc.excerpts[id]
416	if !ok {
417		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
418	}
419
420	return excerpt, nil
421}
422
423// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
424// entities match.
425func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
426	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
427		return excerpt.Id().HasPrefix(prefix)
428	})
429}
430
431func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
432	id, err := sc.resolveMatcher(f)
433	if err != nil {
434		return *new(ExcerptT), err
435	}
436	return sc.ResolveExcerpt(id)
437}
438
439func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
440	sc.mu.RLock()
441	defer sc.mu.RUnlock()
442
443	// preallocate but empty
444	matching := make([]entity.Id, 0, 5)
445
446	for _, excerpt := range sc.excerpts {
447		if f(excerpt) {
448			matching = append(matching, excerpt.Id())
449		}
450	}
451
452	if len(matching) > 1 {
453		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
454	}
455
456	if len(matching) == 0 {
457		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
458	}
459
460	return matching[0], nil
461}
462
463func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
464	sc.mu.Lock()
465	if _, has := sc.cached[e.Id()]; has {
466		sc.mu.Unlock()
467		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
468	}
469
470	cached := sc.makeCached(e, sc.entityUpdated)
471	sc.cached[e.Id()] = cached
472	sc.lru.Add(e.Id())
473	sc.mu.Unlock()
474
475	sc.evictIfNeeded()
476
477	// force the write of the excerpt
478	err := sc.entityCreated(e.Id())
479	if err != nil {
480		return *new(CacheT), err
481	}
482
483	return cached, nil
484}
485
486func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
487	e, err := sc.ResolvePrefix(prefix)
488	if err != nil {
489		return err
490	}
491
492	sc.mu.Lock()
493
494	err = sc.actions.Remove(sc.repo, e.Id())
495	if err != nil {
496		sc.mu.Unlock()
497		return err
498	}
499
500	delete(sc.cached, e.Id())
501	delete(sc.excerpts, e.Id())
502	sc.lru.Remove(e.Id())
503
504	index, err := sc.repo.GetIndex(sc.namespace)
505	if err != nil {
506		sc.mu.Unlock()
507		return err
508	}
509
510	err = index.Remove(e.Id().String())
511	sc.mu.Unlock()
512	if err != nil {
513		return err
514	}
515
516	return sc.write()
517}
518
519func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
520	sc.mu.Lock()
521
522	err := sc.actions.RemoveAll(sc.repo)
523	if err != nil {
524		sc.mu.Unlock()
525		return err
526	}
527
528	for id, _ := range sc.cached {
529		delete(sc.cached, id)
530		sc.lru.Remove(id)
531	}
532	for id, _ := range sc.excerpts {
533		delete(sc.excerpts, id)
534	}
535
536	index, err := sc.repo.GetIndex(sc.namespace)
537	if err != nil {
538		sc.mu.Unlock()
539		return err
540	}
541
542	err = index.Clear()
543	sc.mu.Unlock()
544	if err != nil {
545		return err
546	}
547
548	return sc.write()
549}
550
551func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
552	out := make(chan entity.MergeResult)
553
554	// Intercept merge results to update the cache properly
555	go func() {
556		defer close(out)
557
558		author, err := sc.getUserIdentity()
559		if err != nil {
560			out <- entity.NewMergeError(err, "")
561			return
562		}
563
564		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
565		for result := range results {
566			out <- result
567
568			if result.Err != nil {
569				continue
570			}
571
572			switch result.Status {
573			case entity.MergeStatusNew, entity.MergeStatusUpdated:
574				e := result.Entity.(EntityT)
575				cached := sc.makeCached(e, sc.entityUpdated)
576
577				sc.mu.Lock()
578				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
579				// might as well keep them in memory
580				sc.cached[result.Id] = cached
581				sc.mu.Unlock()
582			}
583		}
584
585		err = sc.write()
586		if err != nil {
587			out <- entity.NewMergeError(err, "")
588			return
589		}
590	}()
591
592	return out
593
594}
595
596func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
597	return sc.namespace
598}
599
600func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityCreated(id entity.Id) error {
601	sc.muObservers.RLock()
602	for observer := range sc.observers {
603		observer.EntityCreated(sc.typename, id)
604	}
605	sc.muObservers.RUnlock()
606
607	return sc.updateExcerptAndIndex(id)
608}
609
610// entityUpdated is a callback to trigger when the excerpt of an entity changed
611func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
612	sc.muObservers.RLock()
613	for observer := range sc.observers {
614		observer.EntityCreated(sc.typename, id)
615	}
616	sc.muObservers.RUnlock()
617
618	return sc.updateExcerptAndIndex(id)
619}
620
621func (sc *SubCache[EntityT, ExcerptT, CacheT]) updateExcerptAndIndex(id entity.Id) error {
622	sc.mu.Lock()
623	e, ok := sc.cached[id]
624	if !ok {
625		sc.mu.Unlock()
626
627		// if the bug is not loaded at this point, it means it was loaded before
628		// but got evicted. Which means we potentially have multiple copies in
629		// memory and thus concurrent write.
630		// Failing immediately here is the simple and safe solution to avoid
631		// complicated data loss.
632		return errors.New("entity missing from cache")
633	}
634	sc.lru.Get(id)
635	sc.excerpts[id] = sc.makeExcerpt(e)
636	sc.mu.Unlock()
637
638	index, err := sc.repo.GetIndex(sc.namespace)
639	if err != nil {
640		return err
641	}
642
643	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
644	if err != nil {
645		return err
646	}
647
648	return sc.write()
649}
650
651// evictIfNeeded will evict an entity from the cache if needed
652func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
653	sc.mu.Lock()
654	defer sc.mu.Unlock()
655	if sc.lru.Len() <= sc.maxLoaded {
656		return
657	}
658
659	for _, id := range sc.lru.GetOldestToNewest() {
660		b := sc.cached[id]
661		if b.NeedCommit() {
662			continue
663		}
664
665		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
666		// if something tries to do it anyway, it will lock the program and make it obvious.
667		b.Lock()
668
669		sc.lru.Remove(id)
670		delete(sc.cached, id)
671
672		if sc.lru.Len() <= sc.maxLoaded {
673			return
674		}
675	}
676}