entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entity"
 13	"github.com/MichaelMure/git-bug/identity"
 14	"github.com/MichaelMure/git-bug/repository"
 15	"github.com/MichaelMure/git-bug/util/lamport"
 16)
 17
// refsPattern is the git reference layout: refs/<namespace>/<entity id>.
const refsPattern = "refs/%s/%s"

// creationClockPattern and editClockPattern name the per-namespace lamport
// clocks used respectively for entity creation and for each edition.
const creationClockPattern = "%s-create"
const editClockPattern = "%s-edit"
 21
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...)
	typename string
	// the namespace in git (bugs, prs, ...); used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a resolver loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
 35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository; moved into ops by Commit
	staging []Operation

	// TODO: add here createTime and editTime

	// // TODO: doesn't seems to actually be useful over the topological sort ? Timestamp can be generated from graph depth
	// // TODO: maybe EditTime is better because it could spread ops in consecutive groups on the logical timeline --> avoid interleaving
	// packClock  lamport.Clock

	// hash of the last git commit written for this entity; empty ("") until
	// the first Commit, which is how Commit detects the creation case
	lastCommit repository.Hash
}
 52
 53// New create an empty Entity
 54func New(definition Definition) *Entity {
 55	return &Entity{
 56		Definition: definition,
 57		// packClock:  lamport.NewMemClock(),
 58	}
 59}
 60
 61// Read will read and decode a stored Entity from a repository
 62func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
 63	if err := id.Validate(); err != nil {
 64		return nil, errors.Wrap(err, "invalid id")
 65	}
 66
 67	ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
 68
 69	return read(def, repo, ref)
 70}
 71
 72// read fetch from git and decode an Entity at an arbitrary git reference.
 73func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
 74	rootHash, err := repo.ResolveRef(ref)
 75	if err != nil {
 76		return nil, err
 77	}
 78
 79	// Perform a depth-first search to get a topological order of the DAG where we discover the
 80	// parents commit and go back in time up to the chronological root
 81
 82	stack := make([]repository.Hash, 0, 32)
 83	visited := make(map[repository.Hash]struct{})
 84	DFSOrder := make([]repository.Commit, 0, 32)
 85
 86	stack = append(stack, rootHash)
 87
 88	for len(stack) > 0 {
 89		// pop
 90		hash := stack[len(stack)-1]
 91		stack = stack[:len(stack)-1]
 92
 93		if _, ok := visited[hash]; ok {
 94			continue
 95		}
 96
 97		// mark as visited
 98		visited[hash] = struct{}{}
 99
100		commit, err := repo.ReadCommit(hash)
101		if err != nil {
102			return nil, err
103		}
104
105		DFSOrder = append(DFSOrder, commit)
106
107		for _, parent := range commit.Parents {
108			stack = append(stack, parent)
109		}
110	}
111
112	// Now, we can reverse this topological order and read the commits in an order where
113	// we are sure to have read all the chronological ancestors when we read a commit.
114
115	// Next step is to:
116	// 1) read the operationPacks
117	// 2) make sure that the clocks causality respect the DAG topology.
118
119	oppMap := make(map[repository.Hash]*operationPack)
120	var opsCount int
121	// var packClock = lamport.NewMemClock()
122
123	for i := len(DFSOrder) - 1; i >= 0; i-- {
124		commit := DFSOrder[i]
125		isFirstCommit := i == len(DFSOrder)-1
126		isMerge := len(commit.Parents) > 1
127
128		// Verify DAG structure: single chronological root, so only the root
129		// can have no parents. Said otherwise, the DAG need to have exactly
130		// one leaf.
131		if !isFirstCommit && len(commit.Parents) == 0 {
132			return nil, fmt.Errorf("multiple leafs in the entity DAG")
133		}
134
135		opp, err := readOperationPack(def, repo, commit)
136		if err != nil {
137			return nil, err
138		}
139
140		err = opp.Validate()
141		if err != nil {
142			return nil, err
143		}
144
145		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
146		if isFirstCommit && opp.CreateTime <= 0 {
147			return nil, fmt.Errorf("creation lamport time not set")
148		}
149
150		// make sure that the lamport clocks causality match the DAG topology
151		for _, parentHash := range commit.Parents {
152			parentPack, ok := oppMap[parentHash]
153			if !ok {
154				panic("DFS failed")
155			}
156
157			if parentPack.EditTime >= opp.EditTime {
158				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
159			}
160
161			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
162			// that the clocks don't jump too far in the future
163			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
164			// as long as there is one valid chain of small hops, it's fine.
165			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
166				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
167			}
168
169			// TODO: PackTime is not checked
170		}
171
172		oppMap[commit.Hash] = opp
173		opsCount += len(opp.Operations)
174	}
175
176	// The clocks are fine, we witness them
177	for _, opp := range oppMap {
178		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
179		if err != nil {
180			return nil, err
181		}
182		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
183		if err != nil {
184			return nil, err
185		}
186		// err = packClock.Witness(opp.PackTime)
187		// if err != nil {
188		// 	return nil, err
189		// }
190	}
191
192	// Now that we know that the topological order and clocks are fine, we order the operationPacks
193	// based on the logical clocks, entirely ignoring the DAG topology
194
195	oppSlice := make([]*operationPack, 0, len(oppMap))
196	for _, pack := range oppMap {
197		oppSlice = append(oppSlice, pack)
198	}
199	sort.Slice(oppSlice, func(i, j int) bool {
200		// Primary ordering with the dedicated "pack" Lamport time that encode causality
201		// within the entity
202		// if oppSlice[i].PackTime != oppSlice[j].PackTime {
203		// 	return oppSlice[i].PackTime < oppSlice[i].PackTime
204		// }
205		// We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly
206		// came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
207		// enough but it can give us an edge to approach what really happened.
208		if oppSlice[i].EditTime != oppSlice[j].EditTime {
209			return oppSlice[i].EditTime < oppSlice[j].EditTime
210		}
211		// Well, what now? We still need a total ordering and the most stable possible.
212		// As a last resort, we can order based on a hash of the serialized Operations in the
213		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
214		// This is a lexicographic ordering on the stringified ID.
215		return oppSlice[i].Id() < oppSlice[j].Id()
216	})
217
218	// Now that we ordered the operationPacks, we have the order of the Operations
219
220	ops := make([]Operation, 0, opsCount)
221	for _, pack := range oppSlice {
222		for _, operation := range pack.Operations {
223			ops = append(ops, operation)
224		}
225	}
226
227	return &Entity{
228		Definition: def,
229		ops:        ops,
230		// packClock:  packClock,
231		lastCommit: rootHash,
232	}, nil
233}
234
235// Id return the Entity identifier
236func (e *Entity) Id() entity.Id {
237	// id is the id of the first operation
238	return e.FirstOp().Id()
239}
240
241// Validate check if the Entity data is valid
242func (e *Entity) Validate() error {
243	// non-empty
244	if len(e.ops) == 0 && len(e.staging) == 0 {
245		return fmt.Errorf("entity has no operations")
246	}
247
248	// check if each operations are valid
249	for _, op := range e.ops {
250		if err := op.Validate(); err != nil {
251			return err
252		}
253	}
254
255	// check if staging is valid if needed
256	for _, op := range e.staging {
257		if err := op.Validate(); err != nil {
258			return err
259		}
260	}
261
262	// Check that there is no colliding operation's ID
263	ids := make(map[entity.Id]struct{})
264	for _, op := range e.Operations() {
265		if _, ok := ids[op.Id()]; ok {
266			return fmt.Errorf("id collision: %s", op.Id())
267		}
268		ids[op.Id()] = struct{}{}
269	}
270
271	return nil
272}
273
274// Operations return the ordered operations
275func (e *Entity) Operations() []Operation {
276	return append(e.ops, e.staging...)
277}
278
279// FirstOp lookup for the very first operation of the Entity
280func (e *Entity) FirstOp() Operation {
281	for _, op := range e.ops {
282		return op
283	}
284	for _, op := range e.staging {
285		return op
286	}
287	return nil
288}
289
290// LastOp lookup for the very last operation of the Entity
291func (e *Entity) LastOp() Operation {
292	if len(e.staging) > 0 {
293		return e.staging[len(e.staging)-1]
294	}
295	if len(e.ops) > 0 {
296		return e.ops[len(e.ops)-1]
297	}
298	return nil
299}
300
// Append add a new Operation to the Entity
//
// The operation is only staged in memory; call Commit to write it to the repository.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
305
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}
310
311// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
312// is already in sync with the repository.
313func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
314	if e.NeedCommit() {
315		return e.Commit(repo)
316	}
317	return nil
318}
319
// Commit write the appended operations in the repository
// TODO: support commit signature
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// An operationPack records a single Author, so all staged operations
	// must share the same one.
	var author identity.Interface
	for _, op := range e.staging {
		if author != nil && op.Author() != author {
			return fmt.Errorf("operations with different author")
		}
		author = op.Author()
	}

	// increment the various clocks for this new operationPack
	// packTime, err := e.packClock.Increment()
	// if err != nil {
	// 	return err
	// }
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	// an empty lastCommit means this is the entity's first commit, which is
	// the only time the creation clock ticks; otherwise CreateTime stays zero
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	opp := &operationPack{
		Author:     author,
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
		// PackTime:   packTime,
	}

	treeHash, err := opp.Write(e.Definition, repo)
	if err != nil {
		return err
	}

	// Write a Git commit referencing the tree, with the previous commit as parent
	var commitHash repository.Hash
	if e.lastCommit != "" {
		commitHash, err = repo.StoreCommit(treeHash, e.lastCommit)
	} else {
		commitHash, err = repo.StoreCommit(treeHash)
	}
	if err != nil {
		return err
	}

	// the staged operations are now stored: move them to ops and remember the
	// new head commit for the next Commit
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}