1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entities/identity"
13 "github.com/MichaelMure/git-bug/entity"
14 "github.com/MichaelMure/git-bug/repository"
15)
16
// Excerpt is the constraint for the per-entity summaries that a SubCache keeps
// in its excerpts map and serializes into the cache file.
type Excerpt interface {
	// Id returns the identifier of the entity described by this excerpt.
	Id() entity.Id
	// setId assigns the identifier. Load calls it with the map key after
	// decoding the cache file, because the id is not serialized in the excerpt.
	setId(id entity.Id)
}
21
// CacheEntity is the constraint for the fully loaded form of an entity that a
// SubCache keeps in its cached map.
type CacheEntity interface {
	// Id returns the identifier of the entity.
	Id() entity.Id
	// NeedCommit is consulted by evictIfNeeded: an entity for which it returns
	// true is never evicted.
	NeedCommit() bool
	// Lock is called by evictIfNeeded right before an entity is dropped from
	// the cache, so that any later use of the evicted value blocks visibly.
	Lock()
}
27
// getUserIdentityFunc returns the identity that MergeAll passes to the merge
// action as the merge author, or an error if it cannot be obtained.
type getUserIdentityFunc func() (*IdentityCache, error)
29
// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from this package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver loads a single entity by id; used by SubCache.Resolve on a cache miss.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams every entity (or an error per item); used by SubCache.Build.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes the entity with the given id from the repository; used by SubCache.Remove.
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// MergeAll merges the entities coming from the given remote and streams one result per entity;
	// used by SubCache.MergeAll.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
39
// Compile-time assertion that *SubCache implements cacheMgmt.
var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
41
// SubCache is a generic cache for one kind of entity. It keeps an excerpt for
// every known entity, a set of fully loaded entities, and persists the excerpts
// in a cache file stored in the repository's local storage.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	// repo gives access to the local storage, the search index, and is handed
	// to every action.
	repo repository.ClockedRepo
	// resolvers returns the resolvers handed to the read and merge actions.
	resolvers func() entity.Resolvers

	// getUserIdentity provides the merge author used by MergeAll.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps a raw entity into its cached form; entityUpdated is the
	// callback given to it.
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt derives the excerpt of a cached entity.
	makeExcerpt func(CacheT) ExcerptT
	// makeIndexData derives the strings handed to the search index for a cached entity.
	makeIndexData func(CacheT) []string
	// actions are the entity-specific read/remove/merge functions.
	actions Actions[EntityT]

	// typename is used when building not-found, multiple-match and other errors.
	typename string
	// namespace names both the cache file (under "cache") and the search index.
	namespace string
	// version is the cache file format version written by write and required by Load.
	version uint
	// maxLoaded is the threshold compared against lru.Len() in evictIfNeeded.
	maxLoaded int

	// mu is held by most methods while they touch excerpts, cached and lru.
	mu sync.RWMutex
	// excerpts maps every known entity id to its excerpt.
	excerpts map[entity.Id]ExcerptT
	// cached maps the ids of loaded entities to their cached form.
	cached map[entity.Id]CacheT
	// lru tracks the usage order of loaded entities for eviction.
	lru *lruIdCache
}
62
63func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
64 repo repository.ClockedRepo,
65 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
66 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
67 makeExcerpt func(CacheT) ExcerptT,
68 makeIndexData func(CacheT) []string,
69 actions Actions[EntityT],
70 typename, namespace string,
71 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
72 return &SubCache[EntityT, ExcerptT, CacheT]{
73 repo: repo,
74 resolvers: resolvers,
75 getUserIdentity: getUserIdentity,
76 makeCached: makeCached,
77 makeExcerpt: makeExcerpt,
78 makeIndexData: makeIndexData,
79 actions: actions,
80 typename: typename,
81 namespace: namespace,
82 version: version,
83 maxLoaded: maxLoaded,
84 excerpts: make(map[entity.Id]ExcerptT),
85 cached: make(map[entity.Id]CacheT),
86 lru: newLRUIdCache(),
87 }
88}
89
// Typename returns the entity type name this SubCache was created with.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
93
94// Load will try to read from the disk the entity cache file
95func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
96 sc.mu.Lock()
97 defer sc.mu.Unlock()
98
99 f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
100 if err != nil {
101 return err
102 }
103
104 decoder := gob.NewDecoder(f)
105
106 aux := struct {
107 Version uint
108 Excerpts map[entity.Id]ExcerptT
109 }{}
110
111 err = decoder.Decode(&aux)
112 if err != nil {
113 return err
114 }
115
116 if aux.Version != sc.version {
117 return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
118 }
119
120 // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
121 // so we fix it here, which doubles as enforcing coherency.
122 for id, excerpt := range aux.Excerpts {
123 excerpt.setId(id)
124 }
125
126 sc.excerpts = aux.Excerpts
127
128 index, err := sc.repo.GetIndex(sc.namespace)
129 if err != nil {
130 return err
131 }
132
133 // simple heuristic to detect a mismatch between the index and the entities
134 count, err := index.DocCount()
135 if err != nil {
136 return err
137 }
138 if count != uint64(len(sc.excerpts)) {
139 return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
140 }
141
142 // TODO: find a way to check lamport clocks
143
144 return nil
145}
146
147// Write will serialize on disk the entity cache file
148func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
149 sc.mu.RLock()
150 defer sc.mu.RUnlock()
151
152 var data bytes.Buffer
153
154 aux := struct {
155 Version uint
156 Excerpts map[entity.Id]ExcerptT
157 }{
158 Version: sc.version,
159 Excerpts: sc.excerpts,
160 }
161
162 encoder := gob.NewEncoder(&data)
163
164 err := encoder.Encode(aux)
165 if err != nil {
166 return err
167 }
168
169 f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
170 if err != nil {
171 return err
172 }
173
174 _, err = f.Write(data.Bytes())
175 if err != nil {
176 return err
177 }
178
179 return f.Close()
180}
181
182func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
183 sc.excerpts = make(map[entity.Id]ExcerptT)
184
185 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
186
187 index, err := sc.repo.GetIndex(sc.namespace)
188 if err != nil {
189 return err
190 }
191
192 // wipe the index just to be sure
193 err = index.Clear()
194 if err != nil {
195 return err
196 }
197
198 indexer, indexEnd := index.IndexBatch()
199
200 for e := range allEntities {
201 if e.Err != nil {
202 return e.Err
203 }
204
205 cached := sc.makeCached(e.Entity, sc.entityUpdated)
206 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
207 // might as well keep them in memory
208 sc.cached[e.Entity.Id()] = cached
209
210 indexData := sc.makeIndexData(cached)
211 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
212 return err
213 }
214 }
215
216 err = indexEnd()
217 if err != nil {
218 return err
219 }
220
221 err = sc.write()
222 if err != nil {
223 return err
224 }
225
226 return nil
227}
228
229func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
230 sc.maxLoaded = size
231 sc.evictIfNeeded()
232}
233
234func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
235 sc.mu.Lock()
236 defer sc.mu.Unlock()
237 sc.excerpts = nil
238 sc.cached = make(map[entity.Id]CacheT)
239 return nil
240}
241
242// AllIds return all known bug ids
243func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
244 sc.mu.RLock()
245 defer sc.mu.RUnlock()
246
247 result := make([]entity.Id, len(sc.excerpts))
248
249 i := 0
250 for _, excerpt := range sc.excerpts {
251 result[i] = excerpt.Id()
252 i++
253 }
254
255 return result
256}
257
258// Resolve retrieve an entity matching the exact given id
259func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
260 sc.mu.RLock()
261 cached, ok := sc.cached[id]
262 if ok {
263 sc.lru.Get(id)
264 sc.mu.RUnlock()
265 return cached, nil
266 }
267 sc.mu.RUnlock()
268
269 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
270 if err != nil {
271 return *new(CacheT), err
272 }
273
274 cached = sc.makeCached(e, sc.entityUpdated)
275
276 sc.mu.Lock()
277 sc.cached[id] = cached
278 sc.lru.Add(id)
279 sc.mu.Unlock()
280
281 sc.evictIfNeeded()
282
283 return cached, nil
284}
285
286// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
287// entity match.
288func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
289 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
290 return excerpt.Id().HasPrefix(prefix)
291 })
292}
293
294func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
295 id, err := sc.resolveMatcher(f)
296 if err != nil {
297 return *new(CacheT), err
298 }
299 return sc.Resolve(id)
300}
301
302// ResolveExcerpt retrieve an Excerpt matching the exact given id
303func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
304 sc.mu.RLock()
305 defer sc.mu.RUnlock()
306
307 excerpt, ok := sc.excerpts[id]
308 if !ok {
309 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
310 }
311
312 return excerpt, nil
313}
314
315// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
316// entity match.
317func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
318 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
319 return excerpt.Id().HasPrefix(prefix)
320 })
321}
322
323func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
324 id, err := sc.resolveMatcher(f)
325 if err != nil {
326 return *new(ExcerptT), err
327 }
328 return sc.ResolveExcerpt(id)
329}
330
331func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
332 sc.mu.RLock()
333 defer sc.mu.RUnlock()
334
335 // preallocate but empty
336 matching := make([]entity.Id, 0, 5)
337
338 for _, excerpt := range sc.excerpts {
339 if f(excerpt) {
340 matching = append(matching, excerpt.Id())
341 }
342 }
343
344 if len(matching) > 1 {
345 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
346 }
347
348 if len(matching) == 0 {
349 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
350 }
351
352 return matching[0], nil
353}
354
355func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
356 sc.mu.Lock()
357 if _, has := sc.cached[e.Id()]; has {
358 sc.mu.Unlock()
359 return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
360 }
361
362 cached := sc.makeCached(e, sc.entityUpdated)
363 sc.cached[e.Id()] = cached
364 sc.lru.Add(e.Id())
365 sc.mu.Unlock()
366
367 sc.evictIfNeeded()
368
369 // force the write of the excerpt
370 err := sc.entityUpdated(e.Id())
371 if err != nil {
372 return *new(CacheT), err
373 }
374
375 return cached, nil
376}
377
378func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
379 e, err := sc.ResolvePrefix(prefix)
380 if err != nil {
381 return err
382 }
383
384 sc.mu.Lock()
385
386 err = sc.actions.Remove(sc.repo, e.Id())
387 if err != nil {
388 sc.mu.Unlock()
389 return err
390 }
391
392 delete(sc.cached, e.Id())
393 delete(sc.excerpts, e.Id())
394 sc.lru.Remove(e.Id())
395
396 sc.mu.Unlock()
397
398 return sc.write()
399}
400
401func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
402 out := make(chan entity.MergeResult)
403
404 // Intercept merge results to update the cache properly
405 go func() {
406 defer close(out)
407
408 author, err := sc.getUserIdentity()
409 if err != nil {
410 out <- entity.NewMergeError(err, "")
411 return
412 }
413
414 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
415 for result := range results {
416 out <- result
417
418 if result.Err != nil {
419 continue
420 }
421
422 switch result.Status {
423 case entity.MergeStatusNew, entity.MergeStatusUpdated:
424 e := result.Entity.(EntityT)
425 cached := sc.makeCached(e, sc.entityUpdated)
426
427 sc.mu.Lock()
428 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
429 // might as well keep them in memory
430 sc.cached[result.Id] = cached
431 sc.mu.Unlock()
432 }
433 }
434
435 err = sc.write()
436 if err != nil {
437 out <- entity.NewMergeError(err, "")
438 return
439 }
440 }()
441
442 return out
443
444}
445
// GetNamespace returns the namespace this SubCache was created with, which
// names both its cache file and its search index.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
449
450// entityUpdated is a callback to trigger when the excerpt of an entity changed
451func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
452 sc.mu.Lock()
453 e, ok := sc.cached[id]
454 if !ok {
455 sc.mu.Unlock()
456
457 // if the bug is not loaded at this point, it means it was loaded before
458 // but got evicted. Which means we potentially have multiple copies in
459 // memory and thus concurrent write.
460 // Failing immediately here is the simple and safe solution to avoid
461 // complicated data loss.
462 return errors.New("entity missing from cache")
463 }
464 sc.lru.Get(id)
465 // sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
466 sc.excerpts[id] = sc.makeExcerpt(e)
467 sc.mu.Unlock()
468
469 index, err := sc.repo.GetIndex(sc.namespace)
470 if err != nil {
471 return err
472 }
473
474 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
475 if err != nil {
476 return err
477 }
478
479 return sc.write()
480}
481
482// evictIfNeeded will evict an entity from the cache if needed
483func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
484 sc.mu.Lock()
485 defer sc.mu.Unlock()
486 if sc.lru.Len() <= sc.maxLoaded {
487 return
488 }
489
490 for _, id := range sc.lru.GetOldestToNewest() {
491 b := sc.cached[id]
492 if b.NeedCommit() {
493 continue
494 }
495
496 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
497 // if something try to do it anyway, it will lock the program and make it obvious.
498 b.Lock()
499
500 sc.lru.Remove(id)
501 delete(sc.cached, id)
502
503 if sc.lru.Len() <= sc.maxLoaded {
504 return
505 }
506 }
507}