subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"sync"
  8
  9	"github.com/pkg/errors"
 10
 11	"github.com/MichaelMure/git-bug/entities/identity"
 12	"github.com/MichaelMure/git-bug/entity"
 13	"github.com/MichaelMure/git-bug/repository"
 14)
 15
 16type Excerpt interface {
 17	Id() entity.Id
 18	setId(id entity.Id)
 19}
 20
 21type CacheEntity interface {
 22	Id() entity.Id
 23	NeedCommit() bool
 24}
 25
 26type getUserIdentityFunc func() (*IdentityCache, error)
 27
 28// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
 29// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
 30// functions from this package, but right now identities are not using that framework.
 31type Actions[EntityT entity.Interface] struct {
 32	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
 33	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
 34	Remove              func(repo repository.ClockedRepo, id entity.Id) error
 35	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
 36}
 37
 38var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
 39
 40type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
 41	repo      repository.ClockedRepo
 42	resolvers func() entity.Resolvers
 43
 44	getUserIdentity getUserIdentityFunc
 45	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
 46	makeExcerpt     func(CacheT) ExcerptT
 47	makeIndexData   func(CacheT) []string
 48	actions         Actions[EntityT]
 49
 50	typename  string
 51	namespace string
 52	version   uint
 53	maxLoaded int
 54
 55	mu       sync.RWMutex
 56	excerpts map[entity.Id]ExcerptT
 57	cached   map[entity.Id]CacheT
 58	lru      *lruIdCache
 59}
 60
 61func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 62	repo repository.ClockedRepo,
 63	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 64	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 65	makeExcerpt func(CacheT) ExcerptT,
 66	makeIndexData func(CacheT) []string,
 67	actions Actions[EntityT],
 68	typename, namespace string,
 69	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 70	return &SubCache[EntityT, ExcerptT, CacheT]{
 71		repo:            repo,
 72		resolvers:       resolvers,
 73		getUserIdentity: getUserIdentity,
 74		makeCached:      makeCached,
 75		makeExcerpt:     makeExcerpt,
 76		makeIndexData:   makeIndexData,
 77		actions:         actions,
 78		typename:        typename,
 79		namespace:       namespace,
 80		version:         version,
 81		maxLoaded:       maxLoaded,
 82		excerpts:        make(map[entity.Id]ExcerptT),
 83		cached:          make(map[entity.Id]CacheT),
 84		lru:             newLRUIdCache(),
 85	}
 86}
 87
 88func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
 89	return sc.typename
 90}
 91
 92// Load will try to read from the disk the entity cache file
 93func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 94	sc.mu.Lock()
 95	defer sc.mu.Unlock()
 96
 97	f, err := sc.repo.LocalStorage().Open(sc.namespace + "-file")
 98	if err != nil {
 99		return err
100	}
101
102	decoder := gob.NewDecoder(f)
103
104	aux := struct {
105		Version  uint
106		Excerpts map[entity.Id]ExcerptT
107	}{}
108
109	err = decoder.Decode(&aux)
110	if err != nil {
111		return err
112	}
113
114	if aux.Version != sc.version {
115		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
116	}
117
118	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
119	// so we fix it here, which doubles as enforcing coherency.
120	for id, excerpt := range aux.Excerpts {
121		excerpt.setId(id)
122	}
123
124	sc.excerpts = aux.Excerpts
125
126	index, err := sc.repo.GetIndex(sc.typename)
127	if err != nil {
128		return err
129	}
130
131	// simple heuristic to detect a mismatch between the index and the entities
132	count, err := index.DocCount()
133	if err != nil {
134		return err
135	}
136	if count != uint64(len(sc.excerpts)) {
137		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
138	}
139
140	return nil
141}
142
143// Write will serialize on disk the entity cache file
144func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
145	sc.mu.RLock()
146	defer sc.mu.RUnlock()
147
148	var data bytes.Buffer
149
150	aux := struct {
151		Version  uint
152		Excerpts map[entity.Id]ExcerptT
153	}{
154		Version:  sc.version,
155		Excerpts: sc.excerpts,
156	}
157
158	encoder := gob.NewEncoder(&data)
159
160	err := encoder.Encode(aux)
161	if err != nil {
162		return err
163	}
164
165	f, err := sc.repo.LocalStorage().Create(sc.namespace + "-file")
166	if err != nil {
167		return err
168	}
169
170	_, err = f.Write(data.Bytes())
171	if err != nil {
172		return err
173	}
174
175	return f.Close()
176}
177
178func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
179	sc.excerpts = make(map[entity.Id]ExcerptT)
180
181	allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
182
183	index, err := sc.repo.GetIndex(sc.typename)
184	if err != nil {
185		return err
186	}
187
188	// wipe the index just to be sure
189	err = index.Clear()
190	if err != nil {
191		return err
192	}
193
194	indexer, indexEnd := index.IndexBatch()
195
196	for e := range allEntities {
197		if e.Err != nil {
198			return e.Err
199		}
200
201		cached := sc.makeCached(e.Entity, sc.entityUpdated)
202		sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
203		// might as well keep them in memory
204		sc.cached[e.Entity.Id()] = cached
205
206		indexData := sc.makeIndexData(cached)
207		if err := indexer(e.Entity.Id().String(), indexData); err != nil {
208			return err
209		}
210	}
211
212	err = indexEnd()
213	if err != nil {
214		return err
215	}
216
217	err = sc.write()
218	if err != nil {
219		return err
220	}
221
222	return nil
223}
224
225func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
226	sc.maxLoaded = size
227	sc.evictIfNeeded()
228}
229
230func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
231	sc.mu.Lock()
232	defer sc.mu.Unlock()
233	sc.excerpts = nil
234	sc.cached = make(map[entity.Id]CacheT)
235	return nil
236}
237
238// AllIds return all known bug ids
239func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
240	sc.mu.RLock()
241	defer sc.mu.RUnlock()
242
243	result := make([]entity.Id, len(sc.excerpts))
244
245	i := 0
246	for _, excerpt := range sc.excerpts {
247		result[i] = excerpt.Id()
248		i++
249	}
250
251	return result
252}
253
254// Resolve retrieve an entity matching the exact given id
255func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
256	sc.mu.RLock()
257	cached, ok := sc.cached[id]
258	if ok {
259		sc.lru.Get(id)
260		sc.mu.RUnlock()
261		return cached, nil
262	}
263	sc.mu.RUnlock()
264
265	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
266	if err != nil {
267		return *new(CacheT), err
268	}
269
270	cached = sc.makeCached(e, sc.entityUpdated)
271
272	sc.mu.Lock()
273	sc.cached[id] = cached
274	sc.lru.Add(id)
275	sc.mu.Unlock()
276
277	sc.evictIfNeeded()
278
279	return cached, nil
280}
281
282// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
283// entity match.
284func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
285	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
286		return excerpt.Id().HasPrefix(prefix)
287	})
288}
289
290func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
291	id, err := sc.resolveMatcher(f)
292	if err != nil {
293		return *new(CacheT), err
294	}
295	return sc.Resolve(id)
296}
297
298// ResolveExcerpt retrieve an Excerpt matching the exact given id
299func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
300	sc.mu.RLock()
301	defer sc.mu.RUnlock()
302
303	excerpt, ok := sc.excerpts[id]
304	if !ok {
305		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
306	}
307
308	return excerpt, nil
309}
310
311// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
312// entity match.
313func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
314	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
315		return excerpt.Id().HasPrefix(prefix)
316	})
317}
318
319func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
320	id, err := sc.resolveMatcher(f)
321	if err != nil {
322		return *new(ExcerptT), err
323	}
324	return sc.ResolveExcerpt(id)
325}
326
327func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
328	sc.mu.RLock()
329	defer sc.mu.RUnlock()
330
331	// preallocate but empty
332	matching := make([]entity.Id, 0, 5)
333
334	for _, excerpt := range sc.excerpts {
335		if f(excerpt) {
336			matching = append(matching, excerpt.Id())
337		}
338	}
339
340	if len(matching) > 1 {
341		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
342	}
343
344	if len(matching) == 0 {
345		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
346	}
347
348	return matching[0], nil
349}
350
351func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
352	sc.mu.Lock()
353	if _, has := sc.cached[e.Id()]; has {
354		sc.mu.Unlock()
355		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
356	}
357
358	cached := sc.makeCached(e, sc.entityUpdated)
359	sc.cached[e.Id()] = cached
360	sc.lru.Add(e.Id())
361	sc.mu.Unlock()
362
363	sc.evictIfNeeded()
364
365	// force the write of the excerpt
366	err := sc.entityUpdated(e.Id())
367	if err != nil {
368		return *new(CacheT), err
369	}
370
371	return cached, nil
372}
373
374func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
375	e, err := sc.ResolvePrefix(prefix)
376	if err != nil {
377		return err
378	}
379
380	sc.mu.Lock()
381
382	err = sc.actions.Remove(sc.repo, e.Id())
383	if err != nil {
384		sc.mu.Unlock()
385		return err
386	}
387
388	delete(sc.cached, e.Id())
389	delete(sc.excerpts, e.Id())
390	sc.lru.Remove(e.Id())
391
392	sc.mu.Unlock()
393
394	return sc.write()
395}
396
397func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
398	out := make(chan entity.MergeResult)
399
400	// Intercept merge results to update the cache properly
401	go func() {
402		defer close(out)
403
404		author, err := sc.getUserIdentity()
405		if err != nil {
406			out <- entity.NewMergeError(err, "")
407			return
408		}
409
410		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
411		for result := range results {
412			out <- result
413
414			if result.Err != nil {
415				continue
416			}
417
418			switch result.Status {
419			case entity.MergeStatusNew, entity.MergeStatusUpdated:
420				e := result.Entity.(EntityT)
421				cached := sc.makeCached(e, sc.entityUpdated)
422
423				sc.mu.Lock()
424				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
425				// might as well keep them in memory
426				sc.cached[result.Id] = cached
427				sc.mu.Unlock()
428			}
429		}
430
431		err = sc.write()
432		if err != nil {
433			out <- entity.NewMergeError(err, "")
434			return
435		}
436	}()
437
438	return out
439
440}
441
442func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
443	return sc.namespace
444}
445
446// entityUpdated is a callback to trigger when the excerpt of an entity changed
447func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
448	sc.mu.Lock()
449	e, ok := sc.cached[id]
450	if !ok {
451		sc.mu.Unlock()
452
453		// if the bug is not loaded at this point, it means it was loaded before
454		// but got evicted. Which means we potentially have multiple copies in
455		// memory and thus concurrent write.
456		// Failing immediately here is the simple and safe solution to avoid
457		// complicated data loss.
458		return errors.New("entity missing from cache")
459	}
460	sc.lru.Get(id)
461	// sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
462	sc.excerpts[id] = sc.makeExcerpt(e)
463	sc.mu.Unlock()
464
465	index, err := sc.repo.GetIndex(sc.typename)
466	if err != nil {
467		return err
468	}
469
470	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
471	if err != nil {
472		return err
473	}
474
475	return sc.write()
476}
477
478// evictIfNeeded will evict an entity from the cache if needed
479func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
480	sc.mu.Lock()
481	defer sc.mu.Unlock()
482	if sc.lru.Len() <= sc.maxLoaded {
483		return
484	}
485
486	for _, id := range sc.lru.GetOldestToNewest() {
487		b := sc.cached[id]
488		if b.NeedCommit() {
489			continue
490		}
491
492		// TODO
493		// b.Lock()
494		sc.lru.Remove(id)
495		delete(sc.cached, id)
496
497		if sc.lru.Len() <= sc.maxLoaded {
498			return
499		}
500	}
501}