subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entity"
 13	"github.com/MichaelMure/git-bug/repository"
 14)
 15
 16type Excerpt interface {
 17	Id() entity.Id
 18	setId(id entity.Id)
 19}
 20
 21type CacheEntity interface {
 22	Id() entity.Id
 23	NeedCommit() bool
 24	Lock()
 25}
 26
 27type getUserIdentityFunc func() (*IdentityCache, error)
 28
// Actions exposes a number of action functions on entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist; the cache layer would assume that everything is an entity/dag and directly use the
// entity/dag functions, but right now identities are not using that framework.
//
// Fields:
//   - ReadWithResolver reads a single entity by id.
//   - ReadAllWithResolver streams every entity of the type; each streamed item carries either an entity (with
//     progress counters) or an error.
//   - Remove deletes a single entity from the repository.
//   - RemoveAll deletes every entity of the type from the repository.
//   - MergeAll merges the entities from the given remote and streams one result per entity. SubCache passes the
//     current user identity as mergeAuthor.
type Actions[EntityT entity.Interface] struct {
	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	Remove              func(repo repository.ClockedRepo, id entity.Id) error
	RemoveAll           func(repo repository.ClockedRepo) error
	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor entity.Interface) <-chan entity.MergeResult
}
 39
// Compile-time check that SubCache implements cacheMgmt.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}

// SubCache is a generic cache layer for a single entity type. It keeps:
//   - an excerpt for every known entity, persisted in local storage under cache/<namespace>;
//   - a set of fully loaded entities, tracked by an LRU and evicted beyond maxLoaded
//     (entities with pending changes are never evicted);
//   - a search index, obtained from the repository under the same namespace.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repository holding the entities, and a provider of the resolvers used to read them
	repo      repository.ClockedRepo
	resolvers func() entity.Resolvers

	// type-specific hooks: the current user (merge author), the wrapper constructor
	// (given entityUpdated as its change callback), the excerpt and index-data builders,
	// and the storage-level actions
	getUserIdentity getUserIdentityFunc
	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	makeExcerpt     func(CacheT) ExcerptT
	makeIndexData   func(CacheT) []string
	actions         Actions[EntityT]

	// typename is used in errors and build events; namespace names both the cache
	// file and the index; version is the on-disk format version checked by Load;
	// maxLoaded is the LRU capacity for loaded entities
	typename  string
	namespace string
	version   uint
	maxLoaded int

	// mu guards the maps and the LRU below
	mu       sync.RWMutex
	excerpts map[entity.Id]ExcerptT
	cached   map[entity.Id]CacheT
	lru      *lruIdCache
}
 62
 63func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 64	repo repository.ClockedRepo,
 65	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 66	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 67	makeExcerpt func(CacheT) ExcerptT,
 68	makeIndexData func(CacheT) []string,
 69	actions Actions[EntityT],
 70	typename, namespace string,
 71	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 72	return &SubCache[EntityT, ExcerptT, CacheT]{
 73		repo:            repo,
 74		resolvers:       resolvers,
 75		getUserIdentity: getUserIdentity,
 76		makeCached:      makeCached,
 77		makeExcerpt:     makeExcerpt,
 78		makeIndexData:   makeIndexData,
 79		actions:         actions,
 80		typename:        typename,
 81		namespace:       namespace,
 82		version:         version,
 83		maxLoaded:       maxLoaded,
 84		excerpts:        make(map[entity.Id]ExcerptT),
 85		cached:          make(map[entity.Id]CacheT),
 86		lru:             newLRUIdCache(),
 87	}
 88}
 89
 90func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
 91	return sc.typename
 92}
 93
 94// Load will try to read from the disk the entity cache file
 95func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 96	sc.mu.Lock()
 97	defer sc.mu.Unlock()
 98
 99	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
100	if err != nil {
101		return err
102	}
103
104	aux := struct {
105		Version  uint
106		Excerpts map[entity.Id]ExcerptT
107	}{}
108
109	decoder := gob.NewDecoder(f)
110	err = decoder.Decode(&aux)
111	if err != nil {
112		_ = f.Close()
113		return err
114	}
115
116	err = f.Close()
117	if err != nil {
118		return err
119	}
120
121	if aux.Version != sc.version {
122		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
123	}
124
125	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
126	// so we fix it here, which doubles as enforcing coherency.
127	for id, excerpt := range aux.Excerpts {
128		excerpt.setId(id)
129	}
130
131	sc.excerpts = aux.Excerpts
132
133	index, err := sc.repo.GetIndex(sc.namespace)
134	if err != nil {
135		return err
136	}
137
138	// simple heuristic to detect a mismatch between the index and the entities
139	count, err := index.DocCount()
140	if err != nil {
141		return err
142	}
143	if count != uint64(len(sc.excerpts)) {
144		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
145	}
146
147	// TODO: find a way to check lamport clocks
148
149	return nil
150}
151
152// Write will serialize on disk the entity cache file
153func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
154	sc.mu.RLock()
155	defer sc.mu.RUnlock()
156
157	var data bytes.Buffer
158
159	aux := struct {
160		Version  uint
161		Excerpts map[entity.Id]ExcerptT
162	}{
163		Version:  sc.version,
164		Excerpts: sc.excerpts,
165	}
166
167	encoder := gob.NewEncoder(&data)
168
169	err := encoder.Encode(aux)
170	if err != nil {
171		return err
172	}
173
174	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
175	if err != nil {
176		return err
177	}
178
179	_, err = f.Write(data.Bytes())
180	if err != nil {
181		_ = f.Close()
182		return err
183	}
184
185	return f.Close()
186}
187
188func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
189	// value chosen experimentally as giving the fasted indexing, while
190	// not driving the cache size on disk too high.
191	//
192	// | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
193	// |:----------:|:-------------:|:------------:|:--------:|
194	// |     10     |      24       |      84      |   1,59   |
195	// |     30     |      26       |      84      |  1,388   |
196	// |     50     |      26       |      84      |   1,44   |
197	// |     60     |      26       |      80      |  1,377   |
198	// |     68     |      27       |      80      |  1,385   |
199	// |     75     |      26       |      84      |   1,32   |
200	// |     80     |      26       |      80      |   1,37   |
201	// |     85     |      27       |      80      |  1,317   |
202	// |    100     |      26       |      80      |  1,455   |
203	// |    150     |      26       |      80      |  2,066   |
204	// |    200     |      28       |      80      |  2,885   |
205	// |    250     |      30       |      72      |  3,555   |
206	// |    300     |      31       |      72      |  4,787   |
207	// |    500     |      23       |      72      |   5,4    |
208	const maxBatchCount = 75
209
210	out := make(chan BuildEvent)
211
212	go func() {
213		defer close(out)
214
215		out <- BuildEvent{
216			Typename: sc.typename,
217			Event:    BuildEventStarted,
218		}
219
220		sc.excerpts = make(map[entity.Id]ExcerptT)
221
222		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
223
224		index, err := sc.repo.GetIndex(sc.namespace)
225		if err != nil {
226			out <- BuildEvent{
227				Typename: sc.typename,
228				Err:      err,
229			}
230			return
231		}
232
233		// wipe the index just to be sure
234		err = index.Clear()
235		if err != nil {
236			out <- BuildEvent{
237				Typename: sc.typename,
238				Err:      err,
239			}
240			return
241		}
242
243		indexer, indexEnd := index.IndexBatch()
244		var batchCount int
245
246		for e := range allEntities {
247			if e.Err != nil {
248				out <- BuildEvent{
249					Typename: sc.typename,
250					Err:      e.Err,
251				}
252				return
253			}
254
255			cached := sc.makeCached(e.Entity, sc.entityUpdated)
256			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
257			// might as well keep them in memory
258			sc.cached[e.Entity.Id()] = cached
259
260			indexData := sc.makeIndexData(cached)
261			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
262				out <- BuildEvent{
263					Typename: sc.typename,
264					Err:      err,
265				}
266				return
267			}
268
269			batchCount++
270			if batchCount >= maxBatchCount {
271				err = indexEnd()
272				if err != nil {
273					out <- BuildEvent{
274						Typename: sc.typename,
275						Err:      err,
276					}
277					return
278				}
279
280				indexer, indexEnd = index.IndexBatch()
281				batchCount = 0
282			}
283
284			out <- BuildEvent{
285				Typename: sc.typename,
286				Event:    BuildEventProgress,
287				Progress: e.CurrentEntity,
288				Total:    e.TotalEntities,
289			}
290		}
291
292		if batchCount > 0 {
293			err = indexEnd()
294			if err != nil {
295				out <- BuildEvent{
296					Typename: sc.typename,
297					Err:      err,
298				}
299				return
300			}
301		}
302
303		err = sc.write()
304		if err != nil {
305			out <- BuildEvent{
306				Typename: sc.typename,
307				Err:      err,
308			}
309			return
310		}
311
312		out <- BuildEvent{
313			Typename: sc.typename,
314			Event:    BuildEventFinished,
315		}
316	}()
317
318	return out
319}
320
321func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
322	sc.maxLoaded = size
323	sc.evictIfNeeded()
324}
325
326func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
327	sc.mu.Lock()
328	defer sc.mu.Unlock()
329	sc.excerpts = nil
330	sc.cached = make(map[entity.Id]CacheT)
331	return nil
332}
333
334// AllIds return all known bug ids
335func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
336	sc.mu.RLock()
337	defer sc.mu.RUnlock()
338
339	result := make([]entity.Id, len(sc.excerpts))
340
341	i := 0
342	for _, excerpt := range sc.excerpts {
343		result[i] = excerpt.Id()
344		i++
345	}
346
347	return result
348}
349
350// Resolve retrieve an entity matching the exact given id
351func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
352	sc.mu.RLock()
353	cached, ok := sc.cached[id]
354	if ok {
355		sc.lru.Get(id)
356		sc.mu.RUnlock()
357		return cached, nil
358	}
359	sc.mu.RUnlock()
360
361	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
362	if err != nil {
363		return *new(CacheT), err
364	}
365
366	cached = sc.makeCached(e, sc.entityUpdated)
367
368	sc.mu.Lock()
369	sc.cached[id] = cached
370	sc.lru.Add(id)
371	sc.mu.Unlock()
372
373	sc.evictIfNeeded()
374
375	return cached, nil
376}
377
378// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
379// entity match.
380func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
381	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
382		return excerpt.Id().HasPrefix(prefix)
383	})
384}
385
386func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
387	id, err := sc.resolveMatcher(f)
388	if err != nil {
389		return *new(CacheT), err
390	}
391	return sc.Resolve(id)
392}
393
394// ResolveExcerpt retrieve an Excerpt matching the exact given id
395func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
396	sc.mu.RLock()
397	defer sc.mu.RUnlock()
398
399	excerpt, ok := sc.excerpts[id]
400	if !ok {
401		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
402	}
403
404	return excerpt, nil
405}
406
407// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
408// entity match.
409func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
410	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
411		return excerpt.Id().HasPrefix(prefix)
412	})
413}
414
415func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
416	id, err := sc.resolveMatcher(f)
417	if err != nil {
418		return *new(ExcerptT), err
419	}
420	return sc.ResolveExcerpt(id)
421}
422
423func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
424	sc.mu.RLock()
425	defer sc.mu.RUnlock()
426
427	// preallocate but empty
428	matching := make([]entity.Id, 0, 5)
429
430	for _, excerpt := range sc.excerpts {
431		if f(excerpt) {
432			matching = append(matching, excerpt.Id())
433		}
434	}
435
436	if len(matching) > 1 {
437		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
438	}
439
440	if len(matching) == 0 {
441		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
442	}
443
444	return matching[0], nil
445}
446
447func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
448	sc.mu.Lock()
449	if _, has := sc.cached[e.Id()]; has {
450		sc.mu.Unlock()
451		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
452	}
453
454	cached := sc.makeCached(e, sc.entityUpdated)
455	sc.cached[e.Id()] = cached
456	sc.lru.Add(e.Id())
457	sc.mu.Unlock()
458
459	sc.evictIfNeeded()
460
461	// force the write of the excerpt
462	err := sc.entityUpdated(e.Id())
463	if err != nil {
464		return *new(CacheT), err
465	}
466
467	return cached, nil
468}
469
470func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
471	e, err := sc.ResolvePrefix(prefix)
472	if err != nil {
473		return err
474	}
475
476	sc.mu.Lock()
477
478	err = sc.actions.Remove(sc.repo, e.Id())
479	if err != nil {
480		sc.mu.Unlock()
481		return err
482	}
483
484	delete(sc.cached, e.Id())
485	delete(sc.excerpts, e.Id())
486	sc.lru.Remove(e.Id())
487
488	index, err := sc.repo.GetIndex(sc.namespace)
489	if err != nil {
490		sc.mu.Unlock()
491		return err
492	}
493
494	err = index.Remove(e.Id().String())
495	sc.mu.Unlock()
496	if err != nil {
497		return err
498	}
499
500	return sc.write()
501}
502
503func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
504	sc.mu.Lock()
505
506	err := sc.actions.RemoveAll(sc.repo)
507	if err != nil {
508		sc.mu.Unlock()
509		return err
510	}
511
512	for id, _ := range sc.cached {
513		delete(sc.cached, id)
514		sc.lru.Remove(id)
515	}
516	for id, _ := range sc.excerpts {
517		delete(sc.excerpts, id)
518	}
519
520	index, err := sc.repo.GetIndex(sc.namespace)
521	if err != nil {
522		sc.mu.Unlock()
523		return err
524	}
525
526	err = index.Clear()
527	sc.mu.Unlock()
528	if err != nil {
529		return err
530	}
531
532	return sc.write()
533}
534
535func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
536	out := make(chan entity.MergeResult)
537
538	// Intercept merge results to update the cache properly
539	go func() {
540		defer close(out)
541
542		author, err := sc.getUserIdentity()
543		if err != nil {
544			out <- entity.NewMergeError(err, "")
545			return
546		}
547
548		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
549		for result := range results {
550			out <- result
551
552			if result.Err != nil {
553				continue
554			}
555
556			switch result.Status {
557			case entity.MergeStatusNew, entity.MergeStatusUpdated:
558				e := result.Entity.(EntityT)
559				cached := sc.makeCached(e, sc.entityUpdated)
560
561				sc.mu.Lock()
562				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
563				// might as well keep them in memory
564				sc.cached[result.Id] = cached
565				sc.mu.Unlock()
566			}
567		}
568
569		err = sc.write()
570		if err != nil {
571			out <- entity.NewMergeError(err, "")
572			return
573		}
574	}()
575
576	return out
577
578}
579
580func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
581	return sc.namespace
582}
583
584// entityUpdated is a callback to trigger when the excerpt of an entity changed
585func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
586	sc.mu.Lock()
587	e, ok := sc.cached[id]
588	if !ok {
589		sc.mu.Unlock()
590
591		// if the bug is not loaded at this point, it means it was loaded before
592		// but got evicted. Which means we potentially have multiple copies in
593		// memory and thus concurrent write.
594		// Failing immediately here is the simple and safe solution to avoid
595		// complicated data loss.
596		return errors.New("entity missing from cache")
597	}
598	sc.lru.Get(id)
599	sc.excerpts[id] = sc.makeExcerpt(e)
600	sc.mu.Unlock()
601
602	index, err := sc.repo.GetIndex(sc.namespace)
603	if err != nil {
604		return err
605	}
606
607	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
608	if err != nil {
609		return err
610	}
611
612	return sc.write()
613}
614
615// evictIfNeeded will evict an entity from the cache if needed
616func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
617	sc.mu.Lock()
618	defer sc.mu.Unlock()
619	if sc.lru.Len() <= sc.maxLoaded {
620		return
621	}
622
623	for _, id := range sc.lru.GetOldestToNewest() {
624		b := sc.cached[id]
625		if b.NeedCommit() {
626			continue
627		}
628
629		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
630		// if something try to do it anyway, it will lock the program and make it obvious.
631		b.Lock()
632
633		sc.lru.Remove(id)
634		delete(sc.cached, id)
635
636		if sc.lru.Len() <= sc.maxLoaded {
637			return
638		}
639	}
640}