subcache.go

  1package cache
  2
  3import (
  4	"bytes"
  5	"encoding/gob"
  6	"fmt"
  7	"os"
  8	"sync"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entities/bug"
 13	"github.com/MichaelMure/git-bug/entity"
 14	"github.com/MichaelMure/git-bug/repository"
 15)
 16
// Excerpt is the constraint for the lightweight per-entity summaries that a
// SubCache keeps in memory and serializes to its cache file. The cache only
// requires an excerpt to expose the id of the entity it describes.
type Excerpt interface {
	// Id returns the identifier of the entity this excerpt stands for.
	Id() entity.Id
}
 20
// CacheEntity is the constraint for the fully loaded, in-memory form of an
// entity that a SubCache stores in its `cached` map.
type CacheEntity interface {
	// Id returns the identifier of the entity.
	Id() entity.Id
	// NeedCommit is consulted during eviction: an entity for which it returns
	// true is never evicted from the cache.
	// NOTE(review): presumably true when the entity holds changes that are
	// not committed yet — confirm against the implementations.
	NeedCommit() bool
}
 25
// getUserIdentityFunc is a callback returning an *IdentityCache or an error.
// NOTE(review): judging by the name it yields the identity of the current
// user; the implementation is not visible in this file.
type getUserIdentityFunc func() (*IdentityCache, error)
 27
// SubCache is a generic cache for one kind of entity.
//
// It is parameterized by three types:
//   - EntityT: the entity as read from the repository,
//   - ExcerptT: the lightweight summary kept for every known entity,
//   - CacheT: the fully loaded, cached form of an entity.
//
// Excerpts are persisted through Write and restored through Load. Fully loaded
// entities are read on demand and tracked by an LRU so that evictIfNeeded can
// drop the oldest ones once more than maxLoaded are held.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repo gives access to the local storage holding the cache file, to the
	// search index, and is handed to readWithResolver.
	repo      repository.ClockedRepo
	// resolvers is called to obtain the entity.Resolvers passed to
	// readWithResolver.
	resolvers func() entity.Resolvers

	// getUserIdentity is stored at construction; it is not called in this file.
	getUserIdentity  getUserIdentityFunc
	// readWithResolver loads a single entity by id from the repository.
	readWithResolver func(repository.ClockedRepo, entity.Resolvers, entity.Id) (EntityT, error)
	// makeCached wraps a freshly read entity into its cached form; the second
	// argument is the callback the cached entity receives (entityUpdated).
	makeCached       func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt of an entity.
	makeExcerpt      func(EntityT) ExcerptT
	// makeIndex returns strings derived from a cached entity.
	// NOTE(review): presumably the texts fed to the search index — confirm.
	makeIndex        func(CacheT) []string

	// typename is used in "not found"/"multiple match" errors and as the name
	// of the search index.
	typename  string
	// namespace is used to derive the cache file name and in error messages.
	namespace string
	// version is the cache format version written to, and expected from, the
	// cache file.
	version   uint
	// maxLoaded is the number of loaded entities above which eviction starts.
	maxLoaded int

	// mu guards excerpts, cached and lru.
	mu       sync.RWMutex
	// excerpts holds one excerpt per known entity, keyed by entity id.
	excerpts map[entity.Id]ExcerptT
	// cached holds the entities that are currently fully loaded.
	cached   map[entity.Id]CacheT
	// lru records the usage order of the ids present in cached.
	lru      *lruIdCache
}
 48
 49func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
 50	repo repository.ClockedRepo,
 51	resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
 52	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
 53	makeExcerpt func(EntityT) ExcerptT,
 54	makeIndex func(CacheT) []string,
 55	typename, namespace string,
 56	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
 57	return &SubCache[EntityT, ExcerptT, CacheT]{
 58		repo:            repo,
 59		resolvers:       resolvers,
 60		getUserIdentity: getUserIdentity,
 61		typename:        typename,
 62		namespace:       namespace,
 63		version:         version,
 64		maxLoaded:       maxLoaded,
 65		excerpts:        make(map[entity.Id]ExcerptT),
 66		cached:          make(map[entity.Id]CacheT),
 67		lru:             newLRUIdCache(),
 68	}
 69}
 70
// Typename returns the type name this cache was created with. The same name
// is used in this cache's "not found" and "multiple match" errors.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
 74
 75// Load will try to read from the disk the entity cache file
 76func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
 77	sc.mu.Lock()
 78	defer sc.mu.Unlock()
 79
 80	f, err := sc.repo.LocalStorage().Open(sc.namespace + "-file")
 81	if err != nil {
 82		return err
 83	}
 84
 85	decoder := gob.NewDecoder(f)
 86
 87	aux := struct {
 88		Version  uint
 89		Excerpts map[entity.Id]ExcerptT
 90	}{}
 91
 92	err = decoder.Decode(&aux)
 93	if err != nil {
 94		return err
 95	}
 96
 97	if aux.Version != sc.version {
 98		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
 99	}
100
101	sc.excerpts = aux.Excerpts
102
103	index, err := sc.repo.GetIndex(sc.typename)
104	if err != nil {
105		return err
106	}
107
108	// simple heuristic to detect a mismatch between the index and the entities
109	count, err := index.DocCount()
110	if err != nil {
111		return err
112	}
113	if count != uint64(len(sc.excerpts)) {
114		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
115	}
116
117	return nil
118}
119
120// Write will serialize on disk the entity cache file
121func (sc *SubCache[EntityT, ExcerptT, CacheT]) Write() error {
122	sc.mu.RLock()
123	defer sc.mu.RUnlock()
124
125	var data bytes.Buffer
126
127	aux := struct {
128		Version  uint
129		Excerpts map[entity.Id]ExcerptT
130	}{
131		Version:  sc.version,
132		Excerpts: sc.excerpts,
133	}
134
135	encoder := gob.NewEncoder(&data)
136
137	err := encoder.Encode(aux)
138	if err != nil {
139		return err
140	}
141
142	f, err := sc.repo.LocalStorage().Create(sc.namespace + "-file")
143	if err != nil {
144		return err
145	}
146
147	_, err = f.Write(data.Bytes())
148	if err != nil {
149		return err
150	}
151
152	return f.Close()
153}
154
// Build is meant to regenerate the excerpts and the search index from the
// entities stored in the repository: it resets the excerpt map, clears the
// index, then walks every entity to record its excerpt and index it in one
// batch.
//
// NOTE(review): this method does not compile as written. It still contains
// code from the bug-specific cache (`c`, `c.bugExcerpts`, `NewBugExcerpt`,
// `bug.ReadAllWithResolver`) that has not been ported to the generic SubCache.
// A generic version needs a way to enumerate all entities of type EntityT,
// which SubCache does not have among its fields.
// NOTE(review): sc.excerpts is replaced without holding sc.mu, unlike the
// other methods — confirm whether callers guarantee exclusive access.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
	sc.excerpts = make(map[entity.Id]ExcerptT)

	// NOTE(review): bare expression, not a call — invalid as a statement;
	// looks like a leftover from an unfinished edit.
	sc.readWithResolver

	// NOTE(review): `c` is not declared in this method, and the result is a
	// stream of bugs rather than of EntityT.
	allBugs := bug.ReadAllWithResolver(c.repo, c.resolvers)

	index, err := sc.repo.GetIndex(sc.typename)
	if err != nil {
		return err
	}

	// wipe the index just to be sure
	err = index.Clear()
	if err != nil {
		return err
	}

	// indexer adds one document to the batch, indexEnd finalizes the batch
	indexer, indexEnd := index.IndexBatch()

	for b := range allBugs {
		// a read failure aborts the whole build
		if b.Err != nil {
			return b.Err
		}

		snap := b.Bug.Compile()
		// NOTE(review): c.bugExcerpts should presumably be sc.excerpts, and
		// the excerpt should come from sc.makeExcerpt — confirm.
		c.bugExcerpts[b.Bug.Id()] = NewBugExcerpt(b.Bug, snap)

		if err := indexer(snap); err != nil {
			return err
		}
	}

	err = indexEnd()
	if err != nil {
		return err
	}

	_, _ = fmt.Fprintln(os.Stderr, "Done.")
	return nil
}
196
197func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
198	sc.mu.Lock()
199	defer sc.mu.Unlock()
200	sc.excerpts = nil
201	sc.cached = make(map[entity.Id]CacheT)
202	return nil
203}
204
205// AllIds return all known bug ids
206func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
207	sc.mu.RLock()
208	defer sc.mu.RUnlock()
209
210	result := make([]entity.Id, len(sc.excerpts))
211
212	i := 0
213	for _, excerpt := range sc.excerpts {
214		result[i] = excerpt.Id()
215		i++
216	}
217
218	return result
219}
220
221// Resolve retrieve an entity matching the exact given id
222func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
223	sc.mu.RLock()
224	cached, ok := sc.cached[id]
225	if ok {
226		sc.lru.Get(id)
227		sc.mu.RUnlock()
228		return cached, nil
229	}
230	sc.mu.RUnlock()
231
232	e, err := sc.readWithResolver(sc.repo, sc.resolvers(), id)
233	if err != nil {
234		return *new(CacheT), err
235	}
236
237	cached = sc.makeCached(e, sc.entityUpdated)
238
239	sc.mu.Lock()
240	sc.cached[id] = cached
241	sc.lru.Add(id)
242	sc.mu.Unlock()
243
244	sc.evictIfNeeded()
245
246	return cached, nil
247}
248
249// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
250// entity match.
251func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
252	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
253		return excerpt.Id().HasPrefix(prefix)
254	})
255}
256
257func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
258	id, err := sc.resolveMatcher(f)
259	if err != nil {
260		return *new(CacheT), err
261	}
262	return sc.Resolve(id)
263}
264
265// ResolveExcerpt retrieve an Excerpt matching the exact given id
266func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
267	sc.mu.RLock()
268	defer sc.mu.RUnlock()
269
270	excerpt, ok := sc.excerpts[id]
271	if !ok {
272		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
273	}
274
275	return excerpt, nil
276}
277
278// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
279// entity match.
280func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
281	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
282		return excerpt.Id().HasPrefix(prefix)
283	})
284}
285
286func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
287	id, err := sc.resolveMatcher(f)
288	if err != nil {
289		return *new(ExcerptT), err
290	}
291	return sc.ResolveExcerpt(id)
292}
293
294func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
295	sc.mu.RLock()
296	defer sc.mu.RUnlock()
297
298	// preallocate but empty
299	matching := make([]entity.Id, 0, 5)
300
301	for _, excerpt := range sc.excerpts {
302		if f(excerpt) {
303			matching = append(matching, excerpt.Id())
304		}
305	}
306
307	if len(matching) > 1 {
308		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
309	}
310
311	if len(matching) == 0 {
312		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
313	}
314
315	return matching[0], nil
316}
317
// errNotInCache is returned by entityUpdated when the entity it is called for
// is not among the loaded entities of the cache.
var errNotInCache = errors.New("entity missing from cache")
319
320func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
321	sc.mu.Lock()
322	if _, has := sc.cached[e.Id()]; has {
323		sc.mu.Unlock()
324		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
325	}
326
327	cached := sc.makeCached(e, sc.entityUpdated)
328	sc.cached[e.Id()] = cached
329	sc.lru.Add(e.Id())
330	sc.mu.Unlock()
331
332	sc.evictIfNeeded()
333
334	// force the write of the excerpt
335	err := sc.entityUpdated(e.Id())
336	if err != nil {
337		return *new(CacheT), err
338	}
339
340	return cached, nil
341}
342
343func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
344	e, err := sc.ResolvePrefix(prefix)
345	if err != nil {
346		return err
347	}
348
349	sc.mu.Lock()
350
351	err = bug.Remove(c.repo, b.Id())
352	if err != nil {
353		c.muBug.Unlock()
354
355		return err
356	}
357
358	delete(c.bugs, b.Id())
359	delete(c.bugExcerpts, b.Id())
360	c.loadedBugs.Remove(b.Id())
361
362	c.muBug.Unlock()
363
364	return c.writeBugCache()
365}
366
// entityUpdated is a callback to trigger when the excerpt of an entity changed.
// It is handed to makeCached for every cached entity and called directly by
// add. It refreshes the stored excerpt, updates the search index and rewrites
// the cache file. It returns errNotInCache when the entity is not loaded.
//
// NOTE(review): this method does not compile as written. `bug2`, `b.bug`,
// `b.Snapshot()` and `sc.addBugToSearchIndex` come from the bug-specific cache
// and are not available for a generic CacheT. sc.makeExcerpt cannot be used as
// a drop-in replacement because it takes an EntityT while only a CacheT is at
// hand here.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
	sc.mu.Lock()
	b, ok := sc.cached[id]
	if !ok {
		sc.mu.Unlock()

		// if the bug is not loaded at this point, it means it was loaded before
		// but got evicted. Which means we potentially have multiple copies in
		// memory and thus concurrent write.
		// Failing immediately here is the simple and safe solution to avoid
		// complicated data loss.
		return errNotInCache
	}
	// mark the entity as recently used
	sc.lru.Get(id)
	// NOTE(review): bug-specific excerpt construction, see the note on the
	// method.
	sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
	sc.mu.Unlock()

	// NOTE(review): addBugToSearchIndex is not defined on SubCache in this file.
	if err := sc.addBugToSearchIndex(b.Snapshot()); err != nil {
		return err
	}

	// we only need to write the bug cache
	return sc.Write()
}
393
394// evictIfNeeded will evict an entity from the cache if needed
395func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
396	sc.mu.Lock()
397	defer sc.mu.Unlock()
398	if sc.lru.Len() <= sc.maxLoaded {
399		return
400	}
401
402	for _, id := range sc.lru.GetOldestToNewest() {
403		b := sc.cached[id]
404		if b.NeedCommit() {
405			continue
406		}
407
408		b.Lock()
409		sc.lru.Remove(id)
410		delete(sc.cached, id)
411
412		if sc.lru.Len() <= sc.maxLoaded {
413			return
414		}
415	}
416}