entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entities/identity"
 13	"github.com/MichaelMure/git-bug/entity"
 14	"github.com/MichaelMure/git-bug/repository"
 15	"github.com/MichaelMure/git-bug/util/lamport"
 16)
 17
// Patterns for the git references and Lamport clock names, parameterized
// by the entity Namespace (e.g. "bugs").
const refsPattern = "refs/%s/%s"         // local entity ref: refs/<namespace>/<id>
const creationClockPattern = "%s-create" // name of the creation Lamport clock
const editClockPattern = "%s-edit"       // name of the edition Lamport clock
 21
 22type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
 23
// Definition hold the details defining one specialization of an Entity
// (e.g. a bug). It bundles the naming, decoding and versioning information
// shared by every instance of that entity type.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), for human consumption
	Typename string
	// the Namespace in git references (bugs, prs, ...), used to build refs and clock names
	Namespace string
	// a function decoding a JSON message into an Operation
	OperationUnmarshaler OperationUnmarshaler
	// the expected format version number, that can be used for data migration/upgrade
	FormatVersion uint
}
 35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time // Lamport time at creation of the entity
	editTime   lamport.Time // Lamport time of the most recent edition

	// the Definition (typename, namespace, decoder...) this entity instantiates
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// hash of the last commit written for this entity, empty if never committed
	lastCommit repository.Hash
}
 53
 54// New create an empty Entity
 55func New(definition Definition) *Entity {
 56	return &Entity{
 57		Definition: definition,
 58	}
 59}
 60
 61// Read will read and decode a stored local Entity from a repository
 62func Read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Entity, error) {
 63	if err := id.Validate(); err != nil {
 64		return nil, errors.Wrap(err, "invalid id")
 65	}
 66
 67	ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
 68
 69	return read(def, repo, resolvers, ref)
 70}
 71
 72// readRemote will read and decode a stored remote Entity from a repository
 73func readRemote(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (*Entity, error) {
 74	if err := id.Validate(); err != nil {
 75		return nil, errors.Wrap(err, "invalid id")
 76	}
 77
 78	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
 79
 80	return read(def, repo, resolvers, ref)
 81}
 82
 83// read fetch from git and decode an Entity at an arbitrary git reference.
 84func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (*Entity, error) {
 85	rootHash, err := repo.ResolveRef(ref)
 86	if err != nil {
 87		return nil, err
 88	}
 89
 90	// Perform a breadth-first search to get a topological order of the DAG where we discover the
 91	// parents commit and go back in time up to the chronological root
 92
 93	queue := make([]repository.Hash, 0, 32)
 94	visited := make(map[repository.Hash]struct{})
 95	BFSOrder := make([]repository.Commit, 0, 32)
 96
 97	queue = append(queue, rootHash)
 98	visited[rootHash] = struct{}{}
 99
100	for len(queue) > 0 {
101		// pop
102		hash := queue[0]
103		queue = queue[1:]
104
105		commit, err := repo.ReadCommit(hash)
106		if err != nil {
107			return nil, err
108		}
109
110		BFSOrder = append(BFSOrder, commit)
111
112		for _, parent := range commit.Parents {
113			if _, ok := visited[parent]; !ok {
114				queue = append(queue, parent)
115				// mark as visited
116				visited[parent] = struct{}{}
117			}
118		}
119	}
120
121	// Now, we can reverse this topological order and read the commits in an order where
122	// we are sure to have read all the chronological ancestors when we read a commit.
123
124	// Next step is to:
125	// 1) read the operationPacks
126	// 2) make sure that clocks causality respect the DAG topology.
127
128	oppMap := make(map[repository.Hash]*operationPack)
129	var opsCount int
130
131	for i := len(BFSOrder) - 1; i >= 0; i-- {
132		commit := BFSOrder[i]
133		isFirstCommit := i == len(BFSOrder)-1
134		isMerge := len(commit.Parents) > 1
135
136		// Verify DAG structure: single chronological root, so only the root
137		// can have no parents. Said otherwise, the DAG need to have exactly
138		// one leaf.
139		if !isFirstCommit && len(commit.Parents) == 0 {
140			return nil, fmt.Errorf("multiple leafs in the entity DAG")
141		}
142
143		opp, err := readOperationPack(def, repo, resolvers, commit)
144		if err != nil {
145			return nil, err
146		}
147
148		err = opp.Validate()
149		if err != nil {
150			return nil, err
151		}
152
153		if isMerge && len(opp.Operations) > 0 {
154			return nil, fmt.Errorf("merge commit cannot have operations")
155		}
156
157		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
158		if isFirstCommit && opp.CreateTime <= 0 {
159			return nil, fmt.Errorf("creation lamport time not set")
160		}
161
162		// make sure that the lamport clocks causality match the DAG topology
163		for _, parentHash := range commit.Parents {
164			parentPack, ok := oppMap[parentHash]
165			if !ok {
166				panic("DFS failed")
167			}
168
169			if parentPack.EditTime >= opp.EditTime {
170				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
171			}
172
173			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
174			// that the clocks don't jump too far in the future
175			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
176			// as long as there is one valid chain of small hops, it's fine.
177			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
178				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
179			}
180		}
181
182		oppMap[commit.Hash] = opp
183		opsCount += len(opp.Operations)
184	}
185
186	// The clocks are fine, we witness them
187	for _, opp := range oppMap {
188		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
189		if err != nil {
190			return nil, err
191		}
192		err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
193		if err != nil {
194			return nil, err
195		}
196	}
197
198	// Now that we know that the topological order and clocks are fine, we order the operationPacks
199	// based on the logical clocks, entirely ignoring the DAG topology
200
201	oppSlice := make([]*operationPack, 0, len(oppMap))
202	for _, pack := range oppMap {
203		oppSlice = append(oppSlice, pack)
204	}
205	sort.Slice(oppSlice, func(i, j int) bool {
206		// Primary ordering with the EditTime.
207		if oppSlice[i].EditTime != oppSlice[j].EditTime {
208			return oppSlice[i].EditTime < oppSlice[j].EditTime
209		}
210		// We have equal EditTime, which means we have concurrent edition over different machines, and we
211		// can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
212		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
213		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
214		// This is a lexicographic ordering on the stringified ID.
215		return oppSlice[i].Id() < oppSlice[j].Id()
216	})
217
218	// Now that we ordered the operationPacks, we have the order of the Operations
219
220	ops := make([]Operation, 0, opsCount)
221	var createTime lamport.Time
222	var editTime lamport.Time
223	for _, pack := range oppSlice {
224		for _, operation := range pack.Operations {
225			ops = append(ops, operation)
226		}
227		if pack.CreateTime > createTime {
228			createTime = pack.CreateTime
229		}
230		if pack.EditTime > editTime {
231			editTime = pack.EditTime
232		}
233	}
234
235	return &Entity{
236		Definition: def,
237		ops:        ops,
238		lastCommit: rootHash,
239		createTime: createTime,
240		editTime:   editTime,
241	}, nil
242}
243
244// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
245// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
246// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
247// operation blobs can be implemented instead.
248func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
249	rootHash, err := repo.ResolveRef(ref)
250	if err != nil {
251		return err
252	}
253
254	commit, err := repo.ReadCommit(rootHash)
255	if err != nil {
256		return err
257	}
258
259	createTime, editTime, err := readOperationPackClock(repo, commit)
260	if err != nil {
261		return err
262	}
263
264	// if we have more than one commit, we need to find the root to have the create time
265	if len(commit.Parents) > 0 {
266		for len(commit.Parents) > 0 {
267			// The path to the root is irrelevant.
268			commit, err = repo.ReadCommit(commit.Parents[0])
269			if err != nil {
270				return err
271			}
272		}
273		createTime, _, err = readOperationPackClock(repo, commit)
274		if err != nil {
275			return err
276		}
277	}
278
279	if createTime <= 0 {
280		return fmt.Errorf("creation lamport time not set")
281	}
282	if editTime <= 0 {
283		return fmt.Errorf("creation lamport time not set")
284	}
285	err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
286	if err != nil {
287		return err
288	}
289	err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
290	if err != nil {
291		return err
292	}
293	return nil
294}
295
// StreamedEntity is the item type produced by ReadAll: either a successfully
// read Entity, or the error that interrupted the stream.
type StreamedEntity struct {
	// Entity is set when the read succeeded, nil otherwise
	Entity *Entity
	// Err is set when reading failed; the stream ends after the first error
	Err    error
}
300
301// ReadAll read and parse all local Entity
302func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity {
303	out := make(chan StreamedEntity)
304
305	go func() {
306		defer close(out)
307
308		refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
309
310		refs, err := repo.ListRefs(refPrefix)
311		if err != nil {
312			out <- StreamedEntity{Err: err}
313			return
314		}
315
316		for _, ref := range refs {
317			e, err := read(def, repo, resolvers, ref)
318
319			if err != nil {
320				out <- StreamedEntity{Err: err}
321				return
322			}
323
324			out <- StreamedEntity{Entity: e}
325		}
326	}()
327
328	return out
329}
330
331// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
332// repo end up with correct clocks for the next write.
333func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
334	refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
335
336	refs, err := repo.ListRefs(refPrefix)
337	if err != nil {
338		return err
339	}
340
341	for _, ref := range refs {
342		err = readClockNoCheck(def, repo, ref)
343		if err != nil {
344			return err
345		}
346	}
347
348	return nil
349}
350
351// Id return the Entity identifier
352func (e *Entity) Id() entity.Id {
353	// id is the id of the first operation
354	return e.FirstOp().Id()
355}
356
357// Validate check if the Entity data is valid
358func (e *Entity) Validate() error {
359	// non-empty
360	if len(e.ops) == 0 && len(e.staging) == 0 {
361		return fmt.Errorf("entity has no operations")
362	}
363
364	// check if each operations are valid
365	for _, op := range e.ops {
366		if err := op.Validate(); err != nil {
367			return err
368		}
369	}
370
371	// check if staging is valid if needed
372	for _, op := range e.staging {
373		if err := op.Validate(); err != nil {
374			return err
375		}
376	}
377
378	// Check that there is no colliding operation's ID
379	ids := make(map[entity.Id]struct{})
380	for _, op := range e.Operations() {
381		if _, ok := ids[op.Id()]; ok {
382			return fmt.Errorf("id collision: %s", op.Id())
383		}
384		ids[op.Id()] = struct{}{}
385	}
386
387	return nil
388}
389
390// Operations return the ordered operations
391func (e *Entity) Operations() []Operation {
392	return append(e.ops, e.staging...)
393}
394
395// FirstOp lookup for the very first operation of the Entity
396func (e *Entity) FirstOp() Operation {
397	for _, op := range e.ops {
398		return op
399	}
400	for _, op := range e.staging {
401		return op
402	}
403	return nil
404}
405
406// LastOp lookup for the very last operation of the Entity
407func (e *Entity) LastOp() Operation {
408	if len(e.staging) > 0 {
409		return e.staging[len(e.staging)-1]
410	}
411	if len(e.ops) > 0 {
412		return e.ops[len(e.ops)-1]
413	}
414	return nil
415}
416
// Append add a new Operation to the Entity
//
// The operation is only staged in memory; call Commit to persist it.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
421
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	// any staged operation means there is something to write
	return len(e.staging) > 0
}
426
427// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
428// is already in sync with the repository.
429func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
430	if e.NeedCommit() {
431		return e.Commit(repo)
432	}
433	return nil
434}
435
// Commit write the appended operations in the repository
//
// Staged operations are grouped into one operationPack (one git commit) per
// consecutive run of operations sharing the same author. Each pack gets a
// fresh edit Lamport time; the very first pack of the Entity also gets a
// creation Lamport time. Finally the entity's git ref is updated to point at
// the last commit written.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	// refuse to persist invalid data
	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	// consume e.staging chunk by chunk until empty
	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			// a different author starts a new chunk
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// each pack advances the edit clock
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// no commit yet: this is the creation pack, stamp the creation clock too
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// chain the new commit onto the previous one, if any
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		// the chunk is now stored: move it from staging to ops
		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
504
// CreateLamportTime return the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
509
// EditLamportTime return the Lamport time of the last edition
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}