repo_cache.go

  1package cache
  2
  3import (
  4	"fmt"
  5	"io"
  6	"os"
  7	"strconv"
  8	"sync"
  9
 10	"github.com/git-bug/git-bug/entities/bug"
 11	"github.com/git-bug/git-bug/entities/identity"
 12	"github.com/git-bug/git-bug/entity"
 13	"github.com/git-bug/git-bug/repository"
 14	"github.com/git-bug/git-bug/util/multierr"
 15	"github.com/git-bug/git-bug/util/process"
 16)
 17
// formatVersion is the version number of the cache format handled by this code.
// NOTE(review): presumably compared against the version stored with the cache
// files to detect an outdated cache; the code using it is outside this view.
//
// History of the format:
// 1: original format
// 2: added cache for identities with a reference in the bug cache
// 3: no more legacy identity
// 4: entities make their IDs from data, not git commit
const formatVersion = 4
 23
// defaultMaxLoadedBugs is the maximum number of bugs kept loaded in memory.
// Beyond that number, eviction is performed.
const defaultMaxLoadedBugs = 1000
 26
 27var _ repository.RepoCommon = &RepoCache{}
 28var _ repository.RepoConfig = &RepoCache{}
 29var _ repository.RepoKeyring = &RepoCache{}
 30
// cacheMgmt is the interface that RepoCache expects from each of its sub-caches.
type cacheMgmt interface {
	// Typename returns a name for the sub-cache.
	// NOTE(review): presumably the typename of the cached entity (e.g. bug.Typename) — confirm.
	Typename() string
	// Load is called for every sub-cache by RepoCache.load. An error makes
	// NewNamedRepoCache fall back to a full rebuild through Build.
	Load() error
	// Build returns a channel of BuildEvent. RepoCache.buildCache forwards these
	// events until the channel is closed or an event carries an error.
	Build() <-chan BuildEvent
	// SetCacheSize is called by RepoCache.setCacheSize with the same size for
	// every sub-cache.
	SetCacheSize(size int)
	// RemoveAll is not called from this file.
	// NOTE(review): presumably removes all the entities of the sub-cache — confirm.
	RemoveAll() error
	// MergeAll is not called from this file.
	// NOTE(review): presumably merges the entities from the given remote and
	// reports each result on the returned channel — confirm.
	MergeAll(remote string) <-chan entity.MergeResult
	// GetNamespace is not called from this file.
	// NOTE(review): presumably the git namespace of the entities — confirm.
	GetNamespace() string
	// Close is called for every sub-cache by RepoCache.Close.
	Close() error
}
 42
// Observer gets notified of changes in the entities of the cache.
// An Observer is attached to one entity type with RepoCache.RegisterObserver and
// detached with RepoCache.UnregisterObserver.
type Observer interface {
	// EntityCreated notifies that an entity has been created.
	// The body of that function should NOT block.
	EntityCreated(typename string, id entity.Id)

	// EntityUpdated notifies that an entity has been updated.
	// The body of that function should NOT block.
	EntityUpdated(typename string, id entity.Id)
}
 53
// RepoCache is a cache for a Repository. This cache has multiple functions:
//
//  1. After being loaded, a Bug is kept in memory in the cache, allowing for fast
//     access later.
//  2. The cache maintains in memory and on disk a pre-digested excerpt for each bug,
//     allowing for fast querying of the whole set of bugs without having to load
//     them individually.
//  3. The cache guarantees that a single instance of a Bug is loaded at once, avoiding
//     the loss of data that we could have with multiple copies in the same process.
//  4. In the same way, the cache maintains in memory a single copy of the loaded identities.
//
// The cache also protects the on-disk data by locking the git repository for its
// own usage, by writing a lock file. Of course, normal git operations are not
// affected, only git-bug related ones.
type RepoCache struct {
	// the underlying repo
	repo repository.ClockedRepo

	// the name of the repository, as defined in the MultiRepoCache
	name string

	// resolvers for all known entities and excerpts
	resolvers entity.Resolvers

	// the sub-caches for the bugs and the identities, exposed through Bugs() and Identities()
	bugs       *RepoCacheBug
	identities *RepoCacheIdentity

	// all the sub-caches, seen through their management interface; this is what
	// load, buildCache, setCacheSize and Close iterate over
	subcaches []cacheMgmt

	// the user identity's id, if known
	// NOTE(review): muUserIdentity presumably guards userIdentityId; the accessors
	// are outside this view.
	muUserIdentity sync.RWMutex
	userIdentityId entity.Id
}
 87
// NewRepoCache creates or opens a cache on top of a raw repository. It is a
// shorthand for NewNamedRepoCache with defaultRepoName as the name.
// The caller is expected to read all returned events before the cache is considered
// ready to use.
func NewRepoCache(r repository.ClockedRepo) (*RepoCache, chan BuildEvent) {
	return NewNamedRepoCache(r, defaultRepoName)
}
 94
 95// NewNamedRepoCache create or open a named cache on top of a raw repository.
 96// The caller is expected to read all returned events before the cache is considered
 97// ready to use.
 98func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, chan BuildEvent) {
 99	c := &RepoCache{
100		repo: r,
101		name: name,
102	}
103
104	c.identities = NewRepoCacheIdentity(r, c.getResolvers, c.GetUserIdentity)
105	c.subcaches = append(c.subcaches, c.identities)
106
107	c.bugs = NewRepoCacheBug(r, c.getResolvers, c.GetUserIdentity)
108	c.subcaches = append(c.subcaches, c.bugs)
109
110	c.resolvers = entity.Resolvers{
111		&IdentityCache{}:   entity.ResolverFunc[*IdentityCache](c.identities.Resolve),
112		&IdentityExcerpt{}: entity.ResolverFunc[*IdentityExcerpt](c.identities.ResolveExcerpt),
113		&BugCache{}:        entity.ResolverFunc[*BugCache](c.bugs.Resolve),
114		&BugExcerpt{}:      entity.ResolverFunc[*BugExcerpt](c.bugs.ResolveExcerpt),
115	}
116
117	// small buffer so that the functions below can emit an event without blocking
118	events := make(chan BuildEvent)
119
120	go func() {
121		defer close(events)
122
123		err := c.lock(events)
124		if err != nil {
125			events <- BuildEvent{Err: err}
126			return
127		}
128
129		err = c.load()
130		if err == nil {
131			return
132		}
133
134		// Cache is either missing, broken or outdated. Rebuilding.
135		c.buildCache(events)
136	}()
137
138	return c, events
139}
140
141func NewRepoCacheNoEvents(r repository.ClockedRepo) (*RepoCache, error) {
142	cache, events := NewRepoCache(r)
143	for event := range events {
144		if event.Err != nil {
145			for range events {
146			}
147			return nil, event.Err
148		}
149	}
150	return cache, nil
151}
152
153func (c *RepoCache) RegisterObserver(typename string, observer Observer) {
154	switch typename {
155	case bug.Typename:
156		c.bugs.RegisterObserver(observer)
157	case identity.Typename:
158		c.identities.RegisterObserver(observer)
159	default:
160		panic(fmt.Sprintf("unknown typename %q", typename))
161	}
162}
163
164func (c *RepoCache) UnregisterObserver(typename string, observer Observer) {
165	switch typename {
166	case bug.Typename:
167		c.bugs.UnregisterObserver(observer)
168	case identity.Typename:
169		c.identities.UnregisterObserver(observer)
170	default:
171		panic(fmt.Sprintf("unknown typename %q", typename))
172	}
173}
174
// Bugs gives access to the Bug entities, through the bug sub-cache created in
// NewNamedRepoCache.
func (c *RepoCache) Bugs() *RepoCacheBug {
	return c.bugs
}
179
// Identities gives access to the Identity entities, through the identity sub-cache
// created in NewNamedRepoCache.
func (c *RepoCache) Identities() *RepoCacheIdentity {
	return c.identities
}
184
// getResolvers returns the resolvers for all known entities and excerpts.
// It is handed to the sub-caches at construction as a function, so they read
// c.resolvers when they call it rather than when they are created (the field is
// only assigned after the sub-caches exist).
func (c *RepoCache) getResolvers() entity.Resolvers {
	return c.resolvers
}
188
189// setCacheSize change the maximum number of loaded bugs
190func (c *RepoCache) setCacheSize(size int) {
191	for _, subcache := range c.subcaches {
192		subcache.SetCacheSize(size)
193	}
194}
195
196// load will try to read from the disk all the cache files
197func (c *RepoCache) load() error {
198	var errWait multierr.ErrWaitGroup
199	for _, mgmt := range c.subcaches {
200		errWait.Go(mgmt.Load)
201	}
202	return errWait.Wait()
203}
204
205func (c *RepoCache) lock(events chan BuildEvent) error {
206	err := repoIsAvailable(c.repo, events)
207	if err != nil {
208		return err
209	}
210
211	f, err := c.repo.LocalStorage().Create(lockfile)
212	if err != nil {
213		return err
214	}
215
216	pid := fmt.Sprintf("%d", os.Getpid())
217	_, err = f.Write([]byte(pid))
218	if err != nil {
219		_ = f.Close()
220		return err
221	}
222
223	return f.Close()
224}
225
226func (c *RepoCache) Close() error {
227	var errWait multierr.ErrWaitGroup
228	for _, mgmt := range c.subcaches {
229		errWait.Go(mgmt.Close)
230	}
231	err := errWait.Wait()
232	if err != nil {
233		return err
234	}
235
236	err = c.repo.Close()
237	if err != nil {
238		return err
239	}
240
241	return c.repo.LocalStorage().Remove(lockfile)
242}
243
// BuildEventType is the type of a BuildEvent, as carried by its Event field.
type BuildEventType int

const (
	// The zero value is assigned to the blank identifier, so the named event
	// types start at 1.
	_ BuildEventType = iota
	// BuildEventCacheIsBuilt signals that the cache is being built (aka, not skipped)
	BuildEventCacheIsBuilt
	// BuildEventRemoveLock signals that an old repo lock has been cleaned
	BuildEventRemoveLock
	// BuildEventStarted signals the beginning of a cache build for an entity
	BuildEventStarted
	// BuildEventProgress signals progress in the cache building for an entity
	BuildEventProgress
	// BuildEventFinished signals the end of a cache build for an entity
	BuildEventFinished
)
259
// BuildEvent carries an event happening during the cache build process.
type BuildEvent struct {
	// Err carries an error if the build process failed. If set, no other field matters.
	Err error
	// Typename is the name of the entity to which the event relates. Can be empty if no particular entity is involved.
	Typename string
	// Event is the type of the event.
	Event BuildEventType
	// Total is the total number of elements being built. Set if Event is BuildEventStarted.
	Total int64
	// Progress is the current count of processed elements. Set if Event is BuildEventProgress.
	Progress int64
}
273
274func (c *RepoCache) buildCache(events chan BuildEvent) {
275	events <- BuildEvent{Event: BuildEventCacheIsBuilt}
276
277	var wg sync.WaitGroup
278	for _, subcache := range c.subcaches {
279		wg.Add(1)
280		go func(subcache cacheMgmt) {
281			defer wg.Done()
282
283			buildEvents := subcache.Build()
284			for buildEvent := range buildEvents {
285				events <- buildEvent
286				if buildEvent.Err != nil {
287					return
288				}
289			}
290		}(subcache)
291	}
292	wg.Wait()
293}
294
295// repoIsAvailable check is the given repository is locked by a Cache.
296// Note: this is a smart function that will clean the lock file if the
297// corresponding process is not there anymore.
298// If no error is returned, the repo is free to edit.
299func repoIsAvailable(repo repository.RepoStorage, events chan BuildEvent) error {
300	// Todo: this leave way for a racey access to the repo between the test
301	// if the file exist and the actual write. It's probably not a problem in
302	// practice because using a repository will be done from user interaction
303	// or in a context where a single instance of git-bug is already guaranteed
304	// (say, a server with the web UI running). But still, that might be nice to
305	// have a mutex or something to guard that.
306
307	// Todo: this will fail if somehow the filesystem is shared with another
308	// computer. Should add a configuration that prevent the cleaning of the
309	// lock file
310
311	f, err := repo.LocalStorage().Open(lockfile)
312	if err != nil && !os.IsNotExist(err) {
313		return err
314	}
315
316	if err == nil {
317		// lock file already exist
318		buf, err := io.ReadAll(io.LimitReader(f, 10))
319		if err != nil {
320			_ = f.Close()
321			return err
322		}
323
324		err = f.Close()
325		if err != nil {
326			return err
327		}
328
329		if len(buf) >= 10 {
330			return fmt.Errorf("the lock file should be < 10 bytes")
331		}
332
333		pid, err := strconv.Atoi(string(buf))
334		if err != nil {
335			return err
336		}
337
338		if process.IsRunning(pid) {
339			return fmt.Errorf("the repository you want to access is already locked by the process pid %d", pid)
340		}
341
342		// The lock file is just laying there after a crash, clean it
343
344		events <- BuildEvent{Event: BuildEventRemoveLock}
345
346		err = repo.LocalStorage().Remove(lockfile)
347		if err != nil {
348			return err
349		}
350	}
351
352	return nil
353}