subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"path/filepath"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entities/identity"
 13	"github.com/MichaelMure/git-bug/entity"
 14	"github.com/MichaelMure/git-bug/repository"
 15)
 16
// Excerpt is the constraint for the lightweight per-entity records that a
// SubCache keeps in its excerpts map and serializes into the cache file.
type Excerpt interface {
	// Id returns the id of the entity this excerpt describes.
	Id() entity.Id
	// setId assigns the id of the excerpt. Load calls it after decoding the
	// cache file, because the id is not serialized with the excerpt itself.
	setId(id entity.Id)
}
 21
// CacheEntity is the constraint for the fully loaded entities that a SubCache
// keeps in memory in its cached map.
type CacheEntity interface {
	// Id returns the id of the entity.
	Id() entity.Id
	// NeedCommit is consulted by evictIfNeeded: an entity for which it returns
	// true is never evicted (presumably because it holds uncommitted changes).
	NeedCommit() bool
	// Lock is called by evictIfNeeded right before the entity is dropped from
	// the cache, so that any later manipulation of an evicted entity blocks.
	Lock()
}
 27
// getUserIdentityFunc returns the identity that MergeAll passes to the merge
// action as the merge author.
type getUserIdentityFunc func() (*IdentityCache, error)
 29
// Actions exposes a number of action functions on entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads a single entity by id. Used by SubCache.Resolve on a cache miss.
	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams every entity of the repository. Used by SubCache.Build.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes the entity with the given id from the repository. Used by SubCache.Remove.
	Remove              func(repo repository.ClockedRepo, id entity.Id) error
	// MergeAll merges the entities coming from the given remote and streams the results. Used by SubCache.MergeAll.
	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
 39
// Compile-time check that *SubCache satisfies the cacheMgmt interface.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
 41
// SubCache is a generic cache for one type of entity. It keeps a lightweight
// excerpt for every known entity (persisted in a cache file), plus a bounded
// set of fully loaded entities tracked through an LRU.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repo gives access to the local storage, the search index, and is handed
	// to every action.
	repo      repository.ClockedRepo
	// resolvers returns the resolvers handed to the read and merge actions.
	resolvers func() entity.Resolvers

	// getUserIdentity provides the merge author used by MergeAll.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps a raw entity into its cached form. The callback is
	// always sc.entityUpdated.
	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt of a cached entity.
	makeExcerpt     func(CacheT) ExcerptT
	// makeIndexData builds the texts stored in the search index for a cached entity.
	makeIndexData   func(CacheT) []string
	// actions holds the entity-type specific read/remove/merge functions.
	actions         Actions[EntityT]

	// typename is reported in build events and in not-found/multiple-match errors.
	typename  string
	// namespace names both the cache file (under "cache") and the search index.
	namespace string
	// version is the cache file format version; Load rejects any other version.
	version   uint
	// maxLoaded is the number of LRU-tracked entities above which
	// evictIfNeeded starts evicting.
	maxLoaded int

	// mu is meant to guard excerpts, cached and the bookkeeping done in lru.
	mu       sync.RWMutex
	// excerpts holds one excerpt per known entity.
	excerpts map[entity.Id]ExcerptT
	// cached holds the entities currently loaded in memory.
	cached   map[entity.Id]CacheT
	// lru tracks the usage order of loaded entities to choose eviction candidates.
	lru      *lruIdCache
}
 62
 63func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 64	repo repository.ClockedRepo,
 65	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 66	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 67	makeExcerpt func(CacheT) ExcerptT,
 68	makeIndexData func(CacheT) []string,
 69	actions Actions[EntityT],
 70	typename, namespace string,
 71	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 72	return &SubCache[EntityT, ExcerptT, CacheT]{
 73		repo:            repo,
 74		resolvers:       resolvers,
 75		getUserIdentity: getUserIdentity,
 76		makeCached:      makeCached,
 77		makeExcerpt:     makeExcerpt,
 78		makeIndexData:   makeIndexData,
 79		actions:         actions,
 80		typename:        typename,
 81		namespace:       namespace,
 82		version:         version,
 83		maxLoaded:       maxLoaded,
 84		excerpts:        make(map[entity.Id]ExcerptT),
 85		cached:          make(map[entity.Id]CacheT),
 86		lru:             newLRUIdCache(),
 87	}
 88}
 89
// Typename returns the entity type name given at construction. The same name
// is reported in build events and in not-found errors.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
 93
 94// Load will try to read from the disk the entity cache file
 95func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 96	sc.mu.Lock()
 97	defer sc.mu.Unlock()
 98
 99	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
100	if err != nil {
101		return err
102	}
103
104	aux := struct {
105		Version  uint
106		Excerpts map[entity.Id]ExcerptT
107	}{}
108
109	decoder := gob.NewDecoder(f)
110	err = decoder.Decode(&aux)
111	if err != nil {
112		_ = f.Close()
113		return err
114	}
115
116	err = f.Close()
117	if err != nil {
118		return err
119	}
120
121	if aux.Version != sc.version {
122		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
123	}
124
125	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
126	// so we fix it here, which doubles as enforcing coherency.
127	for id, excerpt := range aux.Excerpts {
128		excerpt.setId(id)
129	}
130
131	sc.excerpts = aux.Excerpts
132
133	index, err := sc.repo.GetIndex(sc.namespace)
134	if err != nil {
135		return err
136	}
137
138	// simple heuristic to detect a mismatch between the index and the entities
139	count, err := index.DocCount()
140	if err != nil {
141		return err
142	}
143	if count != uint64(len(sc.excerpts)) {
144		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
145	}
146
147	// TODO: find a way to check lamport clocks
148
149	return nil
150}
151
152// Write will serialize on disk the entity cache file
153func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
154	sc.mu.RLock()
155	defer sc.mu.RUnlock()
156
157	var data bytes.Buffer
158
159	aux := struct {
160		Version  uint
161		Excerpts map[entity.Id]ExcerptT
162	}{
163		Version:  sc.version,
164		Excerpts: sc.excerpts,
165	}
166
167	encoder := gob.NewEncoder(&data)
168
169	err := encoder.Encode(aux)
170	if err != nil {
171		return err
172	}
173
174	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
175	if err != nil {
176		return err
177	}
178
179	_, err = f.Write(data.Bytes())
180	if err != nil {
181		_ = f.Close()
182		return err
183	}
184
185	return f.Close()
186}
187
188func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
189	out := make(chan BuildEvent)
190
191	go func() {
192		defer close(out)
193
194		out <- BuildEvent{
195			Typename: sc.typename,
196			Event:    BuildEventStarted,
197		}
198
199		sc.excerpts = make(map[entity.Id]ExcerptT)
200
201		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
202
203		index, err := sc.repo.GetIndex(sc.namespace)
204		if err != nil {
205			out <- BuildEvent{
206				Typename: sc.typename,
207				Err:      err,
208			}
209			return
210		}
211
212		// wipe the index just to be sure
213		err = index.Clear()
214		if err != nil {
215			out <- BuildEvent{
216				Typename: sc.typename,
217				Err:      err,
218			}
219			return
220		}
221
222		indexer, indexEnd := index.IndexBatch()
223
224		for e := range allEntities {
225			if e.Err != nil {
226				out <- BuildEvent{
227					Typename: sc.typename,
228					Err:      e.Err,
229				}
230				return
231			}
232
233			cached := sc.makeCached(e.Entity, sc.entityUpdated)
234			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
235			// might as well keep them in memory
236			sc.cached[e.Entity.Id()] = cached
237
238			indexData := sc.makeIndexData(cached)
239			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
240				out <- BuildEvent{
241					Typename: sc.typename,
242					Err:      err,
243				}
244				return
245			}
246
247			out <- BuildEvent{
248				Typename: sc.typename,
249				Event:    BuildEventProgress,
250				Progress: e.CurrentEntity,
251				Total:    e.TotalEntities,
252			}
253		}
254
255		err = indexEnd()
256		if err != nil {
257			out <- BuildEvent{
258				Typename: sc.typename,
259				Err:      err,
260			}
261			return
262		}
263
264		err = sc.write()
265		if err != nil {
266			out <- BuildEvent{
267				Typename: sc.typename,
268				Err:      err,
269			}
270			return
271		}
272
273		out <- BuildEvent{
274			Typename: sc.typename,
275			Event:    BuildEventFinished,
276		}
277	}()
278
279	return out
280}
281
282func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
283	sc.maxLoaded = size
284	sc.evictIfNeeded()
285}
286
287func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
288	sc.mu.Lock()
289	defer sc.mu.Unlock()
290	sc.excerpts = nil
291	sc.cached = make(map[entity.Id]CacheT)
292	return nil
293}
294
295// AllIds return all known bug ids
296func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
297	sc.mu.RLock()
298	defer sc.mu.RUnlock()
299
300	result := make([]entity.Id, len(sc.excerpts))
301
302	i := 0
303	for _, excerpt := range sc.excerpts {
304		result[i] = excerpt.Id()
305		i++
306	}
307
308	return result
309}
310
311// Resolve retrieve an entity matching the exact given id
312func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
313	sc.mu.RLock()
314	cached, ok := sc.cached[id]
315	if ok {
316		sc.lru.Get(id)
317		sc.mu.RUnlock()
318		return cached, nil
319	}
320	sc.mu.RUnlock()
321
322	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
323	if err != nil {
324		return *new(CacheT), err
325	}
326
327	cached = sc.makeCached(e, sc.entityUpdated)
328
329	sc.mu.Lock()
330	sc.cached[id] = cached
331	sc.lru.Add(id)
332	sc.mu.Unlock()
333
334	sc.evictIfNeeded()
335
336	return cached, nil
337}
338
339// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
340// entity match.
341func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
342	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
343		return excerpt.Id().HasPrefix(prefix)
344	})
345}
346
347func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
348	id, err := sc.resolveMatcher(f)
349	if err != nil {
350		return *new(CacheT), err
351	}
352	return sc.Resolve(id)
353}
354
355// ResolveExcerpt retrieve an Excerpt matching the exact given id
356func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
357	sc.mu.RLock()
358	defer sc.mu.RUnlock()
359
360	excerpt, ok := sc.excerpts[id]
361	if !ok {
362		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
363	}
364
365	return excerpt, nil
366}
367
368// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
369// entity match.
370func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
371	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
372		return excerpt.Id().HasPrefix(prefix)
373	})
374}
375
376func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
377	id, err := sc.resolveMatcher(f)
378	if err != nil {
379		return *new(ExcerptT), err
380	}
381	return sc.ResolveExcerpt(id)
382}
383
384func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
385	sc.mu.RLock()
386	defer sc.mu.RUnlock()
387
388	// preallocate but empty
389	matching := make([]entity.Id, 0, 5)
390
391	for _, excerpt := range sc.excerpts {
392		if f(excerpt) {
393			matching = append(matching, excerpt.Id())
394		}
395	}
396
397	if len(matching) > 1 {
398		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
399	}
400
401	if len(matching) == 0 {
402		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
403	}
404
405	return matching[0], nil
406}
407
408func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
409	sc.mu.Lock()
410	if _, has := sc.cached[e.Id()]; has {
411		sc.mu.Unlock()
412		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
413	}
414
415	cached := sc.makeCached(e, sc.entityUpdated)
416	sc.cached[e.Id()] = cached
417	sc.lru.Add(e.Id())
418	sc.mu.Unlock()
419
420	sc.evictIfNeeded()
421
422	// force the write of the excerpt
423	err := sc.entityUpdated(e.Id())
424	if err != nil {
425		return *new(CacheT), err
426	}
427
428	return cached, nil
429}
430
431func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
432	e, err := sc.ResolvePrefix(prefix)
433	if err != nil {
434		return err
435	}
436
437	sc.mu.Lock()
438
439	err = sc.actions.Remove(sc.repo, e.Id())
440	if err != nil {
441		sc.mu.Unlock()
442		return err
443	}
444
445	delete(sc.cached, e.Id())
446	delete(sc.excerpts, e.Id())
447	sc.lru.Remove(e.Id())
448
449	sc.mu.Unlock()
450
451	return sc.write()
452}
453
454func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
455	out := make(chan entity.MergeResult)
456
457	// Intercept merge results to update the cache properly
458	go func() {
459		defer close(out)
460
461		author, err := sc.getUserIdentity()
462		if err != nil {
463			out <- entity.NewMergeError(err, "")
464			return
465		}
466
467		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
468		for result := range results {
469			out <- result
470
471			if result.Err != nil {
472				continue
473			}
474
475			switch result.Status {
476			case entity.MergeStatusNew, entity.MergeStatusUpdated:
477				e := result.Entity.(EntityT)
478				cached := sc.makeCached(e, sc.entityUpdated)
479
480				sc.mu.Lock()
481				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
482				// might as well keep them in memory
483				sc.cached[result.Id] = cached
484				sc.mu.Unlock()
485			}
486		}
487
488		err = sc.write()
489		if err != nil {
490			out <- entity.NewMergeError(err, "")
491			return
492		}
493	}()
494
495	return out
496
497}
498
// GetNamespace returns the namespace given at construction. The same value
// names the cache file and the search index of this SubCache.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
502
503// entityUpdated is a callback to trigger when the excerpt of an entity changed
504func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
505	sc.mu.Lock()
506	e, ok := sc.cached[id]
507	if !ok {
508		sc.mu.Unlock()
509
510		// if the bug is not loaded at this point, it means it was loaded before
511		// but got evicted. Which means we potentially have multiple copies in
512		// memory and thus concurrent write.
513		// Failing immediately here is the simple and safe solution to avoid
514		// complicated data loss.
515		return errors.New("entity missing from cache")
516	}
517	sc.lru.Get(id)
518	// sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
519	sc.excerpts[id] = sc.makeExcerpt(e)
520	sc.mu.Unlock()
521
522	index, err := sc.repo.GetIndex(sc.namespace)
523	if err != nil {
524		return err
525	}
526
527	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
528	if err != nil {
529		return err
530	}
531
532	return sc.write()
533}
534
535// evictIfNeeded will evict an entity from the cache if needed
536func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
537	sc.mu.Lock()
538	defer sc.mu.Unlock()
539	if sc.lru.Len() <= sc.maxLoaded {
540		return
541	}
542
543	for _, id := range sc.lru.GetOldestToNewest() {
544		b := sc.cached[id]
545		if b.NeedCommit() {
546			continue
547		}
548
549		// as a form of assurance that evicted entities don't get manipulated, we lock them here.
550		// if something try to do it anyway, it will lock the program and make it obvious.
551		b.Lock()
552
553		sc.lru.Remove(id)
554		delete(sc.cached, id)
555
556		if sc.lru.Len() <= sc.maxLoaded {
557			return
558		}
559	}
560}