subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entities/identity"
 13	"github.com/MichaelMure/git-bug/entity"
 14	"github.com/MichaelMure/git-bug/repository"
 15)
 16
 17type Excerpt interface {
 18	Id() entity.Id
 19	setId(id entity.Id)
 20}
 21
 22type CacheEntity interface {
 23	Id() entity.Id
 24	NeedCommit() bool
 25	Lock()
 26}
 27
 28type getUserIdentityFunc func() (*IdentityCache, error)
 29
 30// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
 31// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
 32// functions from this package, but right now identities are not using that framework.
 33type Actions[EntityT entity.Interface] struct {
 34	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
 35	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
 36	Remove              func(repo repository.ClockedRepo, id entity.Id) error
 37	RemoveAll           func(repo repository.ClockedRepo) error
 38	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
 39}
 40
 41var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
 42
 43type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
 44	repo      repository.ClockedRepo
 45	resolvers func() entity.Resolvers
 46
 47	getUserIdentity getUserIdentityFunc
 48	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
 49	makeExcerpt     func(CacheT) ExcerptT
 50	makeIndexData   func(CacheT) []string
 51	actions         Actions[EntityT]
 52
 53	typename  string
 54	namespace string
 55	version   uint
 56	maxLoaded int
 57
 58	mu       sync.RWMutex
 59	excerpts map[entity.Id]ExcerptT
 60	cached   map[entity.Id]CacheT
 61	lru      *lruIdCache
 62}
 63
 64func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 65	repo repository.ClockedRepo,
 66	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 67	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 68	makeExcerpt func(CacheT) ExcerptT,
 69	makeIndexData func(CacheT) []string,
 70	actions Actions[EntityT],
 71	typename, namespace string,
 72	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 73	return &SubCache[EntityT, ExcerptT, CacheT]{
 74		repo:            repo,
 75		resolvers:       resolvers,
 76		getUserIdentity: getUserIdentity,
 77		makeCached:      makeCached,
 78		makeExcerpt:     makeExcerpt,
 79		makeIndexData:   makeIndexData,
 80		actions:         actions,
 81		typename:        typename,
 82		namespace:       namespace,
 83		version:         version,
 84		maxLoaded:       maxLoaded,
 85		excerpts:        make(map[entity.Id]ExcerptT),
 86		cached:          make(map[entity.Id]CacheT),
 87		lru:             newLRUIdCache(),
 88	}
 89}
 90
 91func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
 92	return sc.typename
 93}
 94
 95// Load will try to read from the disk the entity cache file
 96func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 97	sc.mu.Lock()
 98	defer sc.mu.Unlock()
 99
100	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
101	if err != nil {
102		return err
103	}
104
105	aux := struct {
106		Version  uint
107		Excerpts map[entity.Id]ExcerptT
108	}{}
109
110	decoder := gob.NewDecoder(f)
111	err = decoder.Decode(&aux)
112	if err != nil {
113		_ = f.Close()
114		return err
115	}
116
117	err = f.Close()
118	if err != nil {
119		return err
120	}
121
122	if aux.Version != sc.version {
123		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
124	}
125
126	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
127	// so we fix it here, which doubles as enforcing coherency.
128	for id, excerpt := range aux.Excerpts {
129		excerpt.setId(id)
130	}
131
132	sc.excerpts = aux.Excerpts
133
134	index, err := sc.repo.GetIndex(sc.namespace)
135	if err != nil {
136		return err
137	}
138
139	// simple heuristic to detect a mismatch between the index and the entities
140	count, err := index.DocCount()
141	if err != nil {
142		return err
143	}
144	if count != uint64(len(sc.excerpts)) {
145		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
146	}
147
148	// TODO: find a way to check lamport clocks
149
150	return nil
151}
152
153// Write will serialize on disk the entity cache file
154func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
155	sc.mu.RLock()
156	defer sc.mu.RUnlock()
157
158	var data bytes.Buffer
159
160	aux := struct {
161		Version  uint
162		Excerpts map[entity.Id]ExcerptT
163	}{
164		Version:  sc.version,
165		Excerpts: sc.excerpts,
166	}
167
168	encoder := gob.NewEncoder(&data)
169
170	err := encoder.Encode(aux)
171	if err != nil {
172		return err
173	}
174
175	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
176	if err != nil {
177		return err
178	}
179
180	_, err = f.Write(data.Bytes())
181	if err != nil {
182		_ = f.Close()
183		return err
184	}
185
186	return f.Close()
187}
188
189func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
190	out := make(chan BuildEvent)
191
192	go func() {
193		defer close(out)
194
195		out <- BuildEvent{
196			Typename: sc.typename,
197			Event:    BuildEventStarted,
198		}
199
200		sc.excerpts = make(map[entity.Id]ExcerptT)
201
202		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
203
204		index, err := sc.repo.GetIndex(sc.namespace)
205		if err != nil {
206			out <- BuildEvent{
207				Typename: sc.typename,
208				Err:      err,
209			}
210			return
211		}
212
213		// wipe the index just to be sure
214		err = index.Clear()
215		if err != nil {
216			out <- BuildEvent{
217				Typename: sc.typename,
218				Err:      err,
219			}
220			return
221		}
222
223		indexer, indexEnd := index.IndexBatch()
224
225		for e := range allEntities {
226			if e.Err != nil {
227				out <- BuildEvent{
228					Typename: sc.typename,
229					Err:      e.Err,
230				}
231				return
232			}
233
234			cached := sc.makeCached(e.Entity, sc.entityUpdated)
235			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
236			// might as well keep them in memory
237			sc.cached[e.Entity.Id()] = cached
238
239			indexData := sc.makeIndexData(cached)
240			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
241				out <- BuildEvent{
242					Typename: sc.typename,
243					Err:      err,
244				}
245				return
246			}
247
248			out <- BuildEvent{
249				Typename: sc.typename,
250				Event:    BuildEventProgress,
251				Progress: e.CurrentEntity,
252				Total:    e.TotalEntities,
253			}
254		}
255
256		err = indexEnd()
257		if err != nil {
258			out <- BuildEvent{
259				Typename: sc.typename,
260				Err:      err,
261			}
262			return
263		}
264
265		err = sc.write()
266		if err != nil {
267			out <- BuildEvent{
268				Typename: sc.typename,
269				Err:      err,
270			}
271			return
272		}
273
274		out <- BuildEvent{
275			Typename: sc.typename,
276			Event:    BuildEventFinished,
277		}
278	}()
279
280	return out
281}
282
283func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
284	sc.maxLoaded = size
285	sc.evictIfNeeded()
286}
287
288func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
289	sc.mu.Lock()
290	defer sc.mu.Unlock()
291	sc.excerpts = nil
292	sc.cached = make(map[entity.Id]CacheT)
293	return nil
294}
295
296// AllIds return all known bug ids
297func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
298	sc.mu.RLock()
299	defer sc.mu.RUnlock()
300
301	result := make([]entity.Id, len(sc.excerpts))
302
303	i := 0
304	for _, excerpt := range sc.excerpts {
305		result[i] = excerpt.Id()
306		i++
307	}
308
309	return result
310}
311
312// Resolve retrieve an entity matching the exact given id
313func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
314	sc.mu.RLock()
315	cached, ok := sc.cached[id]
316	if ok {
317		sc.lru.Get(id)
318		sc.mu.RUnlock()
319		return cached, nil
320	}
321	sc.mu.RUnlock()
322
323	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
324	if err != nil {
325		return *new(CacheT), err
326	}
327
328	cached = sc.makeCached(e, sc.entityUpdated)
329
330	sc.mu.Lock()
331	sc.cached[id] = cached
332	sc.lru.Add(id)
333	sc.mu.Unlock()
334
335	sc.evictIfNeeded()
336
337	return cached, nil
338}
339
340// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
341// entity match.
342func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
343	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
344		return excerpt.Id().HasPrefix(prefix)
345	})
346}
347
348func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
349	id, err := sc.resolveMatcher(f)
350	if err != nil {
351		return *new(CacheT), err
352	}
353	return sc.Resolve(id)
354}
355
356// ResolveExcerpt retrieve an Excerpt matching the exact given id
357func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
358	sc.mu.RLock()
359	defer sc.mu.RUnlock()
360
361	excerpt, ok := sc.excerpts[id]
362	if !ok {
363		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
364	}
365
366	return excerpt, nil
367}
368
369// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
370// entity match.
371func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
372	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
373		return excerpt.Id().HasPrefix(prefix)
374	})
375}
376
377func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
378	id, err := sc.resolveMatcher(f)
379	if err != nil {
380		return *new(ExcerptT), err
381	}
382	return sc.ResolveExcerpt(id)
383}
384
385func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
386	sc.mu.RLock()
387	defer sc.mu.RUnlock()
388
389	// preallocate but empty
390	matching := make([]entity.Id, 0, 5)
391
392	for _, excerpt := range sc.excerpts {
393		if f(excerpt) {
394			matching = append(matching, excerpt.Id())
395		}
396	}
397
398	if len(matching) > 1 {
399		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
400	}
401
402	if len(matching) == 0 {
403		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
404	}
405
406	return matching[0], nil
407}
408
409func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
410	sc.mu.Lock()
411	if _, has := sc.cached[e.Id()]; has {
412		sc.mu.Unlock()
413		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
414	}
415
416	cached := sc.makeCached(e, sc.entityUpdated)
417	sc.cached[e.Id()] = cached
418	sc.lru.Add(e.Id())
419	sc.mu.Unlock()
420
421	sc.evictIfNeeded()
422
423	// force the write of the excerpt
424	err := sc.entityUpdated(e.Id())
425	if err != nil {
426		return *new(CacheT), err
427	}
428
429	return cached, nil
430}
431
432func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
433	e, err := sc.ResolvePrefix(prefix)
434	if err != nil {
435		return err
436	}
437
438	sc.mu.Lock()
439
440	err = sc.actions.Remove(sc.repo, e.Id())
441	if err != nil {
442		sc.mu.Unlock()
443		return err
444	}
445
446	delete(sc.cached, e.Id())
447	delete(sc.excerpts, e.Id())
448	sc.lru.Remove(e.Id())
449
450	index, err := sc.repo.GetIndex(sc.namespace)
451	if err != nil {
452		sc.mu.Unlock()
453		return err
454	}
455
456	err = index.Remove(e.Id().String())
457	sc.mu.Unlock()
458	if err != nil {
459		return err
460	}
461
462	return sc.write()
463}
464
465func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
466	sc.mu.Lock()
467
468	err := sc.actions.RemoveAll(sc.repo)
469	if err != nil {
470		sc.mu.Unlock()
471		return err
472	}
473
474	for id, _ := range sc.cached {
475		delete(sc.cached, id)
476		sc.lru.Remove(id)
477	}
478	for id, _ := range sc.excerpts {
479		delete(sc.excerpts, id)
480	}
481
482	index, err := sc.repo.GetIndex(sc.namespace)
483	if err != nil {
484		sc.mu.Unlock()
485		return err
486	}
487
488	err = index.Clear()
489	sc.mu.Unlock()
490	if err != nil {
491		return err
492	}
493
494	return sc.write()
495}
496
497func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
498	out := make(chan entity.MergeResult)
499
500	// Intercept merge results to update the cache properly
501	go func() {
502		defer close(out)
503
504		author, err := sc.getUserIdentity()
505		if err != nil {
506			out <- entity.NewMergeError(err, "")
507			return
508		}
509
510		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
511		for result := range results {
512			out <- result
513
514			if result.Err != nil {
515				continue
516			}
517
518			switch result.Status {
519			case entity.MergeStatusNew, entity.MergeStatusUpdated:
520				e := result.Entity.(EntityT)
521				cached := sc.makeCached(e, sc.entityUpdated)
522
523				sc.mu.Lock()
524				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
525				// might as well keep them in memory
526				sc.cached[result.Id] = cached
527				sc.mu.Unlock()
528			}
529		}
530
531		err = sc.write()
532		if err != nil {
533			out <- entity.NewMergeError(err, "")
534			return
535		}
536	}()
537
538	return out
539
540}
541
542func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
543	return sc.namespace
544}
545
546// entityUpdated is a callback to trigger when the excerpt of an entity changed
547func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
548	sc.mu.Lock()
549	e, ok := sc.cached[id]
550	if !ok {
551		sc.mu.Unlock()
552
553		// if the bug is not loaded at this point, it means it was loaded before
554		// but got evicted. Which means we potentially have multiple copies in
555		// memory and thus concurrent write.
556		// Failing immediately here is the simple and safe solution to avoid
557		// complicated data loss.
558		return errors.New("entity missing from cache")
559	}
560	sc.lru.Get(id)
561	// sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
562	sc.excerpts[id] = sc.makeExcerpt(e)
563	sc.mu.Unlock()
564
565	index, err := sc.repo.GetIndex(sc.namespace)
566	if err != nil {
567		return err
568	}
569
570	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
571	if err != nil {
572		return err
573	}
574
575	return sc.write()
576}
577
578// evictIfNeeded will evict an entity from the cache if needed
579func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
580	sc.mu.Lock()
581	defer sc.mu.Unlock()
582	if sc.lru.Len() <= sc.maxLoaded {
583		return
584	}
585
586	for _, id := range sc.lru.GetOldestToNewest() {
587		b := sc.cached[id]
588		if b.NeedCommit() {
589			continue
590		}
591
592		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
593		// if something try to do it anyway, it will lock the program and make it obvious.
594		b.Lock()
595
596		sc.lru.Remove(id)
597		delete(sc.cached, id)
598
599		if sc.lru.Len() <= sc.maxLoaded {
600			return
601		}
602	}
603}