entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entity"
 13	bootstrap "github.com/MichaelMure/git-bug/entity/boostrap"
 14	"github.com/MichaelMure/git-bug/repository"
 15	"github.com/MichaelMure/git-bug/util/lamport"
 16)
 17
 18const refsPattern = "refs/%s/%s"
 19const creationClockPattern = "%s-create"
 20const editClockPattern = "%s-edit"
 21
 22type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
 23
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// Typename is the name of the entity (bug, pull-request, ...), for human consumption.
	Typename string
	// Namespace is the namespace used in git references (bugs, prs, ...).
	Namespace string
	// OperationUnmarshaler is a function decoding a JSON message into an Operation.
	OperationUnmarshaler OperationUnmarshaler
	// FormatVersion is the expected format version number, that can be used for data migration/upgrade.
	FormatVersion uint
}
 35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time
	editTime   lamport.Time

	// Definition describes this specialization of Entity (typename, namespace,
	// operation decoder, format version).
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// lastCommit is the hash of the last git commit written for this entity;
	// the zero value means nothing has been committed yet.
	lastCommit repository.Hash
}
 53
 54// New create an empty Entity
 55func New(definition Definition) *Entity {
 56	return &Entity{
 57		Definition: definition,
 58	}
 59}
 60
 61// Read will read and decode a stored local Entity from a repository
 62func Read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
 63	if err := id.Validate(); err != nil {
 64		return *new(EntityT), errors.Wrap(err, "invalid id")
 65	}
 66
 67	ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
 68
 69	return read[EntityT](def, wrapper, repo, resolvers, ref)
 70}
 71
 72// readRemote will read and decode a stored remote Entity from a repository
 73func readRemote[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
 74	if err := id.Validate(); err != nil {
 75		return *new(EntityT), errors.Wrap(err, "invalid id")
 76	}
 77
 78	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
 79
 80	return read[EntityT](def, wrapper, repo, resolvers, ref)
 81}
 82
 83// read fetch from git and decode an Entity at an arbitrary git reference.
 84func read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
 85	rootHash, err := repo.ResolveRef(ref)
 86	if err == repository.ErrNotFound {
 87		return *new(EntityT), entity.NewErrNotFound(def.Typename)
 88	}
 89	if err != nil {
 90		return *new(EntityT), err
 91	}
 92
 93	// Perform a breadth-first search to get a topological order of the DAG where we discover the
 94	// parents commit and go back in time up to the chronological root
 95
 96	queue := make([]repository.Hash, 0, 32)
 97	visited := make(map[repository.Hash]struct{})
 98	BFSOrder := make([]repository.Commit, 0, 32)
 99
100	queue = append(queue, rootHash)
101	visited[rootHash] = struct{}{}
102
103	for len(queue) > 0 {
104		// pop
105		hash := queue[0]
106		queue = queue[1:]
107
108		commit, err := repo.ReadCommit(hash)
109		if err != nil {
110			return *new(EntityT), err
111		}
112
113		BFSOrder = append(BFSOrder, commit)
114
115		for _, parent := range commit.Parents {
116			if _, ok := visited[parent]; !ok {
117				queue = append(queue, parent)
118				// mark as visited
119				visited[parent] = struct{}{}
120			}
121		}
122	}
123
124	// Now, we can reverse this topological order and read the commits in an order where
125	// we are sure to have read all the chronological ancestors when we read a commit.
126
127	// Next step is to:
128	// 1) read the operationPacks
129	// 2) make sure that clocks causality respect the DAG topology.
130
131	oppMap := make(map[repository.Hash]*operationPack)
132	var opsCount int
133
134	for i := len(BFSOrder) - 1; i >= 0; i-- {
135		commit := BFSOrder[i]
136		isFirstCommit := i == len(BFSOrder)-1
137		isMerge := len(commit.Parents) > 1
138
139		// Verify DAG structure: single chronological root, so only the root
140		// can have no parents. Said otherwise, the DAG need to have exactly
141		// one leaf.
142		if !isFirstCommit && len(commit.Parents) == 0 {
143			return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
144		}
145
146		opp, err := readOperationPack(def, repo, resolvers, commit)
147		if err != nil {
148			return *new(EntityT), err
149		}
150
151		err = opp.Validate()
152		if err != nil {
153			return *new(EntityT), err
154		}
155
156		if isMerge && len(opp.Operations) > 0 {
157			return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
158		}
159
160		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
161		if isFirstCommit && opp.CreateTime <= 0 {
162			return *new(EntityT), fmt.Errorf("creation lamport time not set")
163		}
164
165		// make sure that the lamport clocks causality match the DAG topology
166		for _, parentHash := range commit.Parents {
167			parentPack, ok := oppMap[parentHash]
168			if !ok {
169				panic("DFS failed")
170			}
171
172			if parentPack.EditTime >= opp.EditTime {
173				return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
174			}
175
176			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
177			// that the clocks don't jump too far in the future
178			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
179			// as long as there is one valid chain of small hops, it's fine.
180			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
181				return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
182			}
183		}
184
185		oppMap[commit.Hash] = opp
186		opsCount += len(opp.Operations)
187	}
188
189	// The clocks are fine, we witness them
190	for _, opp := range oppMap {
191		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
192		if err != nil {
193			return *new(EntityT), err
194		}
195		err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
196		if err != nil {
197			return *new(EntityT), err
198		}
199	}
200
201	// Now that we know that the topological order and clocks are fine, we order the operationPacks
202	// based on the logical clocks, entirely ignoring the DAG topology
203
204	oppSlice := make([]*operationPack, 0, len(oppMap))
205	for _, pack := range oppMap {
206		oppSlice = append(oppSlice, pack)
207	}
208	sort.Slice(oppSlice, func(i, j int) bool {
209		// Primary ordering with the EditTime.
210		if oppSlice[i].EditTime != oppSlice[j].EditTime {
211			return oppSlice[i].EditTime < oppSlice[j].EditTime
212		}
213		// We have equal EditTime, which means we have concurrent edition over different machines, and we
214		// can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
215		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
216		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
217		// This is a lexicographic ordering on the stringified ID.
218		return oppSlice[i].Id() < oppSlice[j].Id()
219	})
220
221	// Now that we ordered the operationPacks, we have the order of the Operations
222
223	ops := make([]Operation, 0, opsCount)
224	var createTime lamport.Time
225	var editTime lamport.Time
226	for _, pack := range oppSlice {
227		for _, operation := range pack.Operations {
228			ops = append(ops, operation)
229		}
230		if pack.CreateTime > createTime {
231			createTime = pack.CreateTime
232		}
233		if pack.EditTime > editTime {
234			editTime = pack.EditTime
235		}
236	}
237
238	return wrapper(&Entity{
239		Definition: def,
240		ops:        ops,
241		lastCommit: rootHash,
242		createTime: createTime,
243		editTime:   editTime,
244	}), nil
245}
246
247// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
248// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
249// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
250// operation blobs can be implemented instead.
251func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
252	rootHash, err := repo.ResolveRef(ref)
253	if err == repository.ErrNotFound {
254		return entity.NewErrNotFound(def.Typename)
255	}
256	if err != nil {
257		return err
258	}
259
260	commit, err := repo.ReadCommit(rootHash)
261	if err != nil {
262		return err
263	}
264
265	createTime, editTime, err := readOperationPackClock(repo, commit)
266	if err != nil {
267		return err
268	}
269
270	// if we have more than one commit, we need to find the root to have the create time
271	if len(commit.Parents) > 0 {
272		for len(commit.Parents) > 0 {
273			// The path to the root is irrelevant.
274			commit, err = repo.ReadCommit(commit.Parents[0])
275			if err != nil {
276				return err
277			}
278		}
279		createTime, _, err = readOperationPackClock(repo, commit)
280		if err != nil {
281			return err
282		}
283	}
284
285	if createTime <= 0 {
286		return fmt.Errorf("creation lamport time not set")
287	}
288	if editTime <= 0 {
289		return fmt.Errorf("creation lamport time not set")
290	}
291	err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
292	if err != nil {
293		return err
294	}
295	err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
296	if err != nil {
297		return err
298	}
299	return nil
300}
301
302// ReadAll read and parse all local Entity
303func ReadAll[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan bootstrap.StreamedEntity[EntityT] {
304	out := make(chan bootstrap.StreamedEntity[EntityT])
305
306	go func() {
307		defer close(out)
308
309		refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
310
311		refs, err := repo.ListRefs(refPrefix)
312		if err != nil {
313			out <- bootstrap.StreamedEntity[EntityT]{Err: err}
314			return
315		}
316
317		total := int64(len(refs))
318		current := int64(1)
319
320		for _, ref := range refs {
321			e, err := read[EntityT](def, wrapper, repo, resolvers, ref)
322
323			if err != nil {
324				out <- bootstrap.StreamedEntity[EntityT]{Err: err}
325				return
326			}
327
328			out <- bootstrap.StreamedEntity[EntityT]{
329				Entity:        e,
330				CurrentEntity: current,
331				TotalEntities: total,
332			}
333			current++
334		}
335	}()
336
337	return out
338}
339
340// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
341// repo end up with correct clocks for the next write.
342func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
343	refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
344
345	refs, err := repo.ListRefs(refPrefix)
346	if err != nil {
347		return err
348	}
349
350	for _, ref := range refs {
351		err = readClockNoCheck(def, repo, ref)
352		if err != nil {
353			return err
354		}
355	}
356
357	return nil
358}
359
// Id return the Entity identifier
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	// NOTE(review): FirstOp returns nil when the entity has no operation at
	// all, which would make this call panic; Validate() enforces at least one
	// operation — confirm callers only use Id() on validated entities.
	return e.FirstOp().Id()
}
365
366// Validate check if the Entity data is valid
367func (e *Entity) Validate() error {
368	// non-empty
369	if len(e.ops) == 0 && len(e.staging) == 0 {
370		return fmt.Errorf("entity has no operations")
371	}
372
373	// check if each operation are valid
374	for _, op := range e.ops {
375		if err := op.Validate(); err != nil {
376			return err
377		}
378	}
379
380	// check if staging is valid if needed
381	for _, op := range e.staging {
382		if err := op.Validate(); err != nil {
383			return err
384		}
385	}
386
387	// Check that there is no colliding operation's ID
388	ids := make(map[entity.Id]struct{})
389	for _, op := range e.Operations() {
390		if _, ok := ids[op.Id()]; ok {
391			return fmt.Errorf("id collision: %s", op.Id())
392		}
393		ids[op.Id()] = struct{}{}
394	}
395
396	return nil
397}
398
399// Operations return the ordered operations
400func (e *Entity) Operations() []Operation {
401	return append(e.ops, e.staging...)
402}
403
404// FirstOp lookup for the very first operation of the Entity
405func (e *Entity) FirstOp() Operation {
406	for _, op := range e.ops {
407		return op
408	}
409	for _, op := range e.staging {
410		return op
411	}
412	return nil
413}
414
415// LastOp lookup for the very last operation of the Entity
416func (e *Entity) LastOp() Operation {
417	if len(e.staging) > 0 {
418		return e.staging[len(e.staging)-1]
419	}
420	if len(e.ops) > 0 {
421		return e.ops[len(e.ops)-1]
422	}
423	return nil
424}
425
// Append add a new Operation to the Entity
func (e *Entity) Append(op Operation) {
	// The operation stays in the staging area until Commit writes it to git.
	e.staging = append(e.staging, op)
}
430
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	// Any staged operation means there is something to write.
	return len(e.staging) > 0
}
435
436// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
437// is already in sync with the repository.
438func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
439	if e.NeedCommit() {
440		return e.Commit(repo)
441	}
442	return nil
443}
444
// Commit write the appended operations in the repository
//
// The staging area is split into chunks of consecutive operations sharing the
// same author; each chunk becomes one operationPack stored in one git commit,
// chained onto the previous commit of the entity. The git reference for the
// entity is updated last.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	// One iteration per author chunk; consumes e.staging from the front.
	for len(e.staging) > 0 {
		var author entity.Identity
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			// Stop the chunk at the first operation from a different author.
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// Each pack written gets a fresh tick of the edit clock.
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// First commit ever for this entity: also tick and record the
		// creation clock on the pack.
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// Chain the new commit onto the previous one, if any.
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		// The chunk is now durable: move it from staging to stored ops.
		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
513
// CreateLamportTime return the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
518
// EditLamportTime return the Lamport time of the last edition
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}