entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entities/identity"
 13	"github.com/MichaelMure/git-bug/entity"
 14	bootstrap "github.com/MichaelMure/git-bug/entity/boostrap"
 15	"github.com/MichaelMure/git-bug/repository"
 16	"github.com/MichaelMure/git-bug/util/lamport"
 17)
 18
// refsPattern is the git reference naming scheme for local entities: refs/<namespace>/<id>.
const refsPattern = "refs/%s/%s"

// creationClockPattern names the lamport clock witnessing entity creations for a namespace.
const creationClockPattern = "%s-create"

// editClockPattern names the lamport clock witnessing entity editions for a namespace.
const editClockPattern = "%s-edit"

// OperationUnmarshaler is a function decoding a JSON message into an Operation,
// with the help of a set of entity Resolvers.
type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
 24
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), for human consumption
	Typename string
	// the Namespace in git references (bugs, prs, ...); also used to name the
	// creation and edit lamport clocks (see creationClockPattern/editClockPattern)
	Namespace string
	// a function decoding a JSON message into an Operation
	OperationUnmarshaler OperationUnmarshaler
	// the expected format version number, that can be used for data migration/upgrade
	FormatVersion uint
}
 36
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time
	editTime   lamport.Time

	// the specialization (typename, namespace, unmarshaler, version) this entity follows
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// hash of the last git commit written for this entity; empty until the
	// first Commit (used to chain new commits and detect the creation commit)
	lastCommit repository.Hash
}
 54
 55// New create an empty Entity
 56func New(definition Definition) *Entity {
 57	return &Entity{
 58		Definition: definition,
 59	}
 60}
 61
 62// Read will read and decode a stored local Entity from a repository
 63func Read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
 64	if err := id.Validate(); err != nil {
 65		return *new(EntityT), errors.Wrap(err, "invalid id")
 66	}
 67
 68	ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
 69
 70	return read[EntityT](def, wrapper, repo, resolvers, ref)
 71}
 72
 73// readRemote will read and decode a stored remote Entity from a repository
 74func readRemote[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
 75	if err := id.Validate(); err != nil {
 76		return *new(EntityT), errors.Wrap(err, "invalid id")
 77	}
 78
 79	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
 80
 81	return read[EntityT](def, wrapper, repo, resolvers, ref)
 82}
 83
 84// read fetch from git and decode an Entity at an arbitrary git reference.
 85func read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
 86	rootHash, err := repo.ResolveRef(ref)
 87	if err == repository.ErrNotFound {
 88		return *new(EntityT), entity.NewErrNotFound(def.Typename)
 89	}
 90	if err != nil {
 91		return *new(EntityT), err
 92	}
 93
 94	// Perform a breadth-first search to get a topological order of the DAG where we discover the
 95	// parents commit and go back in time up to the chronological root
 96
 97	queue := make([]repository.Hash, 0, 32)
 98	visited := make(map[repository.Hash]struct{})
 99	BFSOrder := make([]repository.Commit, 0, 32)
100
101	queue = append(queue, rootHash)
102	visited[rootHash] = struct{}{}
103
104	for len(queue) > 0 {
105		// pop
106		hash := queue[0]
107		queue = queue[1:]
108
109		commit, err := repo.ReadCommit(hash)
110		if err != nil {
111			return *new(EntityT), err
112		}
113
114		BFSOrder = append(BFSOrder, commit)
115
116		for _, parent := range commit.Parents {
117			if _, ok := visited[parent]; !ok {
118				queue = append(queue, parent)
119				// mark as visited
120				visited[parent] = struct{}{}
121			}
122		}
123	}
124
125	// Now, we can reverse this topological order and read the commits in an order where
126	// we are sure to have read all the chronological ancestors when we read a commit.
127
128	// Next step is to:
129	// 1) read the operationPacks
130	// 2) make sure that clocks causality respect the DAG topology.
131
132	oppMap := make(map[repository.Hash]*operationPack)
133	var opsCount int
134
135	for i := len(BFSOrder) - 1; i >= 0; i-- {
136		commit := BFSOrder[i]
137		isFirstCommit := i == len(BFSOrder)-1
138		isMerge := len(commit.Parents) > 1
139
140		// Verify DAG structure: single chronological root, so only the root
141		// can have no parents. Said otherwise, the DAG need to have exactly
142		// one leaf.
143		if !isFirstCommit && len(commit.Parents) == 0 {
144			return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
145		}
146
147		opp, err := readOperationPack(def, repo, resolvers, commit)
148		if err != nil {
149			return *new(EntityT), err
150		}
151
152		err = opp.Validate()
153		if err != nil {
154			return *new(EntityT), err
155		}
156
157		if isMerge && len(opp.Operations) > 0 {
158			return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
159		}
160
161		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
162		if isFirstCommit && opp.CreateTime <= 0 {
163			return *new(EntityT), fmt.Errorf("creation lamport time not set")
164		}
165
166		// make sure that the lamport clocks causality match the DAG topology
167		for _, parentHash := range commit.Parents {
168			parentPack, ok := oppMap[parentHash]
169			if !ok {
170				panic("DFS failed")
171			}
172
173			if parentPack.EditTime >= opp.EditTime {
174				return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
175			}
176
177			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
178			// that the clocks don't jump too far in the future
179			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
180			// as long as there is one valid chain of small hops, it's fine.
181			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
182				return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
183			}
184		}
185
186		oppMap[commit.Hash] = opp
187		opsCount += len(opp.Operations)
188	}
189
190	// The clocks are fine, we witness them
191	for _, opp := range oppMap {
192		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
193		if err != nil {
194			return *new(EntityT), err
195		}
196		err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
197		if err != nil {
198			return *new(EntityT), err
199		}
200	}
201
202	// Now that we know that the topological order and clocks are fine, we order the operationPacks
203	// based on the logical clocks, entirely ignoring the DAG topology
204
205	oppSlice := make([]*operationPack, 0, len(oppMap))
206	for _, pack := range oppMap {
207		oppSlice = append(oppSlice, pack)
208	}
209	sort.Slice(oppSlice, func(i, j int) bool {
210		// Primary ordering with the EditTime.
211		if oppSlice[i].EditTime != oppSlice[j].EditTime {
212			return oppSlice[i].EditTime < oppSlice[j].EditTime
213		}
214		// We have equal EditTime, which means we have concurrent edition over different machines, and we
215		// can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
216		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
217		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
218		// This is a lexicographic ordering on the stringified ID.
219		return oppSlice[i].Id() < oppSlice[j].Id()
220	})
221
222	// Now that we ordered the operationPacks, we have the order of the Operations
223
224	ops := make([]Operation, 0, opsCount)
225	var createTime lamport.Time
226	var editTime lamport.Time
227	for _, pack := range oppSlice {
228		for _, operation := range pack.Operations {
229			ops = append(ops, operation)
230		}
231		if pack.CreateTime > createTime {
232			createTime = pack.CreateTime
233		}
234		if pack.EditTime > editTime {
235			editTime = pack.EditTime
236		}
237	}
238
239	return wrapper(&Entity{
240		Definition: def,
241		ops:        ops,
242		lastCommit: rootHash,
243		createTime: createTime,
244		editTime:   editTime,
245	}), nil
246}
247
248// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
249// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
250// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
251// operation blobs can be implemented instead.
252func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
253	rootHash, err := repo.ResolveRef(ref)
254	if err == repository.ErrNotFound {
255		return entity.NewErrNotFound(def.Typename)
256	}
257	if err != nil {
258		return err
259	}
260
261	commit, err := repo.ReadCommit(rootHash)
262	if err != nil {
263		return err
264	}
265
266	createTime, editTime, err := readOperationPackClock(repo, commit)
267	if err != nil {
268		return err
269	}
270
271	// if we have more than one commit, we need to find the root to have the create time
272	if len(commit.Parents) > 0 {
273		for len(commit.Parents) > 0 {
274			// The path to the root is irrelevant.
275			commit, err = repo.ReadCommit(commit.Parents[0])
276			if err != nil {
277				return err
278			}
279		}
280		createTime, _, err = readOperationPackClock(repo, commit)
281		if err != nil {
282			return err
283		}
284	}
285
286	if createTime <= 0 {
287		return fmt.Errorf("creation lamport time not set")
288	}
289	if editTime <= 0 {
290		return fmt.Errorf("creation lamport time not set")
291	}
292	err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
293	if err != nil {
294		return err
295	}
296	err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
297	if err != nil {
298		return err
299	}
300	return nil
301}
302
303// ReadAll read and parse all local Entity
304func ReadAll[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan bootstrap.StreamedEntity[EntityT] {
305	out := make(chan bootstrap.StreamedEntity[EntityT])
306
307	go func() {
308		defer close(out)
309
310		refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
311
312		refs, err := repo.ListRefs(refPrefix)
313		if err != nil {
314			out <- bootstrap.StreamedEntity[EntityT]{Err: err}
315			return
316		}
317
318		total := int64(len(refs))
319		current := int64(1)
320
321		for _, ref := range refs {
322			e, err := read[EntityT](def, wrapper, repo, resolvers, ref)
323
324			if err != nil {
325				out <- bootstrap.StreamedEntity[EntityT]{Err: err}
326				return
327			}
328
329			out <- bootstrap.StreamedEntity[EntityT]{
330				Entity:        e,
331				CurrentEntity: current,
332				TotalEntities: total,
333			}
334			current++
335		}
336	}()
337
338	return out
339}
340
341// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
342// repo end up with correct clocks for the next write.
343func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
344	refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
345
346	refs, err := repo.ListRefs(refPrefix)
347	if err != nil {
348		return err
349	}
350
351	for _, ref := range refs {
352		err = readClockNoCheck(def, repo, ref)
353		if err != nil {
354			return err
355		}
356	}
357
358	return nil
359}
360
361// Id return the Entity identifier
362func (e *Entity) Id() entity.Id {
363	// id is the id of the first operation
364	return e.FirstOp().Id()
365}
366
367// Validate check if the Entity data is valid
368func (e *Entity) Validate() error {
369	// non-empty
370	if len(e.ops) == 0 && len(e.staging) == 0 {
371		return fmt.Errorf("entity has no operations")
372	}
373
374	// check if each operation are valid
375	for _, op := range e.ops {
376		if err := op.Validate(); err != nil {
377			return err
378		}
379	}
380
381	// check if staging is valid if needed
382	for _, op := range e.staging {
383		if err := op.Validate(); err != nil {
384			return err
385		}
386	}
387
388	// Check that there is no colliding operation's ID
389	ids := make(map[entity.Id]struct{})
390	for _, op := range e.Operations() {
391		if _, ok := ids[op.Id()]; ok {
392			return fmt.Errorf("id collision: %s", op.Id())
393		}
394		ids[op.Id()] = struct{}{}
395	}
396
397	return nil
398}
399
400// Operations return the ordered operations
401func (e *Entity) Operations() []Operation {
402	return append(e.ops, e.staging...)
403}
404
405// FirstOp lookup for the very first operation of the Entity
406func (e *Entity) FirstOp() Operation {
407	for _, op := range e.ops {
408		return op
409	}
410	for _, op := range e.staging {
411		return op
412	}
413	return nil
414}
415
416// LastOp lookup for the very last operation of the Entity
417func (e *Entity) LastOp() Operation {
418	if len(e.staging) > 0 {
419		return e.staging[len(e.staging)-1]
420	}
421	if len(e.ops) > 0 {
422		return e.ops[len(e.ops)-1]
423	}
424	return nil
425}
426
427// Append add a new Operation to the Entity
428func (e *Entity) Append(op Operation) {
429	e.staging = append(e.staging, op)
430}
431
432// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
433func (e *Entity) NeedCommit() bool {
434	return len(e.staging) > 0
435}
436
437// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
438// is already in sync with the repository.
439func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
440	if e.NeedCommit() {
441		return e.Commit(repo)
442	}
443	return nil
444}
445
// Commit write the appended operations in the repository
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	// Drain the staging area, writing one operationPack (one git commit) per
	// run of consecutive operations sharing the same author.
	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// each pack gets a fresh edit lamport time from the namespace's edit clock
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// an empty lastCommit means this is the entity's very first commit:
		// it additionally records the creation lamport time
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// chain the new commit onto the previous one, if any
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		// the written chunk is now part of the stored operations
		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
514
// CreateLamportTime return the Lamport time of creation
// (the zero value if the entity has never been committed nor read).
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
519
// EditLamportTime return the Lamport time of the last edition
// (the zero value if the entity has never been committed nor read).
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}