entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entities/identity"
 13	"github.com/MichaelMure/git-bug/entity"
 14	"github.com/MichaelMure/git-bug/repository"
 15	"github.com/MichaelMure/git-bug/util/lamport"
 16)
 17
 18const refsPattern = "refs/%s/%s"
 19const creationClockPattern = "%s-create"
 20const editClockPattern = "%s-edit"
 21
 22type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
 23
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// Typename is the name of the entity (bug, pull-request, ...), for human consumption.
	Typename string
	// Namespace is the namespace used in git references (bugs, prs, ...).
	Namespace string
	// OperationUnmarshaler is a function decoding a JSON message into an Operation.
	OperationUnmarshaler OperationUnmarshaler
	// FormatVersion is the expected format version number, that can be used for data migration/upgrade.
	FormatVersion uint
}
 35
// Actions bundle the functions required to manipulate one specialization of an
// Entity (read, list, fetch, push, pull, merge), so that generic code can act on
// any entity type through a single value.
type Actions[EntityT entity.Interface] struct {
	Wrap             func(e *Entity) EntityT
	New              func() EntityT
	Read             func(repo repository.ClockedRepo, id entity.Id) (EntityT, error)
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	ReadAll          func(repo repository.ClockedRepo) <-chan StreamedEntity[EntityT]
	ListLocalIds     func(repo repository.Repo) ([]entity.Id, error)
	Fetch            func(repo repository.Repo, remote string) (string, error)
	Push             func(repo repository.Repo, remote string) (string, error)
	Pull             func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) error
	MergeAll         func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
 48
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time
	editTime   lamport.Time

	// embedded Definition giving this entity its typename, namespace and format version
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// hash of the last git commit written for this entity; empty until the first Commit
	lastCommit repository.Hash
}
 66
 67// New create an empty Entity
 68func New(definition Definition) *Entity {
 69	return &Entity{
 70		Definition: definition,
 71	}
 72}
 73
 74// Read will read and decode a stored local Entity from a repository
 75func Read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
 76	if err := id.Validate(); err != nil {
 77		return *new(EntityT), errors.Wrap(err, "invalid id")
 78	}
 79
 80	ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
 81
 82	return read[EntityT](def, wrapper, repo, resolvers, ref)
 83}
 84
 85// readRemote will read and decode a stored remote Entity from a repository
 86func readRemote[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
 87	if err := id.Validate(); err != nil {
 88		return *new(EntityT), errors.Wrap(err, "invalid id")
 89	}
 90
 91	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
 92
 93	return read[EntityT](def, wrapper, repo, resolvers, ref)
 94}
 95
 96// read fetch from git and decode an Entity at an arbitrary git reference.
 97func read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
 98	rootHash, err := repo.ResolveRef(ref)
 99	if err != nil {
100		return *new(EntityT), err
101	}
102
103	// Perform a breadth-first search to get a topological order of the DAG where we discover the
104	// parents commit and go back in time up to the chronological root
105
106	queue := make([]repository.Hash, 0, 32)
107	visited := make(map[repository.Hash]struct{})
108	BFSOrder := make([]repository.Commit, 0, 32)
109
110	queue = append(queue, rootHash)
111	visited[rootHash] = struct{}{}
112
113	for len(queue) > 0 {
114		// pop
115		hash := queue[0]
116		queue = queue[1:]
117
118		commit, err := repo.ReadCommit(hash)
119		if err != nil {
120			return *new(EntityT), err
121		}
122
123		BFSOrder = append(BFSOrder, commit)
124
125		for _, parent := range commit.Parents {
126			if _, ok := visited[parent]; !ok {
127				queue = append(queue, parent)
128				// mark as visited
129				visited[parent] = struct{}{}
130			}
131		}
132	}
133
134	// Now, we can reverse this topological order and read the commits in an order where
135	// we are sure to have read all the chronological ancestors when we read a commit.
136
137	// Next step is to:
138	// 1) read the operationPacks
139	// 2) make sure that clocks causality respect the DAG topology.
140
141	oppMap := make(map[repository.Hash]*operationPack)
142	var opsCount int
143
144	for i := len(BFSOrder) - 1; i >= 0; i-- {
145		commit := BFSOrder[i]
146		isFirstCommit := i == len(BFSOrder)-1
147		isMerge := len(commit.Parents) > 1
148
149		// Verify DAG structure: single chronological root, so only the root
150		// can have no parents. Said otherwise, the DAG need to have exactly
151		// one leaf.
152		if !isFirstCommit && len(commit.Parents) == 0 {
153			return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
154		}
155
156		opp, err := readOperationPack(def, repo, resolvers, commit)
157		if err != nil {
158			return *new(EntityT), err
159		}
160
161		err = opp.Validate()
162		if err != nil {
163			return *new(EntityT), err
164		}
165
166		if isMerge && len(opp.Operations) > 0 {
167			return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
168		}
169
170		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
171		if isFirstCommit && opp.CreateTime <= 0 {
172			return *new(EntityT), fmt.Errorf("creation lamport time not set")
173		}
174
175		// make sure that the lamport clocks causality match the DAG topology
176		for _, parentHash := range commit.Parents {
177			parentPack, ok := oppMap[parentHash]
178			if !ok {
179				panic("DFS failed")
180			}
181
182			if parentPack.EditTime >= opp.EditTime {
183				return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
184			}
185
186			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
187			// that the clocks don't jump too far in the future
188			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
189			// as long as there is one valid chain of small hops, it's fine.
190			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
191				return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
192			}
193		}
194
195		oppMap[commit.Hash] = opp
196		opsCount += len(opp.Operations)
197	}
198
199	// The clocks are fine, we witness them
200	for _, opp := range oppMap {
201		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
202		if err != nil {
203			return *new(EntityT), err
204		}
205		err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
206		if err != nil {
207			return *new(EntityT), err
208		}
209	}
210
211	// Now that we know that the topological order and clocks are fine, we order the operationPacks
212	// based on the logical clocks, entirely ignoring the DAG topology
213
214	oppSlice := make([]*operationPack, 0, len(oppMap))
215	for _, pack := range oppMap {
216		oppSlice = append(oppSlice, pack)
217	}
218	sort.Slice(oppSlice, func(i, j int) bool {
219		// Primary ordering with the EditTime.
220		if oppSlice[i].EditTime != oppSlice[j].EditTime {
221			return oppSlice[i].EditTime < oppSlice[j].EditTime
222		}
223		// We have equal EditTime, which means we have concurrent edition over different machines, and we
224		// can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
225		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
226		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
227		// This is a lexicographic ordering on the stringified ID.
228		return oppSlice[i].Id() < oppSlice[j].Id()
229	})
230
231	// Now that we ordered the operationPacks, we have the order of the Operations
232
233	ops := make([]Operation, 0, opsCount)
234	var createTime lamport.Time
235	var editTime lamport.Time
236	for _, pack := range oppSlice {
237		for _, operation := range pack.Operations {
238			ops = append(ops, operation)
239		}
240		if pack.CreateTime > createTime {
241			createTime = pack.CreateTime
242		}
243		if pack.EditTime > editTime {
244			editTime = pack.EditTime
245		}
246	}
247
248	return wrapper(&Entity{
249		Definition: def,
250		ops:        ops,
251		lastCommit: rootHash,
252		createTime: createTime,
253		editTime:   editTime,
254	}), nil
255}
256
257// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
258// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
259// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
260// operation blobs can be implemented instead.
261func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
262	rootHash, err := repo.ResolveRef(ref)
263	if err != nil {
264		return err
265	}
266
267	commit, err := repo.ReadCommit(rootHash)
268	if err != nil {
269		return err
270	}
271
272	createTime, editTime, err := readOperationPackClock(repo, commit)
273	if err != nil {
274		return err
275	}
276
277	// if we have more than one commit, we need to find the root to have the create time
278	if len(commit.Parents) > 0 {
279		for len(commit.Parents) > 0 {
280			// The path to the root is irrelevant.
281			commit, err = repo.ReadCommit(commit.Parents[0])
282			if err != nil {
283				return err
284			}
285		}
286		createTime, _, err = readOperationPackClock(repo, commit)
287		if err != nil {
288			return err
289		}
290	}
291
292	if createTime <= 0 {
293		return fmt.Errorf("creation lamport time not set")
294	}
295	if editTime <= 0 {
296		return fmt.Errorf("creation lamport time not set")
297	}
298	err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
299	if err != nil {
300		return err
301	}
302	err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
303	if err != nil {
304		return err
305	}
306	return nil
307}
308
// StreamedEntity is one item of a ReadAll stream: either a decoded Entity or
// the error that terminated the stream (never both).
type StreamedEntity[EntityT entity.Interface] struct {
	Entity EntityT
	Err    error
}
313
314// ReadAll read and parse all local Entity
315func ReadAll[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity[EntityT] {
316	out := make(chan StreamedEntity[EntityT])
317
318	go func() {
319		defer close(out)
320
321		refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
322
323		refs, err := repo.ListRefs(refPrefix)
324		if err != nil {
325			out <- StreamedEntity[EntityT]{Err: err}
326			return
327		}
328
329		for _, ref := range refs {
330			e, err := read[EntityT](def, wrapper, repo, resolvers, ref)
331
332			if err != nil {
333				out <- StreamedEntity[EntityT]{Err: err}
334				return
335			}
336
337			out <- StreamedEntity[EntityT]{Entity: e}
338		}
339	}()
340
341	return out
342}
343
344// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
345// repo end up with correct clocks for the next write.
346func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
347	refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
348
349	refs, err := repo.ListRefs(refPrefix)
350	if err != nil {
351		return err
352	}
353
354	for _, ref := range refs {
355		err = readClockNoCheck(def, repo, ref)
356		if err != nil {
357			return err
358		}
359	}
360
361	return nil
362}
363
364// Id return the Entity identifier
365func (e *Entity) Id() entity.Id {
366	// id is the id of the first operation
367	return e.FirstOp().Id()
368}
369
370// Validate check if the Entity data is valid
371func (e *Entity) Validate() error {
372	// non-empty
373	if len(e.ops) == 0 && len(e.staging) == 0 {
374		return fmt.Errorf("entity has no operations")
375	}
376
377	// check if each operation are valid
378	for _, op := range e.ops {
379		if err := op.Validate(); err != nil {
380			return err
381		}
382	}
383
384	// check if staging is valid if needed
385	for _, op := range e.staging {
386		if err := op.Validate(); err != nil {
387			return err
388		}
389	}
390
391	// Check that there is no colliding operation's ID
392	ids := make(map[entity.Id]struct{})
393	for _, op := range e.Operations() {
394		if _, ok := ids[op.Id()]; ok {
395			return fmt.Errorf("id collision: %s", op.Id())
396		}
397		ids[op.Id()] = struct{}{}
398	}
399
400	return nil
401}
402
403// Operations return the ordered operations
404func (e *Entity) Operations() []Operation {
405	return append(e.ops, e.staging...)
406}
407
408// FirstOp lookup for the very first operation of the Entity
409func (e *Entity) FirstOp() Operation {
410	for _, op := range e.ops {
411		return op
412	}
413	for _, op := range e.staging {
414		return op
415	}
416	return nil
417}
418
419// LastOp lookup for the very last operation of the Entity
420func (e *Entity) LastOp() Operation {
421	if len(e.staging) > 0 {
422		return e.staging[len(e.staging)-1]
423	}
424	if len(e.ops) > 0 {
425		return e.ops[len(e.ops)-1]
426	}
427	return nil
428}
429
// Append add a new Operation to the Entity.
// The operation is staged in memory only; it reaches the repository on the next Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
434
435// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
436func (e *Entity) NeedCommit() bool {
437	return len(e.staging) > 0
438}
439
440// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
441// is already in sync with the repository.
442func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
443	if e.NeedCommit() {
444		return e.Commit(repo)
445	}
446	return nil
447}
448
// Commit write the appended operations in the repository.
// Staged operations are split into consecutive chunks sharing the same author,
// each chunk becoming one operationPack/commit chained onto the previous one.
// Finally the entity's git ref is created or updated to point at the new head.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// one edit clock tick per operationPack written
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// very first commit of this entity: also tick and record the creation clock
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// chain onto the previous commit, if any
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
517
// CreateLamportTime return the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
522
// EditLamportTime return the Lamport time of the last edition
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}