entity.go

  1package entity
  2
  3import (
  4	"encoding/json"
  5	"fmt"
  6	"sort"
  7
  8	"github.com/pkg/errors"
  9
 10	"github.com/MichaelMure/git-bug/repository"
 11	"github.com/MichaelMure/git-bug/util/lamport"
 12)
 13
// refsPattern is the Git ref layout for entities: refs/<namespace>/<id>.
const refsPattern = "refs/%s/%s"

// creationClockPattern and editClockPattern name the per-namespace lamport
// clocks witnessed/incremented through the repository.
const creationClockPattern = "%s-create"
const editClockPattern = "%s-edit"
 17
// Operation is the smallest unit of change applied to an Entity.
// Operations are stored in operationPacks and replayed in clock order.
type Operation interface {
	// Id returns the identifier of the operation.
	Id() Id
	// MarshalJSON() ([]byte, error)

	// Validate checks that the operation is self-consistent.
	Validate() error
}
 23
// OperationIterator is a placeholder for iterating over an Entity's
// operations. NOTE(review): currently an empty stub with no methods.
type OperationIterator struct {
}
 26
// Definition describes a concrete kind of Entity (how it is named, where it
// lives in Git, how its operations are decoded).
type Definition struct {
	// the name of the entity (bug, pull-request, ...)
	typename string
	// the namespace in git (bugs, prs, ...); used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(raw json.RawMessage) (Operation, error)
	// the expected format version number
	formatVersion uint
}
 37
// Entity is a data structure stored in Git as a DAG of commits, each commit
// holding an operationPack. The flattened, clock-ordered operations describe
// the entity's state.
type Entity struct {
	Definition

	// ops holds the committed operations; staging holds operations appended
	// since the last Commit.
	ops     []Operation
	staging []Operation

	// packClock orders operationPacks relative to each other.
	packClock  lamport.Clock
	// lastCommit is the hash of the last written commit, or "" before the
	// first Commit.
	lastCommit repository.Hash
}
 47
 48func New(definition Definition) *Entity {
 49	return &Entity{
 50		Definition: definition,
 51		packClock:  lamport.NewMemClock(),
 52	}
 53}
 54
 55func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
 56	if err := id.Validate(); err != nil {
 57		return nil, errors.Wrap(err, "invalid id")
 58	}
 59
 60	ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
 61
 62	rootHash, err := repo.ResolveRef(ref)
 63	if err != nil {
 64		return nil, err
 65	}
 66
 67	// Perform a depth-first search to get a topological order of the DAG where we discover the
 68	// parents commit and go back in time up to the chronological root
 69
 70	stack := make([]repository.Hash, 0, 32)
 71	visited := make(map[repository.Hash]struct{})
 72	DFSOrder := make([]repository.Commit, 0, 32)
 73
 74	stack = append(stack, rootHash)
 75
 76	for len(stack) > 0 {
 77		// pop
 78		hash := stack[len(stack)-1]
 79		stack = stack[:len(stack)-1]
 80
 81		if _, ok := visited[hash]; ok {
 82			continue
 83		}
 84
 85		// mark as visited
 86		visited[hash] = struct{}{}
 87
 88		commit, err := repo.ReadCommit(hash)
 89		if err != nil {
 90			return nil, err
 91		}
 92
 93		DFSOrder = append(DFSOrder, commit)
 94
 95		for _, parent := range commit.Parents {
 96			stack = append(stack, parent)
 97		}
 98	}
 99
100	// Now, we can reverse this topological order and read the commits in an order where
101	// we are sure to have read all the chronological ancestors when we read a commit.
102
103	// Next step is to:
104	// 1) read the operationPacks
105	// 2) make sure that the clocks causality respect the DAG topology.
106
107	oppMap := make(map[repository.Hash]*operationPack)
108	var opsCount int
109	var packClock = lamport.NewMemClock()
110
111	for i := len(DFSOrder) - 1; i >= 0; i-- {
112		commit := DFSOrder[i]
113		firstCommit := i == len(DFSOrder)-1
114
115		// Verify DAG structure: single chronological root, so only the root
116		// can have no parents
117		if !firstCommit && len(commit.Parents) == 0 {
118			return nil, fmt.Errorf("multiple root in the entity DAG")
119		}
120
121		opp, err := readOperationPack(def, repo, commit.TreeHash)
122		if err != nil {
123			return nil, err
124		}
125
126		// Check that the lamport clocks are set
127		if firstCommit && opp.CreateTime <= 0 {
128			return nil, fmt.Errorf("creation lamport time not set")
129		}
130		if opp.EditTime <= 0 {
131			return nil, fmt.Errorf("edition lamport time not set")
132		}
133		if opp.PackTime <= 0 {
134			return nil, fmt.Errorf("pack lamport time not set")
135		}
136
137		// make sure that the lamport clocks causality match the DAG topology
138		for _, parentHash := range commit.Parents {
139			parentPack, ok := oppMap[parentHash]
140			if !ok {
141				panic("DFS failed")
142			}
143
144			if parentPack.EditTime >= opp.EditTime {
145				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
146			}
147
148			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
149			// that the clocks don't jump too far in the future
150			if opp.EditTime-parentPack.EditTime > 10_000 {
151				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
152			}
153		}
154
155		oppMap[commit.Hash] = opp
156		opsCount += len(opp.Operations)
157	}
158
159	// The clocks are fine, we witness them
160	for _, opp := range oppMap {
161		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
162		if err != nil {
163			return nil, err
164		}
165		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
166		if err != nil {
167			return nil, err
168		}
169		err = packClock.Witness(opp.PackTime)
170		if err != nil {
171			return nil, err
172		}
173	}
174
175	// Now that we know that the topological order and clocks are fine, we order the operationPacks
176	// based on the logical clocks, entirely ignoring the DAG topology
177
178	oppSlice := make([]*operationPack, 0, len(oppMap))
179	for _, pack := range oppMap {
180		oppSlice = append(oppSlice, pack)
181	}
182	sort.Slice(oppSlice, func(i, j int) bool {
183		// TODO: no secondary ordering?
184		// might be useful for stable ordering
185		return oppSlice[i].PackTime < oppSlice[i].PackTime
186	})
187
188	// Now that we ordered the operationPacks, we have the order of the Operations
189
190	ops := make([]Operation, 0, opsCount)
191	for _, pack := range oppSlice {
192		for _, operation := range pack.Operations {
193			ops = append(ops, operation)
194		}
195	}
196
197	return &Entity{
198		Definition: def,
199		ops:        ops,
200		lastCommit: rootHash,
201	}, nil
202}
203
// Id return the Entity identifier
//
// NOTE(review): this panics (nil dereference) if the entity holds no
// operation at all, since FirstOp then returns nil — callers are presumably
// expected to only call Id on a populated entity; confirm.
func (e *Entity) Id() Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}
209
210func (e *Entity) Validate() error {
211	// non-empty
212	if len(e.ops) == 0 && len(e.staging) == 0 {
213		return fmt.Errorf("entity has no operations")
214	}
215
216	// check if each operations are valid
217	for _, op := range e.ops {
218		if err := op.Validate(); err != nil {
219			return err
220		}
221	}
222
223	// check if staging is valid if needed
224	for _, op := range e.staging {
225		if err := op.Validate(); err != nil {
226			return err
227		}
228	}
229
230	// Check that there is no colliding operation's ID
231	ids := make(map[Id]struct{})
232	for _, op := range e.Operations() {
233		if _, ok := ids[op.Id()]; ok {
234			return fmt.Errorf("id collision: %s", op.Id())
235		}
236		ids[op.Id()] = struct{}{}
237	}
238
239	return nil
240}
241
242// return the ordered operations
243func (e *Entity) Operations() []Operation {
244	return append(e.ops, e.staging...)
245}
246
247// Lookup for the very first operation of the Entity.
248func (e *Entity) FirstOp() Operation {
249	for _, op := range e.ops {
250		return op
251	}
252	for _, op := range e.staging {
253		return op
254	}
255	return nil
256}
257
// Append adds an operation to the staging area; it will be persisted on the
// next Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
261
// NeedCommit reports whether the entity has staged operations waiting to be
// committed.
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}
265
// CommitAdNeeded commits the entity only if it has pending staged operations;
// it is a no-op otherwise.
//
// NOTE(review): the name looks like a typo for "CommitAsNeeded", but it is
// exported — renaming would break callers, so it is left as-is.
func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
	if e.NeedCommit() {
		return e.Commit(repo)
	}
	return nil
}
272
// Commit validates the entity, packs the staged operations into a new
// operationPack, writes it as a Git commit chained to the previous one, and
// updates the entity's Git ref.
//
// The step ordering matters: clocks are incremented before anything is
// written so the pack carries the new times, and the ref is updated last so
// a failure mid-way never publishes a half-written entity.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// increment the various clocks for this new operationPack
	packTime, err := e.packClock.Increment()
	if err != nil {
		return err
	}
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	// the creation clock is only ticked on the very first commit of the
	// entity (creationTime stays 0 afterwards)
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	opp := &operationPack{
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
		PackTime:   packTime,
	}

	// write the pack's tree into the repository
	treeHash, err := opp.write(e.Definition, repo)
	if err != nil {
		return err
	}

	// Write a Git commit referencing the tree, with the previous commit as parent
	var commitHash repository.Hash
	if e.lastCommit != "" {
		commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit)
	} else {
		commitHash, err = repo.StoreCommit(treeHash)
	}
	if err != nil {
		return err
	}

	// the staged operations are now committed
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}