1package cache
2
3import (
4 "bytes"
5 "encoding/gob"
6 "fmt"
7 "path/filepath"
8 "sync"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entities/identity"
13 "github.com/MichaelMure/git-bug/entity"
14 "github.com/MichaelMure/git-bug/repository"
15)
16
17type Excerpt interface {
18 Id() entity.Id
19 setId(id entity.Id)
20}
21
22type CacheEntity interface {
23 Id() entity.Id
24 NeedCommit() bool
25 Lock()
26}
27
28type getUserIdentityFunc func() (*IdentityCache, error)
29
// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
// functions from the entity/dag package, but right now identities are not using that framework.
type Actions[EntityT entity.Interface] struct {
	// ReadWithResolver reads a single entity by id (used by SubCache.Resolve).
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAllWithResolver streams all the entities of the type (used by SubCache.Build).
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	// Remove deletes an entity from the repository (used by SubCache.Remove).
	Remove func(repo repository.ClockedRepo, id entity.Id) error
	// MergeAll merges the entities from a remote and streams the merge results
	// (used by SubCache.MergeAll).
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
39
40var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
41
// SubCache is the cache for one type of entity. It keeps an excerpt of every
// known entity (persisted in the cache file named after namespace, and mirrored
// in the search index of the same name) and a set of loaded entities. Loaded
// entities tracked by lru are evicted oldest first once more than maxLoaded
// are tracked.
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	repo      repository.ClockedRepo
	resolvers func() entity.Resolvers

	getUserIdentity getUserIdentityFunc
	// makeCached wraps a raw entity into its loaded form; entityUpdated is the
	// callback to trigger when the excerpt of that entity changed.
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	// makeExcerpt builds the excerpt of a loaded entity.
	makeExcerpt func(CacheT) ExcerptT
	// makeIndexData returns the texts to put in the search index for a loaded entity.
	makeIndexData func(CacheT) []string
	actions       Actions[EntityT]

	typename  string // entity type name, used in error messages
	namespace string // name of the cache file and of the search index
	version   uint   // cache file format version, checked by Load
	maxLoaded int    // number of tracked loaded entities above which eviction happens

	// mu guards the excerpts, cached and lru fields below.
	mu       sync.RWMutex
	excerpts map[entity.Id]ExcerptT
	cached   map[entity.Id]CacheT
	lru      *lruIdCache
}
62
63func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
64 repo repository.ClockedRepo,
65 resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
66 makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
67 makeExcerpt func(CacheT) ExcerptT,
68 makeIndexData func(CacheT) []string,
69 actions Actions[EntityT],
70 typename, namespace string,
71 version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
72 return &SubCache[EntityT, ExcerptT, CacheT]{
73 repo: repo,
74 resolvers: resolvers,
75 getUserIdentity: getUserIdentity,
76 makeCached: makeCached,
77 makeExcerpt: makeExcerpt,
78 makeIndexData: makeIndexData,
79 actions: actions,
80 typename: typename,
81 namespace: namespace,
82 version: version,
83 maxLoaded: maxLoaded,
84 excerpts: make(map[entity.Id]ExcerptT),
85 cached: make(map[entity.Id]CacheT),
86 lru: newLRUIdCache(),
87 }
88}
89
90func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
91 return sc.typename
92}
93
94// Load will try to read from the disk the entity cache file
95func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
96 sc.mu.Lock()
97 defer sc.mu.Unlock()
98
99 f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
100 if err != nil {
101 return err
102 }
103
104 aux := struct {
105 Version uint
106 Excerpts map[entity.Id]ExcerptT
107 }{}
108
109 decoder := gob.NewDecoder(f)
110 err = decoder.Decode(&aux)
111 if err != nil {
112 _ = f.Close()
113 return err
114 }
115
116 err = f.Close()
117 if err != nil {
118 return err
119 }
120
121 if aux.Version != sc.version {
122 return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
123 }
124
125 // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
126 // so we fix it here, which doubles as enforcing coherency.
127 for id, excerpt := range aux.Excerpts {
128 excerpt.setId(id)
129 }
130
131 sc.excerpts = aux.Excerpts
132
133 index, err := sc.repo.GetIndex(sc.namespace)
134 if err != nil {
135 return err
136 }
137
138 // simple heuristic to detect a mismatch between the index and the entities
139 count, err := index.DocCount()
140 if err != nil {
141 return err
142 }
143 if count != uint64(len(sc.excerpts)) {
144 return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
145 }
146
147 // TODO: find a way to check lamport clocks
148
149 return nil
150}
151
152// Write will serialize on disk the entity cache file
153func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
154 sc.mu.RLock()
155 defer sc.mu.RUnlock()
156
157 var data bytes.Buffer
158
159 aux := struct {
160 Version uint
161 Excerpts map[entity.Id]ExcerptT
162 }{
163 Version: sc.version,
164 Excerpts: sc.excerpts,
165 }
166
167 encoder := gob.NewEncoder(&data)
168
169 err := encoder.Encode(aux)
170 if err != nil {
171 return err
172 }
173
174 f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
175 if err != nil {
176 return err
177 }
178
179 _, err = f.Write(data.Bytes())
180 if err != nil {
181 _ = f.Close()
182 return err
183 }
184
185 return f.Close()
186}
187
188func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
189 sc.excerpts = make(map[entity.Id]ExcerptT)
190
191 allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
192
193 index, err := sc.repo.GetIndex(sc.namespace)
194 if err != nil {
195 return err
196 }
197
198 // wipe the index just to be sure
199 err = index.Clear()
200 if err != nil {
201 return err
202 }
203
204 indexer, indexEnd := index.IndexBatch()
205
206 for e := range allEntities {
207 if e.Err != nil {
208 return e.Err
209 }
210
211 cached := sc.makeCached(e.Entity, sc.entityUpdated)
212 sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
213 // might as well keep them in memory
214 sc.cached[e.Entity.Id()] = cached
215
216 indexData := sc.makeIndexData(cached)
217 if err := indexer(e.Entity.Id().String(), indexData); err != nil {
218 return err
219 }
220 }
221
222 err = indexEnd()
223 if err != nil {
224 return err
225 }
226
227 err = sc.write()
228 if err != nil {
229 return err
230 }
231
232 return nil
233}
234
235func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
236 sc.maxLoaded = size
237 sc.evictIfNeeded()
238}
239
240func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
241 sc.mu.Lock()
242 defer sc.mu.Unlock()
243 sc.excerpts = nil
244 sc.cached = make(map[entity.Id]CacheT)
245 return nil
246}
247
248// AllIds return all known bug ids
249func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
250 sc.mu.RLock()
251 defer sc.mu.RUnlock()
252
253 result := make([]entity.Id, len(sc.excerpts))
254
255 i := 0
256 for _, excerpt := range sc.excerpts {
257 result[i] = excerpt.Id()
258 i++
259 }
260
261 return result
262}
263
264// Resolve retrieve an entity matching the exact given id
265func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
266 sc.mu.RLock()
267 cached, ok := sc.cached[id]
268 if ok {
269 sc.lru.Get(id)
270 sc.mu.RUnlock()
271 return cached, nil
272 }
273 sc.mu.RUnlock()
274
275 e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
276 if err != nil {
277 return *new(CacheT), err
278 }
279
280 cached = sc.makeCached(e, sc.entityUpdated)
281
282 sc.mu.Lock()
283 sc.cached[id] = cached
284 sc.lru.Add(id)
285 sc.mu.Unlock()
286
287 sc.evictIfNeeded()
288
289 return cached, nil
290}
291
292// ResolvePrefix retrieve an entity matching an id prefix. It fails if multiple
293// entity match.
294func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
295 return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
296 return excerpt.Id().HasPrefix(prefix)
297 })
298}
299
300func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
301 id, err := sc.resolveMatcher(f)
302 if err != nil {
303 return *new(CacheT), err
304 }
305 return sc.Resolve(id)
306}
307
308// ResolveExcerpt retrieve an Excerpt matching the exact given id
309func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
310 sc.mu.RLock()
311 defer sc.mu.RUnlock()
312
313 excerpt, ok := sc.excerpts[id]
314 if !ok {
315 return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
316 }
317
318 return excerpt, nil
319}
320
321// ResolveExcerptPrefix retrieve an Excerpt matching an id prefix. It fails if multiple
322// entity match.
323func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
324 return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
325 return excerpt.Id().HasPrefix(prefix)
326 })
327}
328
329func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
330 id, err := sc.resolveMatcher(f)
331 if err != nil {
332 return *new(ExcerptT), err
333 }
334 return sc.ResolveExcerpt(id)
335}
336
337func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
338 sc.mu.RLock()
339 defer sc.mu.RUnlock()
340
341 // preallocate but empty
342 matching := make([]entity.Id, 0, 5)
343
344 for _, excerpt := range sc.excerpts {
345 if f(excerpt) {
346 matching = append(matching, excerpt.Id())
347 }
348 }
349
350 if len(matching) > 1 {
351 return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
352 }
353
354 if len(matching) == 0 {
355 return entity.UnsetId, entity.NewErrNotFound(sc.typename)
356 }
357
358 return matching[0], nil
359}
360
361func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
362 sc.mu.Lock()
363 if _, has := sc.cached[e.Id()]; has {
364 sc.mu.Unlock()
365 return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
366 }
367
368 cached := sc.makeCached(e, sc.entityUpdated)
369 sc.cached[e.Id()] = cached
370 sc.lru.Add(e.Id())
371 sc.mu.Unlock()
372
373 sc.evictIfNeeded()
374
375 // force the write of the excerpt
376 err := sc.entityUpdated(e.Id())
377 if err != nil {
378 return *new(CacheT), err
379 }
380
381 return cached, nil
382}
383
384func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
385 e, err := sc.ResolvePrefix(prefix)
386 if err != nil {
387 return err
388 }
389
390 sc.mu.Lock()
391
392 err = sc.actions.Remove(sc.repo, e.Id())
393 if err != nil {
394 sc.mu.Unlock()
395 return err
396 }
397
398 delete(sc.cached, e.Id())
399 delete(sc.excerpts, e.Id())
400 sc.lru.Remove(e.Id())
401
402 sc.mu.Unlock()
403
404 return sc.write()
405}
406
407func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
408 out := make(chan entity.MergeResult)
409
410 // Intercept merge results to update the cache properly
411 go func() {
412 defer close(out)
413
414 author, err := sc.getUserIdentity()
415 if err != nil {
416 out <- entity.NewMergeError(err, "")
417 return
418 }
419
420 results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
421 for result := range results {
422 out <- result
423
424 if result.Err != nil {
425 continue
426 }
427
428 switch result.Status {
429 case entity.MergeStatusNew, entity.MergeStatusUpdated:
430 e := result.Entity.(EntityT)
431 cached := sc.makeCached(e, sc.entityUpdated)
432
433 sc.mu.Lock()
434 sc.excerpts[result.Id] = sc.makeExcerpt(cached)
435 // might as well keep them in memory
436 sc.cached[result.Id] = cached
437 sc.mu.Unlock()
438 }
439 }
440
441 err = sc.write()
442 if err != nil {
443 out <- entity.NewMergeError(err, "")
444 return
445 }
446 }()
447
448 return out
449
450}
451
452func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
453 return sc.namespace
454}
455
456// entityUpdated is a callback to trigger when the excerpt of an entity changed
457func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
458 sc.mu.Lock()
459 e, ok := sc.cached[id]
460 if !ok {
461 sc.mu.Unlock()
462
463 // if the bug is not loaded at this point, it means it was loaded before
464 // but got evicted. Which means we potentially have multiple copies in
465 // memory and thus concurrent write.
466 // Failing immediately here is the simple and safe solution to avoid
467 // complicated data loss.
468 return errors.New("entity missing from cache")
469 }
470 sc.lru.Get(id)
471 // sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
472 sc.excerpts[id] = sc.makeExcerpt(e)
473 sc.mu.Unlock()
474
475 index, err := sc.repo.GetIndex(sc.namespace)
476 if err != nil {
477 return err
478 }
479
480 err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
481 if err != nil {
482 return err
483 }
484
485 return sc.write()
486}
487
488// evictIfNeeded will evict an entity from the cache if needed
489func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
490 sc.mu.Lock()
491 defer sc.mu.Unlock()
492 if sc.lru.Len() <= sc.maxLoaded {
493 return
494 }
495
496 for _, id := range sc.lru.GetOldestToNewest() {
497 b := sc.cached[id]
498 if b.NeedCommit() {
499 continue
500 }
501
502 // as a form of assurance that evicted entities don't get manipulated, we lock them here.
503 // if something try to do it anyway, it will lock the program and make it obvious.
504 b.Lock()
505
506 sc.lru.Remove(id)
507 delete(sc.cached, id)
508
509 if sc.lru.Len() <= sc.maxLoaded {
510 return
511 }
512 }
513}