subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entity"
 13	bootstrap "github.com/MichaelMure/git-bug/entity/boostrap"
 14	"github.com/MichaelMure/git-bug/repository"
 15)
 16
// Excerpt is the constraint for the per-entity summaries that a SubCache keeps
// in its excerpts map and serializes in the cache file.
type Excerpt interface {
	// Id returns the id of the entity this excerpt stands for.
	Id() entity.Id
	// setId assigns the id of the excerpt. SubCache.Load calls it after decoding,
	// because the id is not serialized within the excerpt itself.
	setId(id entity.Id)
}
 21
// CacheEntity is the constraint for the fully loaded entities that a SubCache
// keeps in its cached map.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit is consulted during eviction: entities for which it returns
	// true are skipped and stay loaded.
	NeedCommit() bool
	// Lock is called on an entity when it gets evicted, as a guard against
	// further manipulation of the evicted copy.
	Lock()
}
 27
// getUserIdentityFunc returns the identity of the current user. SubCache.MergeAll
// uses it to obtain the author passed to the merge action.
type getUserIdentityFunc func() (*IdentityCache, error)
 29
// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Bare] struct {
	// ReadWithResolver reads a single entity by id; SubCache.Resolve uses it on a cache miss.
	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams every entity of the repository; SubCache.Build consumes it.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan bootstrap.StreamedEntity[EntityT]
	// Remove deletes a single entity from the repository; used by SubCache.Remove.
	Remove              func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll deletes all entities from the repository; used by SubCache.RemoveAll.
	RemoveAll           func(repo repository.ClockedRepo) error
	// MergeAll merges the entities of a remote and streams the results; used by SubCache.MergeAll.
	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor entity.Identity) <-chan entity.MergeResult
}
 40
 41var _ cacheMgmt = &SubCache[entity.Bare, Excerpt, CacheEntity]{}
 42
// SubCache is a generic cache for one type of entity. It keeps an excerpt for
// every known entity, a bounded set of fully loaded entities, and persists the
// excerpts in a cache file alongside a search index.
type SubCache[EntityT entity.Bare, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repo gives access to the entities, the local storage holding the cache
	// file, and the search index.
	repo      repository.ClockedRepo
	// resolvers returns the resolvers handed to the read and merge actions.
	resolvers func() entity.Resolvers

	// getUserIdentity returns the identity used as author when merging.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps an entity into its cached form; the callback is to be
	// triggered when the excerpt of that entity changed.
	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt of a cached entity.
	makeExcerpt     func(CacheT) ExcerptT
	// makeIndexData builds the data sent to the search index for a cached entity.
	makeIndexData   func(CacheT) []string
	// actions are the repository-level operations for this type of entity.
	actions         Actions[EntityT]

	// typename labels build events and not-found/multiple-match errors.
	typename  string
	// namespace is the name of both the cache file (under "cache") and the index.
	namespace string
	// version is the cache file format version, written by write and checked by Load.
	version   uint
	// maxLoaded is the threshold on the lru length above which eviction kicks in.
	maxLoaded int

	// mu synchronizes access to excerpts, cached and lru.
	mu       sync.RWMutex
	// excerpts holds one excerpt per known entity.
	excerpts map[entity.Id]ExcerptT
	// cached holds the entities currently loaded in memory.
	cached   map[entity.Id]CacheT
	// lru tracks the usage order of loaded entities, to pick eviction candidates.
	lru      *lruIdCache
}
 63
 64func NewSubCache[EntityT entity.Bare, ExcerptT Excerpt, CacheT CacheEntity](
 65	repo repository.ClockedRepo,
 66	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 67	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 68	makeExcerpt func(CacheT) ExcerptT,
 69	makeIndexData func(CacheT) []string,
 70	actions Actions[EntityT],
 71	typename, namespace string,
 72	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 73	return &SubCache[EntityT, ExcerptT, CacheT]{
 74		repo:            repo,
 75		resolvers:       resolvers,
 76		getUserIdentity: getUserIdentity,
 77		makeCached:      makeCached,
 78		makeExcerpt:     makeExcerpt,
 79		makeIndexData:   makeIndexData,
 80		actions:         actions,
 81		typename:        typename,
 82		namespace:       namespace,
 83		version:         version,
 84		maxLoaded:       maxLoaded,
 85		excerpts:        make(map[entity.Id]ExcerptT),
 86		cached:          make(map[entity.Id]CacheT),
 87		lru:             newLRUIdCache(),
 88	}
 89}
 90
// Typename returns the type name given at construction, the one used to label
// build events and entity errors.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
 94
 95// Load will try to read from the disk the entity cache file
 96func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 97	sc.mu.Lock()
 98	defer sc.mu.Unlock()
 99
100	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
101	if err != nil {
102		return err
103	}
104
105	aux := struct {
106		Version  uint
107		Excerpts map[entity.Id]ExcerptT
108	}{}
109
110	decoder := gob.NewDecoder(f)
111	err = decoder.Decode(&aux)
112	if err != nil {
113		_ = f.Close()
114		return err
115	}
116
117	err = f.Close()
118	if err != nil {
119		return err
120	}
121
122	if aux.Version != sc.version {
123		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
124	}
125
126	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
127	// so we fix it here, which doubles as enforcing coherency.
128	for id, excerpt := range aux.Excerpts {
129		excerpt.setId(id)
130	}
131
132	sc.excerpts = aux.Excerpts
133
134	index, err := sc.repo.GetIndex(sc.namespace)
135	if err != nil {
136		return err
137	}
138
139	// simple heuristic to detect a mismatch between the index and the entities
140	count, err := index.DocCount()
141	if err != nil {
142		return err
143	}
144	if count != uint64(len(sc.excerpts)) {
145		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
146	}
147
148	// TODO: find a way to check lamport clocks
149
150	return nil
151}
152
153// Write will serialize on disk the entity cache file
154func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
155	sc.mu.RLock()
156	defer sc.mu.RUnlock()
157
158	var data bytes.Buffer
159
160	aux := struct {
161		Version  uint
162		Excerpts map[entity.Id]ExcerptT
163	}{
164		Version:  sc.version,
165		Excerpts: sc.excerpts,
166	}
167
168	encoder := gob.NewEncoder(&data)
169
170	err := encoder.Encode(aux)
171	if err != nil {
172		return err
173	}
174
175	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
176	if err != nil {
177		return err
178	}
179
180	_, err = f.Write(data.Bytes())
181	if err != nil {
182		_ = f.Close()
183		return err
184	}
185
186	return f.Close()
187}
188
189func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
190	// value chosen experimentally as giving the fasted indexing, while
191	// not driving the cache size on disk too high.
192	//
193	// | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
194	// |:----------:|:-------------:|:------------:|:--------:|
195	// |     10     |      24       |      84      |   1,59   |
196	// |     30     |      26       |      84      |  1,388   |
197	// |     50     |      26       |      84      |   1,44   |
198	// |     60     |      26       |      80      |  1,377   |
199	// |     68     |      27       |      80      |  1,385   |
200	// |     75     |      26       |      84      |   1,32   |
201	// |     80     |      26       |      80      |   1,37   |
202	// |     85     |      27       |      80      |  1,317   |
203	// |    100     |      26       |      80      |  1,455   |
204	// |    150     |      26       |      80      |  2,066   |
205	// |    200     |      28       |      80      |  2,885   |
206	// |    250     |      30       |      72      |  3,555   |
207	// |    300     |      31       |      72      |  4,787   |
208	// |    500     |      23       |      72      |   5,4    |
209	const maxBatchCount = 75
210
211	out := make(chan BuildEvent)
212
213	go func() {
214		defer close(out)
215
216		out <- BuildEvent{
217			Typename: sc.typename,
218			Event:    BuildEventStarted,
219		}
220
221		sc.excerpts = make(map[entity.Id]ExcerptT)
222
223		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
224
225		index, err := sc.repo.GetIndex(sc.namespace)
226		if err != nil {
227			out <- BuildEvent{
228				Typename: sc.typename,
229				Err:      err,
230			}
231			return
232		}
233
234		// wipe the index just to be sure
235		err = index.Clear()
236		if err != nil {
237			out <- BuildEvent{
238				Typename: sc.typename,
239				Err:      err,
240			}
241			return
242		}
243
244		indexer, indexEnd := index.IndexBatch()
245		var batchCount int
246
247		for e := range allEntities {
248			if e.Err != nil {
249				out <- BuildEvent{
250					Typename: sc.typename,
251					Err:      e.Err,
252				}
253				return
254			}
255
256			cached := sc.makeCached(e.Entity, sc.entityUpdated)
257			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
258			// might as well keep them in memory
259			sc.cached[e.Entity.Id()] = cached
260
261			indexData := sc.makeIndexData(cached)
262			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
263				out <- BuildEvent{
264					Typename: sc.typename,
265					Err:      err,
266				}
267				return
268			}
269
270			batchCount++
271			if batchCount >= maxBatchCount {
272				err = indexEnd()
273				if err != nil {
274					out <- BuildEvent{
275						Typename: sc.typename,
276						Err:      err,
277					}
278					return
279				}
280
281				indexer, indexEnd = index.IndexBatch()
282				batchCount = 0
283			}
284
285			out <- BuildEvent{
286				Typename: sc.typename,
287				Event:    BuildEventProgress,
288				Progress: e.CurrentEntity,
289				Total:    e.TotalEntities,
290			}
291		}
292
293		if batchCount > 0 {
294			err = indexEnd()
295			if err != nil {
296				out <- BuildEvent{
297					Typename: sc.typename,
298					Err:      err,
299				}
300				return
301			}
302		}
303
304		err = sc.write()
305		if err != nil {
306			out <- BuildEvent{
307				Typename: sc.typename,
308				Err:      err,
309			}
310			return
311		}
312
313		out <- BuildEvent{
314			Typename: sc.typename,
315			Event:    BuildEventFinished,
316		}
317	}()
318
319	return out
320}
321
322func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
323	sc.maxLoaded = size
324	sc.evictIfNeeded()
325}
326
327func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
328	sc.mu.Lock()
329	defer sc.mu.Unlock()
330	sc.excerpts = nil
331	sc.cached = make(map[entity.Id]CacheT)
332	return nil
333}
334
335// AllIds return all known bug ids
336func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
337	sc.mu.RLock()
338	defer sc.mu.RUnlock()
339
340	result := make([]entity.Id, len(sc.excerpts))
341
342	i := 0
343	for _, excerpt := range sc.excerpts {
344		result[i] = excerpt.Id()
345		i++
346	}
347
348	return result
349}
350
351// Resolve retrieve an entity matching the exact given id
352func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
353	sc.mu.RLock()
354	cached, ok := sc.cached[id]
355	if ok {
356		sc.lru.Get(id)
357		sc.mu.RUnlock()
358		return cached, nil
359	}
360	sc.mu.RUnlock()
361
362	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
363	if err != nil {
364		return *new(CacheT), err
365	}
366
367	cached = sc.makeCached(e, sc.entityUpdated)
368
369	sc.mu.Lock()
370	sc.cached[id] = cached
371	sc.lru.Add(id)
372	sc.mu.Unlock()
373
374	sc.evictIfNeeded()
375
376	return cached, nil
377}
378
379// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
380// entity match.
381func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
382	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
383		return excerpt.Id().HasPrefix(prefix)
384	})
385}
386
387func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
388	id, err := sc.resolveMatcher(f)
389	if err != nil {
390		return *new(CacheT), err
391	}
392	return sc.Resolve(id)
393}
394
395// ResolveExcerpt retrieve an Excerpt matching the exact given id
396func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
397	sc.mu.RLock()
398	defer sc.mu.RUnlock()
399
400	excerpt, ok := sc.excerpts[id]
401	if !ok {
402		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
403	}
404
405	return excerpt, nil
406}
407
408// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
409// entity match.
410func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
411	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
412		return excerpt.Id().HasPrefix(prefix)
413	})
414}
415
416func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
417	id, err := sc.resolveMatcher(f)
418	if err != nil {
419		return *new(ExcerptT), err
420	}
421	return sc.ResolveExcerpt(id)
422}
423
424func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
425	sc.mu.RLock()
426	defer sc.mu.RUnlock()
427
428	// preallocate but empty
429	matching := make([]entity.Id, 0, 5)
430
431	for _, excerpt := range sc.excerpts {
432		if f(excerpt) {
433			matching = append(matching, excerpt.Id())
434		}
435	}
436
437	if len(matching) > 1 {
438		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
439	}
440
441	if len(matching) == 0 {
442		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
443	}
444
445	return matching[0], nil
446}
447
448func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
449	sc.mu.Lock()
450	if _, has := sc.cached[e.Id()]; has {
451		sc.mu.Unlock()
452		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
453	}
454
455	cached := sc.makeCached(e, sc.entityUpdated)
456	sc.cached[e.Id()] = cached
457	sc.lru.Add(e.Id())
458	sc.mu.Unlock()
459
460	sc.evictIfNeeded()
461
462	// force the write of the excerpt
463	err := sc.entityUpdated(e.Id())
464	if err != nil {
465		return *new(CacheT), err
466	}
467
468	return cached, nil
469}
470
471func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
472	e, err := sc.ResolvePrefix(prefix)
473	if err != nil {
474		return err
475	}
476
477	sc.mu.Lock()
478
479	err = sc.actions.Remove(sc.repo, e.Id())
480	if err != nil {
481		sc.mu.Unlock()
482		return err
483	}
484
485	delete(sc.cached, e.Id())
486	delete(sc.excerpts, e.Id())
487	sc.lru.Remove(e.Id())
488
489	index, err := sc.repo.GetIndex(sc.namespace)
490	if err != nil {
491		sc.mu.Unlock()
492		return err
493	}
494
495	err = index.Remove(e.Id().String())
496	sc.mu.Unlock()
497	if err != nil {
498		return err
499	}
500
501	return sc.write()
502}
503
504func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
505	sc.mu.Lock()
506
507	err := sc.actions.RemoveAll(sc.repo)
508	if err != nil {
509		sc.mu.Unlock()
510		return err
511	}
512
513	for id, _ := range sc.cached {
514		delete(sc.cached, id)
515		sc.lru.Remove(id)
516	}
517	for id, _ := range sc.excerpts {
518		delete(sc.excerpts, id)
519	}
520
521	index, err := sc.repo.GetIndex(sc.namespace)
522	if err != nil {
523		sc.mu.Unlock()
524		return err
525	}
526
527	err = index.Clear()
528	sc.mu.Unlock()
529	if err != nil {
530		return err
531	}
532
533	return sc.write()
534}
535
536func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
537	out := make(chan entity.MergeResult)
538
539	// Intercept merge results to update the cache properly
540	go func() {
541		defer close(out)
542
543		author, err := sc.getUserIdentity()
544		if err != nil {
545			out <- entity.NewMergeError(err, "")
546			return
547		}
548
549		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
550		for result := range results {
551			out <- result
552
553			if result.Err != nil {
554				continue
555			}
556
557			switch result.Status {
558			case entity.MergeStatusNew, entity.MergeStatusUpdated:
559				e := result.Entity.(EntityT)
560				cached := sc.makeCached(e, sc.entityUpdated)
561
562				sc.mu.Lock()
563				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
564				// might as well keep them in memory
565				sc.cached[result.Id] = cached
566				sc.mu.Unlock()
567			}
568		}
569
570		err = sc.write()
571		if err != nil {
572			out <- entity.NewMergeError(err, "")
573			return
574		}
575	}()
576
577	return out
578
579}
580
// GetNamespace returns the namespace given at construction, the one naming
// both the cache file and the search index.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
584
585// entityUpdated is a callback to trigger when the excerpt of an entity changed
586func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
587	sc.mu.Lock()
588	e, ok := sc.cached[id]
589	if !ok {
590		sc.mu.Unlock()
591
592		// if the bug is not loaded at this point, it means it was loaded before
593		// but got evicted. Which means we potentially have multiple copies in
594		// memory and thus concurrent write.
595		// Failing immediately here is the simple and safe solution to avoid
596		// complicated data loss.
597		return errors.New("entity missing from cache")
598	}
599	sc.lru.Get(id)
600	sc.excerpts[id] = sc.makeExcerpt(e)
601	sc.mu.Unlock()
602
603	index, err := sc.repo.GetIndex(sc.namespace)
604	if err != nil {
605		return err
606	}
607
608	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
609	if err != nil {
610		return err
611	}
612
613	return sc.write()
614}
615
616// evictIfNeeded will evict an entity from the cache if needed
617func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
618	sc.mu.Lock()
619	defer sc.mu.Unlock()
620	if sc.lru.Len() <= sc.maxLoaded {
621		return
622	}
623
624	for _, id := range sc.lru.GetOldestToNewest() {
625		b := sc.cached[id]
626		if b.NeedCommit() {
627			continue
628		}
629
630		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
631		// if something try to do it anyway, it will lock the program and make it obvious.
632		b.Lock()
633
634		sc.lru.Remove(id)
635		delete(sc.cached, id)
636
637		if sc.lru.Len() <= sc.maxLoaded {
638			return
639		}
640	}
641}