1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entities/identity"
13 "github.com/MichaelMure/git-bug/entity"
14 "github.com/MichaelMure/git-bug/repository"
15)
16
17type Excerpt interface {
18 Id() entity.Id
19 setId(id entity.Id)
20}
21
22type CacheEntity interface {
23 Id() entity.Id
24 NeedCommit() bool
25 Lock()
26}
27
28type getUserIdentityFunc func() (*IdentityCache, error)
29
30// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
31// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
32// functions from this package, but right now identities are not using that framework.
33type Actions[EntityT entity.Interface] struct {
34 ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
35 ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
36 Remove func(repo repository.ClockedRepo, id entity.Id) error
37 MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
38}
39
40var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
41
42type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
43 repo repository.ClockedRepo
44 resolvers func() entity.Resolvers
45
46 getUserIdentity getUserIdentityFunc
47 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
48 makeExcerpt func(CacheT) ExcerptT
49 makeIndexData func(CacheT) []string
50 actions Actions[EntityT]
51
52 typename string
53 namespace string
54 version uint
55 maxLoaded int
56
57 mu sync.RWMutex
58 excerpts map[entity.Id]ExcerptT
59 cached map[entity.Id]CacheT
60 lru *lruIdCache
61}
62
63func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
64 repo repository.ClockedRepo,
65 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
66 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
67 makeExcerpt func(CacheT) ExcerptT,
68 makeIndexData func(CacheT) []string,
69 actions Actions[EntityT],
70 typename, namespace string,
71 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
72 return &SubCache[EntityT, ExcerptT, CacheT]{
73 repo: repo,
74 resolvers: resolvers,
75 getUserIdentity: getUserIdentity,
76 makeCached: makeCached,
77 makeExcerpt: makeExcerpt,
78 makeIndexData: makeIndexData,
79 actions: actions,
80 typename: typename,
81 namespace: namespace,
82 version: version,
83 maxLoaded: maxLoaded,
84 excerpts: make(map[entity.Id]ExcerptT),
85 cached: make(map[entity.Id]CacheT),
86 lru: newLRUIdCache(),
87 }
88}
89
90func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
91 return sc.typename
92}
93
// Load tries to read the entity cache file from local storage and restore the
// excerpts from it.
//
// It returns an error if the file can't be opened, decoded or closed, or if it
// was written with another format version. It also returns an error if the
// number of excerpts doesn't match the number of documents in the search index,
// which is a cheap heuristic for an out-of-date index.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
	if err != nil {
		return err
	}

	aux := struct {
		Version  uint
		Excerpts map[entity.Id]ExcerptT
	}{}

	decoder := gob.NewDecoder(f)
	err = decoder.Decode(&aux)
	if err != nil {
		_ = f.Close()
		return err
	}

	err = f.Close()
	if err != nil {
		return err
	}

	if aux.Version != sc.version {
		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
	}

	// gob doesn't transmit nil maps, so a file written from a nil excerpt map
	// decodes to a nil map. Normalize it: later writes to sc.excerpts
	// (entityUpdated, MergeAll, Build) would otherwise panic.
	if aux.Excerpts == nil {
		aux.Excerpts = make(map[entity.Id]ExcerptT)
	}

	// the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
	// so we fix it here, which doubles as enforcing coherency.
	for id, excerpt := range aux.Excerpts {
		excerpt.setId(id)
	}

	sc.excerpts = aux.Excerpts

	index, err := sc.repo.GetIndex(sc.namespace)
	if err != nil {
		return err
	}

	// simple heuristic to detect a mismatch between the index and the entities
	count, err := index.DocCount()
	if err != nil {
		return err
	}
	if count != uint64(len(sc.excerpts)) {
		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
	}

	// TODO: find a way to check lamport clocks

	return nil
}
151
// write serializes the excerpts, tagged with the cache format version, into the
// entity cache file. The payload is encoded in memory first, so the file is only
// created once encoding has succeeded.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	payload := struct {
		Version  uint
		Excerpts map[entity.Id]ExcerptT
	}{sc.version, sc.excerpts}

	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(payload); err != nil {
		return err
	}

	file, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
	if err != nil {
		return err
	}

	if _, err := file.Write(buf.Bytes()); err != nil {
		_ = file.Close() // the write error is the one worth reporting
		return err
	}

	return file.Close()
}
187
188func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
189 out := make(chan BuildEvent)
190
191 go func() {
192 defer close(out)
193
194 out <- BuildEvent{
195 Typename: sc.typename,
196 Event: BuildEventStarted,
197 }
198
199 sc.excerpts = make(map[entity.Id]ExcerptT)
200
201 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
202
203 index, err := sc.repo.GetIndex(sc.namespace)
204 if err != nil {
205 out <- BuildEvent{
206 Typename: sc.typename,
207 Err: err,
208 }
209 return
210 }
211
212 // wipe the index just to be sure
213 err = index.Clear()
214 if err != nil {
215 out <- BuildEvent{
216 Typename: sc.typename,
217 Err: err,
218 }
219 return
220 }
221
222 indexer, indexEnd := index.IndexBatch()
223
224 for e := range allEntities {
225 if e.Err != nil {
226 out <- BuildEvent{
227 Typename: sc.typename,
228 Err: e.Err,
229 }
230 return
231 }
232
233 cached := sc.makeCached(e.Entity, sc.entityUpdated)
234 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
235 // might as well keep them in memory
236 sc.cached[e.Entity.Id()] = cached
237
238 indexData := sc.makeIndexData(cached)
239 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
240 out <- BuildEvent{
241 Typename: sc.typename,
242 Err: err,
243 }
244 return
245 }
246
247 out <- BuildEvent{
248 Typename: sc.typename,
249 Event: BuildEventProgress,
250 Progress: e.CurrentEntity,
251 Total: e.TotalEntities,
252 }
253 }
254
255 err = indexEnd()
256 if err != nil {
257 out <- BuildEvent{
258 Typename: sc.typename,
259 Err: err,
260 }
261 return
262 }
263
264 err = sc.write()
265 if err != nil {
266 out <- BuildEvent{
267 Typename: sc.typename,
268 Err: err,
269 }
270 return
271 }
272
273 out <- BuildEvent{
274 Typename: sc.typename,
275 Event: BuildEventFinished,
276 }
277 }()
278
279 return out
280}
281
282func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
283 sc.maxLoaded = size
284 sc.evictIfNeeded()
285}
286
287func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
288 sc.mu.Lock()
289 defer sc.mu.Unlock()
290 sc.excerpts = nil
291 sc.cached = make(map[entity.Id]CacheT)
292 return nil
293}
294
295// AllIds return all known bug ids
296func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
297 sc.mu.RLock()
298 defer sc.mu.RUnlock()
299
300 result := make([]entity.Id, len(sc.excerpts))
301
302 i := 0
303 for _, excerpt := range sc.excerpts {
304 result[i] = excerpt.Id()
305 i++
306 }
307
308 return result
309}
310
311// Resolve retrieve an entity matching the exact given id
312func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
313 sc.mu.RLock()
314 cached, ok := sc.cached[id]
315 if ok {
316 sc.lru.Get(id)
317 sc.mu.RUnlock()
318 return cached, nil
319 }
320 sc.mu.RUnlock()
321
322 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
323 if err != nil {
324 return *new(CacheT), err
325 }
326
327 cached = sc.makeCached(e, sc.entityUpdated)
328
329 sc.mu.Lock()
330 sc.cached[id] = cached
331 sc.lru.Add(id)
332 sc.mu.Unlock()
333
334 sc.evictIfNeeded()
335
336 return cached, nil
337}
338
339// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
340// entity match.
341func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
342 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
343 return excerpt.Id().HasPrefix(prefix)
344 })
345}
346
347func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
348 id, err := sc.resolveMatcher(f)
349 if err != nil {
350 return *new(CacheT), err
351 }
352 return sc.Resolve(id)
353}
354
355// ResolveExcerpt retrieve an Excerpt matching the exact given id
356func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
357 sc.mu.RLock()
358 defer sc.mu.RUnlock()
359
360 excerpt, ok := sc.excerpts[id]
361 if !ok {
362 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
363 }
364
365 return excerpt, nil
366}
367
368// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
369// entity match.
370func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
371 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
372 return excerpt.Id().HasPrefix(prefix)
373 })
374}
375
376func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
377 id, err := sc.resolveMatcher(f)
378 if err != nil {
379 return *new(ExcerptT), err
380 }
381 return sc.ResolveExcerpt(id)
382}
383
384func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
385 sc.mu.RLock()
386 defer sc.mu.RUnlock()
387
388 // preallocate but empty
389 matching := make([]entity.Id, 0, 5)
390
391 for _, excerpt := range sc.excerpts {
392 if f(excerpt) {
393 matching = append(matching, excerpt.Id())
394 }
395 }
396
397 if len(matching) > 1 {
398 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
399 }
400
401 if len(matching) == 0 {
402 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
403 }
404
405 return matching[0], nil
406}
407
// add inserts a new entity in the cache, then records its excerpt and indexes
// it through entityUpdated. It fails if an entity with the same id is already
// loaded, or if recording the excerpt fails.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
	sc.mu.Lock()
	if _, has := sc.cached[e.Id()]; has {
		sc.mu.Unlock()
		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
	}

	cached := sc.makeCached(e, sc.entityUpdated)
	sc.cached[e.Id()] = cached
	sc.lru.Add(e.Id())
	sc.mu.Unlock()

	// Force the write of the excerpt before any eviction. If the new entity
	// were evicted first, entityUpdated would fail with "entity missing from
	// cache" and the entity would never get an excerpt or an index entry.
	err := sc.entityUpdated(e.Id())

	sc.evictIfNeeded()

	if err != nil {
		return *new(CacheT), err
	}

	return cached, nil
}
430
431func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
432 e, err := sc.ResolvePrefix(prefix)
433 if err != nil {
434 return err
435 }
436
437 sc.mu.Lock()
438
439 err = sc.actions.Remove(sc.repo, e.Id())
440 if err != nil {
441 sc.mu.Unlock()
442 return err
443 }
444
445 delete(sc.cached, e.Id())
446 delete(sc.excerpts, e.Id())
447 sc.lru.Remove(e.Id())
448
449 sc.mu.Unlock()
450
451 return sc.write()
452}
453
454func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
455 out := make(chan entity.MergeResult)
456
457 // Intercept merge results to update the cache properly
458 go func() {
459 defer close(out)
460
461 author, err := sc.getUserIdentity()
462 if err != nil {
463 out <- entity.NewMergeError(err, "")
464 return
465 }
466
467 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
468 for result := range results {
469 out <- result
470
471 if result.Err != nil {
472 continue
473 }
474
475 switch result.Status {
476 case entity.MergeStatusNew, entity.MergeStatusUpdated:
477 e := result.Entity.(EntityT)
478 cached := sc.makeCached(e, sc.entityUpdated)
479
480 sc.mu.Lock()
481 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
482 // might as well keep them in memory
483 sc.cached[result.Id] = cached
484 sc.mu.Unlock()
485 }
486 }
487
488 err = sc.write()
489 if err != nil {
490 out <- entity.NewMergeError(err, "")
491 return
492 }
493 }()
494
495 return out
496
497}
498
499func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
500 return sc.namespace
501}
502
503// entityUpdated is a callback to trigger when the excerpt of an entity changed
504func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
505 sc.mu.Lock()
506 e, ok := sc.cached[id]
507 if !ok {
508 sc.mu.Unlock()
509
510 // if the bug is not loaded at this point, it means it was loaded before
511 // but got evicted. Which means we potentially have multiple copies in
512 // memory and thus concurrent write.
513 // Failing immediately here is the simple and safe solution to avoid
514 // complicated data loss.
515 return errors.New("entity missing from cache")
516 }
517 sc.lru.Get(id)
518 // sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
519 sc.excerpts[id] = sc.makeExcerpt(e)
520 sc.mu.Unlock()
521
522 index, err := sc.repo.GetIndex(sc.namespace)
523 if err != nil {
524 return err
525 }
526
527 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
528 if err != nil {
529 return err
530 }
531
532 return sc.write()
533}
534
535// evictIfNeeded will evict an entity from the cache if needed
536func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
537 sc.mu.Lock()
538 defer sc.mu.Unlock()
539 if sc.lru.Len() <= sc.maxLoaded {
540 return
541 }
542
543 for _, id := range sc.lru.GetOldestToNewest() {
544 b := sc.cached[id]
545 if b.NeedCommit() {
546 continue
547 }
548
549 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
550 // if something try to do it anyway, it will lock the program and make it obvious.
551 b.Lock()
552
553 sc.lru.Remove(id)
554 delete(sc.cached, id)
555
556 if sc.lru.Len() <= sc.maxLoaded {
557 return
558 }
559 }
560}