WIP

Michael Muré created

Change summary

cache/bug_subcache.go         |   2 
cache/repo_cache.go           |  89 +++++++++++++++++++++++-----
cache/subcache.go             |  50 +++++++++++----
util/multierr/errwaitgroup.go | 115 +++++++++++++++++++++++++++++++++++++
util/multierr/join.go         |  51 ++++++++++++++++
5 files changed, 277 insertions(+), 30 deletions(-)

Detailed changes

cache/bug_subcache.go 🔗

@@ -18,7 +18,7 @@ import (
 )
 
 type RepoCacheBug struct {
-	SubCache[*BugExcerpt, *BugCache, bug.Interface]
+	*SubCache[*BugExcerpt, *BugCache, bug.Interface]
 }
 
 // ResolveBugCreateMetadata retrieve a bug that has the exact given metadata on

cache/repo_cache.go 🔗

@@ -6,13 +6,13 @@ import (
 	"io/ioutil"
 	"os"
 	"strconv"
-
-	"golang.org/x/sync/errgroup"
+	"sync"
 
 	"github.com/MichaelMure/git-bug/entities/bug"
 	"github.com/MichaelMure/git-bug/entities/identity"
 	"github.com/MichaelMure/git-bug/entity"
 	"github.com/MichaelMure/git-bug/repository"
+	"github.com/MichaelMure/git-bug/util/multierr"
 	"github.com/MichaelMure/git-bug/util/process"
 )
 
@@ -29,6 +29,14 @@ var _ repository.RepoCommon = &RepoCache{}
 var _ repository.RepoConfig = &RepoCache{}
 var _ repository.RepoKeyring = &RepoCache{}
 
+type cacheMgmt interface {
+	Typename() string
+	Load() error
+	Write() error
+	Build() error
+	Close() error
+}
+
 // RepoCache is a cache for a Repository. This cache has multiple functions:
 //
 // 1. After being loaded, a Bug is kept in memory in the cache, allowing for fast
@@ -62,11 +70,11 @@ type RepoCache struct {
 	userIdentityId entity.Id
 }
 
-func NewRepoCache(r repository.ClockedRepo) (*RepoCache, error) {
+func NewRepoCache(r repository.ClockedRepo) (*RepoCache, chan BuildEvent, error) {
 	return NewNamedRepoCache(r, "")
 }
 
-func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, error) {
+func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, chan BuildEvent, error) {
 	c := &RepoCache{
 		repo: r,
 		name: name,
@@ -87,12 +95,12 @@ func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, error
 
 	err := c.lock()
 	if err != nil {
-		return &RepoCache{}, err
+		return &RepoCache{}, nil, err
 	}
 
 	err = c.load()
 	if err == nil {
-		return c, nil
+		return c, nil, nil
 	}
 
 	// Cache is either missing, broken or outdated. Rebuilding.
@@ -126,20 +134,20 @@ func (c *RepoCache) setCacheSize(size int) {
 
 // load will try to read from the disk all the cache files
 func (c *RepoCache) load() error {
-	var errG errgroup.Group
+	var errWait multierr.ErrWaitGroup
 	for _, mgmt := range c.subcaches {
-		errG.Go(mgmt.Load)
+		errWait.Go(mgmt.Load)
 	}
-	return errG.Wait()
+	return errWait.Wait()
 }
 
 // write will serialize on disk all the cache files
 func (c *RepoCache) write() error {
-	var errG errgroup.Group
+	var errWait multierr.ErrWaitGroup
 	for _, mgmt := range c.subcaches {
-		errG.Go(mgmt.Write)
+		errWait.Go(mgmt.Write)
 	}
-	return errG.Wait()
+	return errWait.Wait()
 }
 
 func (c *RepoCache) lock() error {
@@ -163,11 +171,11 @@ func (c *RepoCache) lock() error {
 }
 
 func (c *RepoCache) Close() error {
-	var errG errgroup.Group
+	var errWait multierr.ErrWaitGroup
 	for _, mgmt := range c.subcaches {
-		errG.Go(mgmt.Close)
+		errWait.Go(mgmt.Close)
 	}
-	err := errG.Wait()
+	err := errWait.Wait()
 	if err != nil {
 		return err
 	}
@@ -180,7 +188,56 @@ func (c *RepoCache) Close() error {
 	return c.repo.LocalStorage().Remove(lockfile)
 }
 
-func (c *RepoCache) buildCache() error {
+type BuildEventType int
+
+const (
+	_ BuildEventType = iota
+	BuildEventStarted
+	BuildEventFinished
+)
+
+type BuildEvent struct {
+	Typename string         // Typename() of the subcache this event is about
+	Event    BuildEventType // BuildEventStarted or BuildEventFinished; zero value when Err is set
+	Err      error          // error returned by the subcache's Build(), nil otherwise
+}
+
+func (c *RepoCache) buildCache() chan BuildEvent {
+	out := make(chan BuildEvent)
+
+	go func() {
+		defer close(out)
+
+		var wg sync.WaitGroup
+		for _, subcache := range c.subcaches {
+			wg.Add(1)
+			go func(subcache cacheMgmt) {
+				defer wg.Done()
+				out <- BuildEvent{
+					Typename: subcache.Typename(),
+					Event:    BuildEventStarted,
+				}
+
+				err := subcache.Build()
+				if err != nil {
+					out <- BuildEvent{
+						Typename: subcache.Typename(),
+						Err: err,
+					}
+					return
+				}
+
+				out <- BuildEvent{
+					Typename: subcache.Typename(),
+					Event: BuildEventFinished,
+				}
+			}(subcache)
+		}
+		wg.Wait()
+	}()
+
+	return out
+
 	_, _ = fmt.Fprintf(os.Stderr, "Building identity cache... ")
 
 	c.identitiesExcerpts = make(map[entity.Id]*IdentityExcerpt)

cache/subcache.go 🔗

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/gob"
 	"fmt"
+	"os"
 	"sync"
 
 	"github.com/pkg/errors"
@@ -21,13 +22,6 @@ type CacheEntity interface {
 	NeedCommit() bool
 }
 
-type cacheMgmt interface {
-	Load() error
-	Write() error
-	Build() error
-	Close() error
-}
-
 type getUserIdentityFunc func() (*IdentityCache, error)
 
 type SubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface] struct {
@@ -38,6 +32,7 @@ type SubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface] st
 	readWithResolver func(repository.ClockedRepo, entity.Resolvers, entity.Id) (EntityT, error)
 	makeCached       func(*SubCache[ExcerptT, CacheT, EntityT], getUserIdentityFunc, EntityT) CacheT
 	makeExcerpt      func() Excerpt
+	indexingCallback func(CacheT) error
 
 	typename  string
 	namespace string
@@ -70,6 +65,10 @@ func NewSubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface]
 	}
 }
 
+func (sc *SubCache[ExcerptT, CacheT, EntityT]) Typename() string {
+	return sc.typename
+}
+
 // Load will try to read from the disk the entity cache file
 func (sc *SubCache[ExcerptT, CacheT, EntityT]) Load() error {
 	sc.mu.Lock()
@@ -151,7 +150,32 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) Write() error {
 }
 
 func (sc *SubCache[ExcerptT, CacheT, EntityT]) Build() error {
+	sc.excerpts = make(map[entity.Id]ExcerptT)
+
+	sc.readWithResolver
+
+	allBugs := bug.ReadAllWithResolver(c.repo, c.resolvers)
+
+	// wipe the index just to be sure
+	err := c.repo.ClearBleveIndex("bug")
+	if err != nil {
+		return err
+	}
+
+	for b := range allBugs {
+		if b.Err != nil {
+			return b.Err
+		}
+
+		snap := b.Bug.Compile()
+		c.bugExcerpts[b.Bug.Id()] = NewBugExcerpt(b.Bug, snap)
+
+		if err := c.addBugToSearchIndex(snap); err != nil {
+			return err
+		}
+	}
 
+	_, _ = fmt.Fprintln(os.Stderr, "Done.")
 }
 
 func (sc *SubCache[ExcerptT, CacheT, EntityT]) Close() error {
@@ -191,7 +215,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) Resolve(id entity.Id) (CacheT, er
 
 	b, err := sc.readWithResolver(sc.repo, sc.resolvers(), id)
 	if err != nil {
-		return nil, err
+		return *new(CacheT), err
 	}
 
 	cached = sc.makeCached(sc, sc.getUserIdentity, b)
@@ -217,7 +241,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolvePrefix(prefix string) (Cac
 func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
 	id, err := sc.resolveMatcher(f)
 	if err != nil {
-		return nil, err
+		return *new(CacheT), err
 	}
 	return sc.Resolve(id)
 }
@@ -229,7 +253,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerpt(id entity.Id) (Exc
 
 	excerpt, ok := sc.excerpts[id]
 	if !ok {
-		return nil, entity.NewErrNotFound(sc.typename)
+		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
 	}
 
 	return excerpt, nil
@@ -246,7 +270,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerptPrefix(prefix strin
 func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
 	id, err := sc.resolveMatcher(f)
 	if err != nil {
-		return nil, err
+		return *new(ExcerptT), err
 	}
 	return sc.ResolveExcerpt(id)
 }
@@ -281,7 +305,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) add(e EntityT) (CacheT, error) {
 	sc.mu.Lock()
 	if _, has := sc.cached[e.Id()]; has {
 		sc.mu.Unlock()
-		return nil, fmt.Errorf("entity %s already exist in the cache", e.Id())
+		return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
 	}
 
 	cached := sc.makeCached(sc, sc.getUserIdentity, e)
@@ -294,7 +318,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) add(e EntityT) (CacheT, error) {
 	// force the write of the excerpt
 	err := sc.entityUpdated(e.Id())
 	if err != nil {
-		return nil, err
+		return *new(CacheT), err
 	}
 
 	return cached, nil

util/multierr/errwaitgroup.go 🔗

@@ -0,0 +1,115 @@
+package multierr
+
+import (
+	"context"
+	"fmt"
+	"sync"
+)
+
+type token struct{}
+
+// An ErrWaitGroup is a collection of goroutines working on subtasks that are part of
+// the same overall task.
+//
+// A zero ErrWaitGroup is valid, has no limit on the number of active goroutines,
+// and does not cancel on error.
+type ErrWaitGroup struct {
+	cancel func()
+
+	wg sync.WaitGroup
+
+	sem chan token
+
+	mu  sync.Mutex
+	err error
+}
+
+func (g *ErrWaitGroup) done() {
+	if g.sem != nil {
+		<-g.sem
+	}
+	g.wg.Done()
+}
+
+// WithContext returns a new ErrWaitGroup and an associated Context derived from ctx.
+//
+// The derived Context is canceled the first time Wait returns.
+func WithContext(ctx context.Context) (*ErrWaitGroup, context.Context) {
+	ctx, cancel := context.WithCancel(ctx)
+	return &ErrWaitGroup{cancel: cancel}, ctx
+}
+
+// Wait blocks until all function calls from the Go method have returned, then
+// returns the combined non-nil errors (if any) from them.
+func (g *ErrWaitGroup) Wait() error {
+	g.wg.Wait()
+	if g.cancel != nil {
+		g.cancel()
+	}
+	return g.err
+}
+
+// Go calls the given function in a new goroutine; any non-nil error it returns
+// is joined into the error reported by Wait. Go blocks until the goroutine can
+// be added without exceeding the limit configured with SetLimit.
+func (g *ErrWaitGroup) Go(f func() error) {
+	if g.sem != nil {
+		g.sem <- token{}
+	}

+	g.wg.Add(1)
+	go func() {
+		defer g.done()

+		if err := f(); err != nil {
+			g.mu.Lock()
+			g.err = Join(g.err, err) // store on the group; assigning to the local err would discard it
+			g.mu.Unlock()
+		}
+	}()
+}
+
+// TryGo calls the given function in a new goroutine only if the number of
+// active goroutines in the group is currently below the configured limit.
+//
+// The return value reports whether the goroutine was started.
+func (g *ErrWaitGroup) TryGo(f func() error) bool {
+	if g.sem != nil {
+		select {
+		case g.sem <- token{}:
+			// Note: this allows barging iff channels in general allow barging.
+		default:
+			return false
+		}
+	}

+	g.wg.Add(1)
+	go func() {
+		defer g.done()

+		if err := f(); err != nil {
+			g.mu.Lock()
+			g.err = Join(g.err, err) // store on the group; assigning to the local err would discard it
+			g.mu.Unlock()
+		}
+	}()
+	return true
+}
+
+// SetLimit limits the number of active goroutines in this group to at most n.
+// A negative value indicates no limit.
+//
+// Any subsequent call to the Go method will block until it can add an active
+// goroutine without exceeding the configured limit.
+//
+// The limit must not be modified while any goroutines in the group are active.
+func (g *ErrWaitGroup) SetLimit(n int) {
+	if n < 0 {
+		g.sem = nil
+		return
+	}
+	if len(g.sem) != 0 {
+		panic(fmt.Errorf("errwaitgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
+	}
+	g.sem = make(chan token, n)
+}

util/multierr/join.go 🔗

@@ -0,0 +1,51 @@
+package multierr
+
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Join returns an error that wraps the given errors.
+// Any nil error values are discarded.
+// Join returns nil if errs contains no non-nil values.
+// The error formats as the concatenation of the strings obtained
+// by calling the Error method of each element of errs, with a newline
+// between each string.
+func Join(errs ...error) error {
+	n := 0
+	for _, err := range errs {
+		if err != nil {
+			n++
+		}
+	}
+	if n == 0 {
+		return nil
+	}
+	e := &joinError{
+		errs: make([]error, 0, n),
+	}
+	for _, err := range errs {
+		if err != nil {
+			e.errs = append(e.errs, err)
+		}
+	}
+	return e
+}
+
+type joinError struct {
+	errs []error
+}
+
+func (e *joinError) Error() string {
+	var b []byte
+	for i, err := range e.errs {
+		if i > 0 {
+			b = append(b, '\n')
+		}
+		b = append(b, err.Error()...)
+	}
+	return string(b)
+}
+
+func (e *joinError) Unwrap() []error {
+	return e.errs
+}