entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entity"
 13	"github.com/MichaelMure/git-bug/identity"
 14	"github.com/MichaelMure/git-bug/repository"
 15	"github.com/MichaelMure/git-bug/util/lamport"
 16)
 17
 18const refsPattern = "refs/%s/%s"
 19const creationClockPattern = "%s-create"
 20const editClockPattern = "%s-edit"
 21
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// Typename is the name of the entity (bug, pull-request, ...), for human consumption
	Typename string
	// Namespace is the namespace used in git references (bugs, prs, ...)
	Namespace string
	// OperationUnmarshaler is a function decoding a JSON message into an Operation
	OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
	// FormatVersion is the expected format version number, that can be used for data migration/upgrade
	FormatVersion uint
}
 33
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time
	editTime   lamport.Time

	// embedded so the Entity directly exposes Typename, Namespace, etc.
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// hash of the last git commit written for this entity; empty ("") before the first Commit
	lastCommit repository.Hash
}
 51
 52// New create an empty Entity
 53func New(definition Definition) *Entity {
 54	return &Entity{
 55		Definition: definition,
 56	}
 57}
 58
 59// Read will read and decode a stored local Entity from a repository
 60func Read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Entity, error) {
 61	if err := id.Validate(); err != nil {
 62		return nil, errors.Wrap(err, "invalid id")
 63	}
 64
 65	ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
 66
 67	return read(def, repo, resolvers, ref)
 68}
 69
 70// readRemote will read and decode a stored remote Entity from a repository
 71func readRemote(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (*Entity, error) {
 72	if err := id.Validate(); err != nil {
 73		return nil, errors.Wrap(err, "invalid id")
 74	}
 75
 76	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
 77
 78	return read(def, repo, resolvers, ref)
 79}
 80
// read fetch from git and decode an Entity at an arbitrary git reference.
//
// It walks the commit DAG from the ref tip back to the root, verifies the
// DAG structure and the Lamport clock causality, witnesses the clocks, then
// flattens the operationPacks into a single ordered operation list.
func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (*Entity, error) {
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return nil, err
	}

	// Perform a breadth-first search to get a topological order of the DAG where we discover the
	// parents commit and go back in time up to the chronological root

	queue := make([]repository.Hash, 0, 32)
	visited := make(map[repository.Hash]struct{})
	BFSOrder := make([]repository.Commit, 0, 32)

	queue = append(queue, rootHash)
	visited[rootHash] = struct{}{}

	for len(queue) > 0 {
		// pop
		hash := queue[0]
		queue = queue[1:]

		commit, err := repo.ReadCommit(hash)
		if err != nil {
			return nil, err
		}

		BFSOrder = append(BFSOrder, commit)

		for _, parent := range commit.Parents {
			if _, ok := visited[parent]; !ok {
				queue = append(queue, parent)
				// mark as visited
				visited[parent] = struct{}{}
			}
		}
	}

	// Now, we can reverse this topological order and read the commits in an order where
	// we are sure to have read all the chronological ancestors when we read a commit.

	// Next step is to:
	// 1) read the operationPacks
	// 2) make sure that clocks causality respect the DAG topology.

	oppMap := make(map[repository.Hash]*operationPack)
	var opsCount int

	// iterate in reverse BFS order: a commit's parents are always processed
	// before the commit itself, so oppMap lookups below cannot miss
	for i := len(BFSOrder) - 1; i >= 0; i-- {
		commit := BFSOrder[i]
		isFirstCommit := i == len(BFSOrder)-1
		isMerge := len(commit.Parents) > 1

		// Verify DAG structure: single chronological root, so only the root
		// can have no parents. Said otherwise, the DAG need to have exactly
		// one leaf.
		if !isFirstCommit && len(commit.Parents) == 0 {
			return nil, fmt.Errorf("multiple leafs in the entity DAG")
		}

		opp, err := readOperationPack(def, repo, resolvers, commit)
		if err != nil {
			return nil, err
		}

		err = opp.Validate()
		if err != nil {
			return nil, err
		}

		// merge commits only tie branches together; they must carry no payload
		if isMerge && len(opp.Operations) > 0 {
			return nil, fmt.Errorf("merge commit cannot have operations")
		}

		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
		if isFirstCommit && opp.CreateTime <= 0 {
			return nil, fmt.Errorf("creation lamport time not set")
		}

		// make sure that the lamport clocks causality match the DAG topology
		for _, parentHash := range commit.Parents {
			parentPack, ok := oppMap[parentHash]
			if !ok {
				// NOTE(review): the traversal above is a BFS; "DFS" in this
				// message looks like a leftover/typo — confirm with the author
				panic("DFS failed")
			}

			if parentPack.EditTime >= opp.EditTime {
				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
			}

			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
			// that the clocks don't jump too far in the future
			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
			// as long as there is one valid chain of small hops, it's fine.
			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
			}
		}

		oppMap[commit.Hash] = opp
		opsCount += len(opp.Operations)
	}

	// The clocks are fine, we witness them
	for _, opp := range oppMap {
		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
		if err != nil {
			return nil, err
		}
		err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
		if err != nil {
			return nil, err
		}
	}

	// Now that we know that the topological order and clocks are fine, we order the operationPacks
	// based on the logical clocks, entirely ignoring the DAG topology

	oppSlice := make([]*operationPack, 0, len(oppMap))
	for _, pack := range oppMap {
		oppSlice = append(oppSlice, pack)
	}
	sort.Slice(oppSlice, func(i, j int) bool {
		// Primary ordering with the EditTime.
		if oppSlice[i].EditTime != oppSlice[j].EditTime {
			return oppSlice[i].EditTime < oppSlice[j].EditTime
		}
		// We have equal EditTime, which means we have concurrent edition over different machines, and we
		// can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
		// This is a lexicographic ordering on the stringified ID.
		return oppSlice[i].Id() < oppSlice[j].Id()
	})

	// Now that we ordered the operationPacks, we have the order of the Operations

	ops := make([]Operation, 0, opsCount)
	var createTime lamport.Time
	var editTime lamport.Time
	for _, pack := range oppSlice {
		for _, operation := range pack.Operations {
			ops = append(ops, operation)
		}
		// keep the highest clock values seen across all packs
		if pack.CreateTime > createTime {
			createTime = pack.CreateTime
		}
		if pack.EditTime > editTime {
			editTime = pack.EditTime
		}
	}

	return &Entity{
		Definition: def,
		ops:        ops,
		lastCommit: rootHash,
		createTime: createTime,
		editTime:   editTime,
	}, nil
}
241
242// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
243// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
244// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
245// operation blobs can be implemented instead.
246func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
247	rootHash, err := repo.ResolveRef(ref)
248	if err != nil {
249		return err
250	}
251
252	commit, err := repo.ReadCommit(rootHash)
253	if err != nil {
254		return err
255	}
256
257	createTime, editTime, err := readOperationPackClock(repo, commit)
258	if err != nil {
259		return err
260	}
261
262	// if we have more than one commit, we need to find the root to have the create time
263	if len(commit.Parents) > 0 {
264		for len(commit.Parents) > 0 {
265			// The path to the root is irrelevant.
266			commit, err = repo.ReadCommit(commit.Parents[0])
267			if err != nil {
268				return err
269			}
270		}
271		createTime, _, err = readOperationPackClock(repo, commit)
272		if err != nil {
273			return err
274		}
275	}
276
277	if createTime <= 0 {
278		return fmt.Errorf("creation lamport time not set")
279	}
280	if editTime <= 0 {
281		return fmt.Errorf("creation lamport time not set")
282	}
283	err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
284	if err != nil {
285		return err
286	}
287	err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
288	if err != nil {
289		return err
290	}
291	return nil
292}
293
// StreamedEntity is one element of the stream produced by ReadAll:
// either Entity is set, or Err carries the failure that ended the stream.
type StreamedEntity struct {
	Entity *Entity
	Err    error
}
298
299// ReadAll read and parse all local Entity
300func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity {
301	out := make(chan StreamedEntity)
302
303	go func() {
304		defer close(out)
305
306		refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
307
308		refs, err := repo.ListRefs(refPrefix)
309		if err != nil {
310			out <- StreamedEntity{Err: err}
311			return
312		}
313
314		for _, ref := range refs {
315			e, err := read(def, repo, resolvers, ref)
316
317			if err != nil {
318				out <- StreamedEntity{Err: err}
319				return
320			}
321
322			out <- StreamedEntity{Entity: e}
323		}
324	}()
325
326	return out
327}
328
329// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
330// repo end up with correct clocks for the next write.
331func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
332	refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
333
334	refs, err := repo.ListRefs(refPrefix)
335	if err != nil {
336		return err
337	}
338
339	for _, ref := range refs {
340		err = readClockNoCheck(def, repo, ref)
341		if err != nil {
342			return err
343		}
344	}
345
346	return nil
347}
348
// Id return the Entity identifier
func (e *Entity) Id() *entity.Id {
	// id is the id of the first operation
	// NOTE(review): FirstOp returns nil on an empty Entity, which would make
	// this call panic — callers are expected to only call Id() on a non-empty
	// Entity. Confirm that invariant holds everywhere.
	return e.FirstOp().Id()
}
354
355// Validate check if the Entity data is valid
356func (e *Entity) Validate() error {
357	// non-empty
358	if len(e.ops) == 0 && len(e.staging) == 0 {
359		return fmt.Errorf("entity has no operations")
360	}
361
362	// check if each operations are valid
363	for _, op := range e.ops {
364		if err := op.Validate(); err != nil {
365			return err
366		}
367	}
368
369	// check if staging is valid if needed
370	for _, op := range e.staging {
371		if err := op.Validate(); err != nil {
372			return err
373		}
374	}
375
376	// Check that there is no colliding operation's ID
377	ids := make(map[entity.Id]struct{})
378	for _, op := range e.Operations() {
379		if _, ok := ids[op.Id()]; ok {
380			return fmt.Errorf("id collision: %s", op.Id())
381		}
382		ids[op.Id()] = struct{}{}
383	}
384
385	return nil
386}
387
388// Operations return the ordered operations
389func (e *Entity) Operations() []Operation {
390	return append(e.ops, e.staging...)
391}
392
393// FirstOp lookup for the very first operation of the Entity
394func (e *Entity) FirstOp() Operation {
395	for _, op := range e.ops {
396		return op
397	}
398	for _, op := range e.staging {
399		return op
400	}
401	return nil
402}
403
404// LastOp lookup for the very last operation of the Entity
405func (e *Entity) LastOp() Operation {
406	if len(e.staging) > 0 {
407		return e.staging[len(e.staging)-1]
408	}
409	if len(e.ops) > 0 {
410		return e.ops[len(e.ops)-1]
411	}
412	return nil
413}
414
// Append add a new Operation to the Entity
func (e *Entity) Append(op Operation) {
	// the operation is only staged in memory; Commit() persists it
	e.staging = append(e.staging, op)
}
419
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	// any staged operation means there is something to persist
	return len(e.staging) > 0
}
424
425// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
426// is already in sync with the repository.
427func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
428	if e.NeedCommit() {
429		return e.Commit(repo)
430	}
431	return nil
432}
433
// Commit write the appended operations in the repository
//
// Staged operations are grouped into runs sharing the same author, and one
// git commit (one operationPack) is written per run, chained onto the
// previous commit. Finally the entity's git ref is updated to the new tip.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	// each iteration consumes one same-author run from staging and writes one commit
	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// advance the edit clock for every commit written
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// the very first commit of the entity also records the creation clock
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// chain onto the previous commit; the first commit has no parent
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
502
// CreateLamportTime return the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	// set by read() or by the first Commit()
	return e.createTime
}
507
// EditLamportTime return the Lamport time of the last edition
func (e *Entity) EditLamportTime() lamport.Time {
	// updated by read() and by every Commit()
	return e.editTime
}