1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entities/identity"
13 "github.com/MichaelMure/git-bug/entity"
14 "github.com/MichaelMure/git-bug/repository"
15)
16
17type Excerpt interface {
18 Id() entity.Id
19 setId(id entity.Id)
20}
21
22type CacheEntity interface {
23 Id() entity.Id
24 NeedCommit() bool
25 Lock()
26}
27
// getUserIdentityFunc returns an identity used by SubCache.MergeAll as the
// author of merge operations (presumably the local user's identity — confirm
// with the caller that builds the SubCache).
type getUserIdentityFunc func() (*IdentityCache, error)
29
// Actions exposes a number of action functions on entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist; the cache layer would assume that everything is an entity/dag, and directly use the
// functions from the entity/dag package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads the single entity with the given id.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams every entity of this type; SubCache.Build consumes it.
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes the entity with the given id from the repository.
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// RemoveAll deletes every entity of this type from the repository.
	RemoveAll func(repo repository.ClockedRepo) error
	// MergeAll merges the entities from the given remote, using mergeAuthor as
	// the author of merge operations, and streams one result per entity.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
40
41var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
42
// SubCache is the cache for a single entity type. It holds an excerpt of every
// known entity (persisted by write and restored by Load), keeps a search index
// obtained through repo.GetIndex(namespace) in sync, and keeps a set of fully
// loaded entities whose size evictIfNeeded tries to keep at or below maxLoaded.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	repo      repository.ClockedRepo
	resolvers func() entity.Resolvers

	// getUserIdentity provides the author used by MergeAll.
	getUserIdentity getUserIdentityFunc
	// makeCached wraps a raw entity into its cached form; the callback it is
	// given must be invoked when the entity's excerpt changes.
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt stored for a cached entity.
	makeExcerpt func(CacheT) ExcerptT
	// makeIndexData returns the data handed to the search index for an entity.
	makeIndexData func(CacheT) []string
	actions       Actions[EntityT]

	// typename labels errors and build events.
	typename string
	// namespace names both the cache file (cache/<namespace>) and the index.
	namespace string
	// version is the cache file format version written by write and checked by Load.
	version uint
	// maxLoaded is the target maximum number of entities tracked by the LRU.
	maxLoaded int

	// mu is meant to guard excerpts, cached and the LRU bookkeeping below.
	mu       sync.RWMutex
	excerpts map[entity.Id]ExcerptT
	cached   map[entity.Id]CacheT
	lru      *lruIdCache
}
63
64func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
65 repo repository.ClockedRepo,
66 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
67 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
68 makeExcerpt func(CacheT) ExcerptT,
69 makeIndexData func(CacheT) []string,
70 actions Actions[EntityT],
71 typename, namespace string,
72 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
73 return &SubCache[EntityT, ExcerptT, CacheT]{
74 repo: repo,
75 resolvers: resolvers,
76 getUserIdentity: getUserIdentity,
77 makeCached: makeCached,
78 makeExcerpt: makeExcerpt,
79 makeIndexData: makeIndexData,
80 actions: actions,
81 typename: typename,
82 namespace: namespace,
83 version: version,
84 maxLoaded: maxLoaded,
85 excerpts: make(map[entity.Id]ExcerptT),
86 cached: make(map[entity.Id]CacheT),
87 lru: newLRUIdCache(),
88 }
89}
90
// Typename returns the type name given to NewSubCache. SubCache uses it to
// label not-found/multiple-match errors and build events.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
94
95// Load will try to read from the disk the entity cache file
96func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
97 sc.mu.Lock()
98 defer sc.mu.Unlock()
99
100 f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
101 if err != nil {
102 return err
103 }
104
105 aux := struct {
106 Version uint
107 Excerpts map[entity.Id]ExcerptT
108 }{}
109
110 decoder := gob.NewDecoder(f)
111 err = decoder.Decode(&aux)
112 if err != nil {
113 _ = f.Close()
114 return err
115 }
116
117 err = f.Close()
118 if err != nil {
119 return err
120 }
121
122 if aux.Version != sc.version {
123 return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
124 }
125
126 // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
127 // so we fix it here, which doubles as enforcing coherency.
128 for id, excerpt := range aux.Excerpts {
129 excerpt.setId(id)
130 }
131
132 sc.excerpts = aux.Excerpts
133
134 index, err := sc.repo.GetIndex(sc.namespace)
135 if err != nil {
136 return err
137 }
138
139 // simple heuristic to detect a mismatch between the index and the entities
140 count, err := index.DocCount()
141 if err != nil {
142 return err
143 }
144 if count != uint64(len(sc.excerpts)) {
145 return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
146 }
147
148 // TODO: find a way to check lamport clocks
149
150 return nil
151}
152
// write serializes the excerpts, tagged with the cache format version, and
// stores them in the cache file (cache/<namespace>) created through
// LocalStorage().Create.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	payload := struct {
		Version  uint
		Excerpts map[entity.Id]ExcerptT
	}{
		Version:  sc.version,
		Excerpts: sc.excerpts,
	}

	// Encode fully in memory first, so an encoding failure never touches the
	// file on disk.
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(payload); err != nil {
		return err
	}

	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
	if err != nil {
		return err
	}
	if _, err := f.Write(buf.Bytes()); err != nil {
		_ = f.Close()
		return err
	}
	return f.Close()
}
188
189func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
190 out := make(chan BuildEvent)
191
192 go func() {
193 defer close(out)
194
195 out <- BuildEvent{
196 Typename: sc.typename,
197 Event: BuildEventStarted,
198 }
199
200 sc.excerpts = make(map[entity.Id]ExcerptT)
201
202 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
203
204 index, err := sc.repo.GetIndex(sc.namespace)
205 if err != nil {
206 out <- BuildEvent{
207 Typename: sc.typename,
208 Err: err,
209 }
210 return
211 }
212
213 // wipe the index just to be sure
214 err = index.Clear()
215 if err != nil {
216 out <- BuildEvent{
217 Typename: sc.typename,
218 Err: err,
219 }
220 return
221 }
222
223 indexer, indexEnd := index.IndexBatch()
224
225 for e := range allEntities {
226 if e.Err != nil {
227 out <- BuildEvent{
228 Typename: sc.typename,
229 Err: e.Err,
230 }
231 return
232 }
233
234 cached := sc.makeCached(e.Entity, sc.entityUpdated)
235 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
236 // might as well keep them in memory
237 sc.cached[e.Entity.Id()] = cached
238
239 indexData := sc.makeIndexData(cached)
240 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
241 out <- BuildEvent{
242 Typename: sc.typename,
243 Err: err,
244 }
245 return
246 }
247
248 out <- BuildEvent{
249 Typename: sc.typename,
250 Event: BuildEventProgress,
251 Progress: e.CurrentEntity,
252 Total: e.TotalEntities,
253 }
254 }
255
256 err = indexEnd()
257 if err != nil {
258 out <- BuildEvent{
259 Typename: sc.typename,
260 Err: err,
261 }
262 return
263 }
264
265 err = sc.write()
266 if err != nil {
267 out <- BuildEvent{
268 Typename: sc.typename,
269 Err: err,
270 }
271 return
272 }
273
274 out <- BuildEvent{
275 Typename: sc.typename,
276 Event: BuildEventFinished,
277 }
278 }()
279
280 return out
281}
282
283func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
284 sc.maxLoaded = size
285 sc.evictIfNeeded()
286}
287
288func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
289 sc.mu.Lock()
290 defer sc.mu.Unlock()
291 sc.excerpts = nil
292 sc.cached = make(map[entity.Id]CacheT)
293 return nil
294}
295
296// AllIds return all known bug ids
297func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
298 sc.mu.RLock()
299 defer sc.mu.RUnlock()
300
301 result := make([]entity.Id, len(sc.excerpts))
302
303 i := 0
304 for _, excerpt := range sc.excerpts {
305 result[i] = excerpt.Id()
306 i++
307 }
308
309 return result
310}
311
312// Resolve retrieve an entity matching the exact given id
313func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
314 sc.mu.RLock()
315 cached, ok := sc.cached[id]
316 if ok {
317 sc.lru.Get(id)
318 sc.mu.RUnlock()
319 return cached, nil
320 }
321 sc.mu.RUnlock()
322
323 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
324 if err != nil {
325 return *new(CacheT), err
326 }
327
328 cached = sc.makeCached(e, sc.entityUpdated)
329
330 sc.mu.Lock()
331 sc.cached[id] = cached
332 sc.lru.Add(id)
333 sc.mu.Unlock()
334
335 sc.evictIfNeeded()
336
337 return cached, nil
338}
339
340// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
341// entity match.
342func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
343 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
344 return excerpt.Id().HasPrefix(prefix)
345 })
346}
347
348func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
349 id, err := sc.resolveMatcher(f)
350 if err != nil {
351 return *new(CacheT), err
352 }
353 return sc.Resolve(id)
354}
355
356// ResolveExcerpt retrieve an Excerpt matching the exact given id
357func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
358 sc.mu.RLock()
359 defer sc.mu.RUnlock()
360
361 excerpt, ok := sc.excerpts[id]
362 if !ok {
363 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
364 }
365
366 return excerpt, nil
367}
368
369// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
370// entity match.
371func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
372 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
373 return excerpt.Id().HasPrefix(prefix)
374 })
375}
376
377func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
378 id, err := sc.resolveMatcher(f)
379 if err != nil {
380 return *new(ExcerptT), err
381 }
382 return sc.ResolveExcerpt(id)
383}
384
385func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
386 sc.mu.RLock()
387 defer sc.mu.RUnlock()
388
389 // preallocate but empty
390 matching := make([]entity.Id, 0, 5)
391
392 for _, excerpt := range sc.excerpts {
393 if f(excerpt) {
394 matching = append(matching, excerpt.Id())
395 }
396 }
397
398 if len(matching) > 1 {
399 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
400 }
401
402 if len(matching) == 0 {
403 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
404 }
405
406 return matching[0], nil
407}
408
409func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
410 sc.mu.Lock()
411 if _, has := sc.cached[e.Id()]; has {
412 sc.mu.Unlock()
413 return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
414 }
415
416 cached := sc.makeCached(e, sc.entityUpdated)
417 sc.cached[e.Id()] = cached
418 sc.lru.Add(e.Id())
419 sc.mu.Unlock()
420
421 sc.evictIfNeeded()
422
423 // force the write of the excerpt
424 err := sc.entityUpdated(e.Id())
425 if err != nil {
426 return *new(CacheT), err
427 }
428
429 return cached, nil
430}
431
432func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
433 e, err := sc.ResolvePrefix(prefix)
434 if err != nil {
435 return err
436 }
437
438 sc.mu.Lock()
439
440 err = sc.actions.Remove(sc.repo, e.Id())
441 if err != nil {
442 sc.mu.Unlock()
443 return err
444 }
445
446 delete(sc.cached, e.Id())
447 delete(sc.excerpts, e.Id())
448 sc.lru.Remove(e.Id())
449
450 index, err := sc.repo.GetIndex(sc.namespace)
451 if err != nil {
452 sc.mu.Unlock()
453 return err
454 }
455
456 err = index.Remove(e.Id().String())
457 sc.mu.Unlock()
458 if err != nil {
459 return err
460 }
461
462 return sc.write()
463}
464
465func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
466 sc.mu.Lock()
467
468 err := sc.actions.RemoveAll(sc.repo)
469 if err != nil {
470 sc.mu.Unlock()
471 return err
472 }
473
474 for id, _ := range sc.cached {
475 delete(sc.cached, id)
476 sc.lru.Remove(id)
477 }
478 for id, _ := range sc.excerpts {
479 delete(sc.excerpts, id)
480 }
481
482 index, err := sc.repo.GetIndex(sc.namespace)
483 if err != nil {
484 sc.mu.Unlock()
485 return err
486 }
487
488 err = index.Clear()
489 sc.mu.Unlock()
490 if err != nil {
491 return err
492 }
493
494 return sc.write()
495}
496
497func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
498 out := make(chan entity.MergeResult)
499
500 // Intercept merge results to update the cache properly
501 go func() {
502 defer close(out)
503
504 author, err := sc.getUserIdentity()
505 if err != nil {
506 out <- entity.NewMergeError(err, "")
507 return
508 }
509
510 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
511 for result := range results {
512 out <- result
513
514 if result.Err != nil {
515 continue
516 }
517
518 switch result.Status {
519 case entity.MergeStatusNew, entity.MergeStatusUpdated:
520 e := result.Entity.(EntityT)
521 cached := sc.makeCached(e, sc.entityUpdated)
522
523 sc.mu.Lock()
524 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
525 // might as well keep them in memory
526 sc.cached[result.Id] = cached
527 sc.mu.Unlock()
528 }
529 }
530
531 err = sc.write()
532 if err != nil {
533 out <- entity.NewMergeError(err, "")
534 return
535 }
536 }()
537
538 return out
539
540}
541
// GetNamespace returns the namespace given to NewSubCache. It names both the
// on-disk cache file (cache/<namespace>) and the search index used by this
// SubCache.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
545
// entityUpdated is the callback handed to every cached entity (through
// makeCached), to be triggered when the entity's excerpt changed. It refreshes
// the excerpt and the LRU position, re-indexes the entity and rewrites the
// cache file.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
	sc.mu.Lock()
	cached, loaded := sc.cached[id]
	if !loaded {
		sc.mu.Unlock()
		// If the entity is not loaded at this point, it was loaded before and
		// then evicted, so multiple copies may live in memory and be written
		// concurrently. Failing immediately is the simple and safe way to
		// avoid complicated data loss.
		return errors.New("entity missing from cache")
	}
	sc.lru.Get(id)
	sc.excerpts[id] = sc.makeExcerpt(cached)
	sc.mu.Unlock()

	index, err := sc.repo.GetIndex(sc.namespace)
	if err != nil {
		return err
	}
	if err := index.IndexOne(cached.Id().String(), sc.makeIndexData(cached)); err != nil {
		return err
	}

	return sc.write()
}
577
578// evictIfNeeded will evict an entity from the cache if needed
579func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
580 sc.mu.Lock()
581 defer sc.mu.Unlock()
582 if sc.lru.Len() <= sc.maxLoaded {
583 return
584 }
585
586 for _, id := range sc.lru.GetOldestToNewest() {
587 b := sc.cached[id]
588 if b.NeedCommit() {
589 continue
590 }
591
592 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
593 // if something try to do it anyway, it will lock the program and make it obvious.
594 b.Lock()
595
596 sc.lru.Remove(id)
597 delete(sc.cached, id)
598
599 if sc.lru.Len() <= sc.maxLoaded {
600 return
601 }
602 }
603}