entity.go

  1package entity
  2
  3import (
  4	"encoding/json"
  5	"fmt"
  6	"sort"
  7
  8	"github.com/pkg/errors"
  9
 10	"github.com/MichaelMure/git-bug/repository"
 11	"github.com/MichaelMure/git-bug/util/lamport"
 12)
 13
 14const refsPattern = "refs/%s/%s"
 15const creationClockPattern = "%s-create"
 16const editClockPattern = "%s-edit"
 17
// Operation is a piece of data attached to an Entity, exposing a stable
// identifier and a validity check. Concrete operation types are decoded by
// the Definition's operationUnmarshaler.
type Operation interface {
	// Id returns the identifier of the operation.
	Id() Id
	// MarshalJSON() ([]byte, error)
	Validate() error
}
 23
// Definition holds the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), used in error messages
	typename string
	// the namespace in git (bugs, prs, ...), used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(raw json.RawMessage) (Operation, error)
	// the expected format version number
	formatVersion uint
}
 35
// Entity is a data structure stored in git: an ordered list of Operations,
// decoded from a chain of commits holding operationPacks.
type Entity struct {
	Definition

	// operations already written in the repository
	ops []Operation
	// operations appended locally, pending a Commit
	staging []Operation

	// logical clock ordering the entity's own operationPacks
	packClock lamport.Clock
	// hash of the last git commit written for this entity; empty before the
	// first Commit
	lastCommit repository.Hash
}
 45
 46func New(definition Definition) *Entity {
 47	return &Entity{
 48		Definition: definition,
 49		packClock:  lamport.NewMemClock(),
 50	}
 51}
 52
 53func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
 54	if err := id.Validate(); err != nil {
 55		return nil, errors.Wrap(err, "invalid id")
 56	}
 57
 58	ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
 59
 60	return read(def, repo, ref)
 61}
 62
 63// read fetch from git and decode an Entity at an arbitrary git reference.
 64func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
 65	rootHash, err := repo.ResolveRef(ref)
 66	if err != nil {
 67		return nil, err
 68	}
 69
 70	// Perform a depth-first search to get a topological order of the DAG where we discover the
 71	// parents commit and go back in time up to the chronological root
 72
 73	stack := make([]repository.Hash, 0, 32)
 74	visited := make(map[repository.Hash]struct{})
 75	DFSOrder := make([]repository.Commit, 0, 32)
 76
 77	stack = append(stack, rootHash)
 78
 79	for len(stack) > 0 {
 80		// pop
 81		hash := stack[len(stack)-1]
 82		stack = stack[:len(stack)-1]
 83
 84		if _, ok := visited[hash]; ok {
 85			continue
 86		}
 87
 88		// mark as visited
 89		visited[hash] = struct{}{}
 90
 91		commit, err := repo.ReadCommit(hash)
 92		if err != nil {
 93			return nil, err
 94		}
 95
 96		DFSOrder = append(DFSOrder, commit)
 97
 98		for _, parent := range commit.Parents {
 99			stack = append(stack, parent)
100		}
101	}
102
103	// Now, we can reverse this topological order and read the commits in an order where
104	// we are sure to have read all the chronological ancestors when we read a commit.
105
106	// Next step is to:
107	// 1) read the operationPacks
108	// 2) make sure that the clocks causality respect the DAG topology.
109
110	oppMap := make(map[repository.Hash]*operationPack)
111	var opsCount int
112	var packClock = lamport.NewMemClock()
113
114	for i := len(DFSOrder) - 1; i >= 0; i-- {
115		commit := DFSOrder[i]
116		firstCommit := i == len(DFSOrder)-1
117
118		// Verify DAG structure: single chronological root, so only the root
119		// can have no parents
120		if !firstCommit && len(commit.Parents) == 0 {
121			return nil, fmt.Errorf("multiple root in the entity DAG")
122		}
123
124		opp, err := readOperationPack(def, repo, commit.TreeHash)
125		if err != nil {
126			return nil, err
127		}
128
129		// Check that the lamport clocks are set
130		if firstCommit && opp.CreateTime <= 0 {
131			return nil, fmt.Errorf("creation lamport time not set")
132		}
133		if opp.EditTime <= 0 {
134			return nil, fmt.Errorf("edition lamport time not set")
135		}
136		if opp.PackTime <= 0 {
137			return nil, fmt.Errorf("pack lamport time not set")
138		}
139
140		// make sure that the lamport clocks causality match the DAG topology
141		for _, parentHash := range commit.Parents {
142			parentPack, ok := oppMap[parentHash]
143			if !ok {
144				panic("DFS failed")
145			}
146
147			if parentPack.EditTime >= opp.EditTime {
148				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
149			}
150
151			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
152			// that the clocks don't jump too far in the future
153			if opp.EditTime-parentPack.EditTime > 10_000 {
154				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
155			}
156		}
157
158		oppMap[commit.Hash] = opp
159		opsCount += len(opp.Operations)
160	}
161
162	// The clocks are fine, we witness them
163	for _, opp := range oppMap {
164		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
165		if err != nil {
166			return nil, err
167		}
168		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
169		if err != nil {
170			return nil, err
171		}
172		err = packClock.Witness(opp.PackTime)
173		if err != nil {
174			return nil, err
175		}
176	}
177
178	// Now that we know that the topological order and clocks are fine, we order the operationPacks
179	// based on the logical clocks, entirely ignoring the DAG topology
180
181	oppSlice := make([]*operationPack, 0, len(oppMap))
182	for _, pack := range oppMap {
183		oppSlice = append(oppSlice, pack)
184	}
185	sort.Slice(oppSlice, func(i, j int) bool {
186		// Primary ordering with the dedicated "pack" Lamport time that encode causality
187		// within the entity
188		if oppSlice[i].PackTime != oppSlice[j].PackTime {
189			return oppSlice[i].PackTime < oppSlice[i].PackTime
190		}
191		// We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly
192		// came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
193		// enough but it can give us an edge to approach what really happened.
194		if oppSlice[i].EditTime != oppSlice[j].EditTime {
195			return oppSlice[i].EditTime < oppSlice[j].EditTime
196		}
197		// Well, what now? We still need a total ordering, the most stable possible.
198		// As a last resort, we can order based on a hash of the serialized Operations in the
199		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
200		// This is a lexicographic ordering.
201		return oppSlice[i].Id < oppSlice[j].Id
202	})
203
204	// Now that we ordered the operationPacks, we have the order of the Operations
205
206	ops := make([]Operation, 0, opsCount)
207	for _, pack := range oppSlice {
208		for _, operation := range pack.Operations {
209			ops = append(ops, operation)
210		}
211	}
212
213	return &Entity{
214		Definition: def,
215		ops:        ops,
216		lastCommit: rootHash,
217	}, nil
218}
219
// Id return the Entity identifier
//
// NOTE(review): FirstOp() returns nil for an entity without any operation,
// in which case this call panics — presumably callers guarantee at least one
// operation exists; confirm.
func (e *Entity) Id() Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}
225
226func (e *Entity) Validate() error {
227	// non-empty
228	if len(e.ops) == 0 && len(e.staging) == 0 {
229		return fmt.Errorf("entity has no operations")
230	}
231
232	// check if each operations are valid
233	for _, op := range e.ops {
234		if err := op.Validate(); err != nil {
235			return err
236		}
237	}
238
239	// check if staging is valid if needed
240	for _, op := range e.staging {
241		if err := op.Validate(); err != nil {
242			return err
243		}
244	}
245
246	// Check that there is no colliding operation's ID
247	ids := make(map[Id]struct{})
248	for _, op := range e.Operations() {
249		if _, ok := ids[op.Id()]; ok {
250			return fmt.Errorf("id collision: %s", op.Id())
251		}
252		ids[op.Id()] = struct{}{}
253	}
254
255	return nil
256}
257
258// return the ordered operations
259func (e *Entity) Operations() []Operation {
260	return append(e.ops, e.staging...)
261}
262
263// Lookup for the very first operation of the Entity.
264func (e *Entity) FirstOp() Operation {
265	for _, op := range e.ops {
266		return op
267	}
268	for _, op := range e.staging {
269		return op
270	}
271	return nil
272}
273
// Append adds an operation to the staging area, to be written into the
// repository on the next Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
277
// NeedCommit reports whether the entity has staged operations that are not
// yet written into the repository.
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}
281
282func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
283	if e.NeedCommit() {
284		return e.Commit(repo)
285	}
286	return nil
287}
288
// Commit writes the staged operations into the repository as a new
// operationPack wrapped in a git commit chained to the previous one, then
// updates the entity's git reference. On success the staged operations are
// moved into the committed set.
//
// TODO: support commit signature
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	// refuse to persist invalid data
	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// increment the various clocks for this new operationPack
	packTime, err := e.packClock.Increment()
	if err != nil {
		return err
	}
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	// the creation clock only ticks for the entity's very first commit
	// (no previous commit recorded); otherwise CreateTime stays zero
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	// bundle the staged operations and their clocks into a pack, and write
	// it as a git tree
	opp := &operationPack{
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
		PackTime:   packTime,
	}

	treeHash, err := opp.write(e.Definition, repo)
	if err != nil {
		return err
	}

	// Write a Git commit referencing the tree, with the previous commit as parent
	var commitHash repository.Hash
	if e.lastCommit != "" {
		commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit)
	} else {
		commitHash, err = repo.StoreCommit(treeHash)
	}
	if err != nil {
		return err
	}

	// move the freshly written operations from staging to the committed set
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}