subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entities/identity"
 13	"github.com/MichaelMure/git-bug/entity"
 14	"github.com/MichaelMure/git-bug/repository"
 15)
 16
 17type Excerpt interface {
 18	Id() entity.Id
 19	setId(id entity.Id)
 20}
 21
 22type CacheEntity interface {
 23	Id() entity.Id
 24	NeedCommit() bool
 25	Lock()
 26}
 27
 28type getUserIdentityFunc func() (*IdentityCache, error)
 29
 30// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
 31// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
 32// functions from this package, but right now identities are not using that framework.
 33type Actions[EntityT entity.Interface] struct {
 34	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
 35	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
 36	Remove              func(repo repository.ClockedRepo, id entity.Id) error
 37	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
 38}
 39
 40var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
 41
 42type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
 43	repo      repository.ClockedRepo
 44	resolvers func() entity.Resolvers
 45
 46	getUserIdentity getUserIdentityFunc
 47	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
 48	makeExcerpt     func(CacheT) ExcerptT
 49	makeIndexData   func(CacheT) []string
 50	actions         Actions[EntityT]
 51
 52	typename  string
 53	namespace string
 54	version   uint
 55	maxLoaded int
 56
 57	mu       sync.RWMutex
 58	excerpts map[entity.Id]ExcerptT
 59	cached   map[entity.Id]CacheT
 60	lru      *lruIdCache
 61}
 62
 63func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 64	repo repository.ClockedRepo,
 65	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 66	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 67	makeExcerpt func(CacheT) ExcerptT,
 68	makeIndexData func(CacheT) []string,
 69	actions Actions[EntityT],
 70	typename, namespace string,
 71	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 72	return &SubCache[EntityT, ExcerptT, CacheT]{
 73		repo:            repo,
 74		resolvers:       resolvers,
 75		getUserIdentity: getUserIdentity,
 76		makeCached:      makeCached,
 77		makeExcerpt:     makeExcerpt,
 78		makeIndexData:   makeIndexData,
 79		actions:         actions,
 80		typename:        typename,
 81		namespace:       namespace,
 82		version:         version,
 83		maxLoaded:       maxLoaded,
 84		excerpts:        make(map[entity.Id]ExcerptT),
 85		cached:          make(map[entity.Id]CacheT),
 86		lru:             newLRUIdCache(),
 87	}
 88}
 89
 90func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
 91	return sc.typename
 92}
 93
 94// Load will try to read from the disk the entity cache file
 95func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 96	sc.mu.Lock()
 97	defer sc.mu.Unlock()
 98
 99	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
100	if err != nil {
101		return err
102	}
103
104	decoder := gob.NewDecoder(f)
105
106	aux := struct {
107		Version  uint
108		Excerpts map[entity.Id]ExcerptT
109	}{}
110
111	err = decoder.Decode(&aux)
112	if err != nil {
113		return err
114	}
115
116	if aux.Version != sc.version {
117		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
118	}
119
120	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
121	// so we fix it here, which doubles as enforcing coherency.
122	for id, excerpt := range aux.Excerpts {
123		excerpt.setId(id)
124	}
125
126	sc.excerpts = aux.Excerpts
127
128	index, err := sc.repo.GetIndex(sc.namespace)
129	if err != nil {
130		return err
131	}
132
133	// simple heuristic to detect a mismatch between the index and the entities
134	count, err := index.DocCount()
135	if err != nil {
136		return err
137	}
138	if count != uint64(len(sc.excerpts)) {
139		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
140	}
141
142	// TODO: find a way to check lamport clocks
143
144	return nil
145}
146
147// Write will serialize on disk the entity cache file
148func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
149	sc.mu.RLock()
150	defer sc.mu.RUnlock()
151
152	var data bytes.Buffer
153
154	aux := struct {
155		Version  uint
156		Excerpts map[entity.Id]ExcerptT
157	}{
158		Version:  sc.version,
159		Excerpts: sc.excerpts,
160	}
161
162	encoder := gob.NewEncoder(&data)
163
164	err := encoder.Encode(aux)
165	if err != nil {
166		return err
167	}
168
169	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
170	if err != nil {
171		return err
172	}
173
174	_, err = f.Write(data.Bytes())
175	if err != nil {
176		return err
177	}
178
179	return f.Close()
180}
181
182func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
183	sc.excerpts = make(map[entity.Id]ExcerptT)
184
185	allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
186
187	index, err := sc.repo.GetIndex(sc.namespace)
188	if err != nil {
189		return err
190	}
191
192	// wipe the index just to be sure
193	err = index.Clear()
194	if err != nil {
195		return err
196	}
197
198	indexer, indexEnd := index.IndexBatch()
199
200	for e := range allEntities {
201		if e.Err != nil {
202			return e.Err
203		}
204
205		cached := sc.makeCached(e.Entity, sc.entityUpdated)
206		sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
207		// might as well keep them in memory
208		sc.cached[e.Entity.Id()] = cached
209
210		indexData := sc.makeIndexData(cached)
211		if err := indexer(e.Entity.Id().String(), indexData); err != nil {
212			return err
213		}
214	}
215
216	err = indexEnd()
217	if err != nil {
218		return err
219	}
220
221	err = sc.write()
222	if err != nil {
223		return err
224	}
225
226	return nil
227}
228
229func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
230	sc.maxLoaded = size
231	sc.evictIfNeeded()
232}
233
234func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
235	sc.mu.Lock()
236	defer sc.mu.Unlock()
237	sc.excerpts = nil
238	sc.cached = make(map[entity.Id]CacheT)
239	return nil
240}
241
242// AllIds return all known bug ids
243func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
244	sc.mu.RLock()
245	defer sc.mu.RUnlock()
246
247	result := make([]entity.Id, len(sc.excerpts))
248
249	i := 0
250	for _, excerpt := range sc.excerpts {
251		result[i] = excerpt.Id()
252		i++
253	}
254
255	return result
256}
257
258// Resolve retrieve an entity matching the exact given id
259func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
260	sc.mu.RLock()
261	cached, ok := sc.cached[id]
262	if ok {
263		sc.lru.Get(id)
264		sc.mu.RUnlock()
265		return cached, nil
266	}
267	sc.mu.RUnlock()
268
269	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
270	if err != nil {
271		return *new(CacheT), err
272	}
273
274	cached = sc.makeCached(e, sc.entityUpdated)
275
276	sc.mu.Lock()
277	sc.cached[id] = cached
278	sc.lru.Add(id)
279	sc.mu.Unlock()
280
281	sc.evictIfNeeded()
282
283	return cached, nil
284}
285
286// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
287// entity match.
288func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
289	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
290		return excerpt.Id().HasPrefix(prefix)
291	})
292}
293
294func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
295	id, err := sc.resolveMatcher(f)
296	if err != nil {
297		return *new(CacheT), err
298	}
299	return sc.Resolve(id)
300}
301
302// ResolveExcerpt retrieve an Excerpt matching the exact given id
303func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
304	sc.mu.RLock()
305	defer sc.mu.RUnlock()
306
307	excerpt, ok := sc.excerpts[id]
308	if !ok {
309		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
310	}
311
312	return excerpt, nil
313}
314
315// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
316// entity match.
317func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
318	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
319		return excerpt.Id().HasPrefix(prefix)
320	})
321}
322
323func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
324	id, err := sc.resolveMatcher(f)
325	if err != nil {
326		return *new(ExcerptT), err
327	}
328	return sc.ResolveExcerpt(id)
329}
330
331func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
332	sc.mu.RLock()
333	defer sc.mu.RUnlock()
334
335	// preallocate but empty
336	matching := make([]entity.Id, 0, 5)
337
338	for _, excerpt := range sc.excerpts {
339		if f(excerpt) {
340			matching = append(matching, excerpt.Id())
341		}
342	}
343
344	if len(matching) > 1 {
345		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
346	}
347
348	if len(matching) == 0 {
349		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
350	}
351
352	return matching[0], nil
353}
354
355func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
356	sc.mu.Lock()
357	if _, has := sc.cached[e.Id()]; has {
358		sc.mu.Unlock()
359		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
360	}
361
362	cached := sc.makeCached(e, sc.entityUpdated)
363	sc.cached[e.Id()] = cached
364	sc.lru.Add(e.Id())
365	sc.mu.Unlock()
366
367	sc.evictIfNeeded()
368
369	// force the write of the excerpt
370	err := sc.entityUpdated(e.Id())
371	if err != nil {
372		return *new(CacheT), err
373	}
374
375	return cached, nil
376}
377
378func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
379	e, err := sc.ResolvePrefix(prefix)
380	if err != nil {
381		return err
382	}
383
384	sc.mu.Lock()
385
386	err = sc.actions.Remove(sc.repo, e.Id())
387	if err != nil {
388		sc.mu.Unlock()
389		return err
390	}
391
392	delete(sc.cached, e.Id())
393	delete(sc.excerpts, e.Id())
394	sc.lru.Remove(e.Id())
395
396	sc.mu.Unlock()
397
398	return sc.write()
399}
400
401func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
402	out := make(chan entity.MergeResult)
403
404	// Intercept merge results to update the cache properly
405	go func() {
406		defer close(out)
407
408		author, err := sc.getUserIdentity()
409		if err != nil {
410			out <- entity.NewMergeError(err, "")
411			return
412		}
413
414		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
415		for result := range results {
416			out <- result
417
418			if result.Err != nil {
419				continue
420			}
421
422			switch result.Status {
423			case entity.MergeStatusNew, entity.MergeStatusUpdated:
424				e := result.Entity.(EntityT)
425				cached := sc.makeCached(e, sc.entityUpdated)
426
427				sc.mu.Lock()
428				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
429				// might as well keep them in memory
430				sc.cached[result.Id] = cached
431				sc.mu.Unlock()
432			}
433		}
434
435		err = sc.write()
436		if err != nil {
437			out <- entity.NewMergeError(err, "")
438			return
439		}
440	}()
441
442	return out
443
444}
445
446func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
447	return sc.namespace
448}
449
450// entityUpdated is a callback to trigger when the excerpt of an entity changed
451func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
452	sc.mu.Lock()
453	e, ok := sc.cached[id]
454	if !ok {
455		sc.mu.Unlock()
456
457		// if the bug is not loaded at this point, it means it was loaded before
458		// but got evicted. Which means we potentially have multiple copies in
459		// memory and thus concurrent write.
460		// Failing immediately here is the simple and safe solution to avoid
461		// complicated data loss.
462		return errors.New("entity missing from cache")
463	}
464	sc.lru.Get(id)
465	// sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
466	sc.excerpts[id] = sc.makeExcerpt(e)
467	sc.mu.Unlock()
468
469	index, err := sc.repo.GetIndex(sc.namespace)
470	if err != nil {
471		return err
472	}
473
474	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
475	if err != nil {
476		return err
477	}
478
479	return sc.write()
480}
481
482// evictIfNeeded will evict an entity from the cache if needed
483func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
484	sc.mu.Lock()
485	defer sc.mu.Unlock()
486	if sc.lru.Len() <= sc.maxLoaded {
487		return
488	}
489
490	for _, id := range sc.lru.GetOldestToNewest() {
491		b := sc.cached[id]
492		if b.NeedCommit() {
493			continue
494		}
495
496		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
497		// if something try to do it anyway, it will lock the program and make it obvious.
498		b.Lock()
499
500		sc.lru.Remove(id)
501		delete(sc.cached, id)
502
503		if sc.lru.Len() <= sc.maxLoaded {
504			return
505		}
506	}
507}