entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entity"
 13	"github.com/MichaelMure/git-bug/identity"
 14	"github.com/MichaelMure/git-bug/repository"
 15	"github.com/MichaelMure/git-bug/util/lamport"
 16)
 17
 18const refsPattern = "refs/%s/%s"
 19const creationClockPattern = "%s-create"
 20const editClockPattern = "%s-edit"
 21
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...); used in error messages (see Commit)
	typename string
	// the namespace in git (bugs, prs, ...); used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
 35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// TODO: add here createTime and editTime

	// lastCommit is the hash of the most recent commit written for this entity.
	// It stays empty ("") until the first Commit; Commit uses that emptiness to
	// decide whether to tick the creation clock and whether to set a parent commit.
	lastCommit repository.Hash
}
 49
 50// New create an empty Entity
 51func New(definition Definition) *Entity {
 52	return &Entity{
 53		Definition: definition,
 54	}
 55}
 56
 57// Read will read and decode a stored local Entity from a repository
 58func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
 59	if err := id.Validate(); err != nil {
 60		return nil, errors.Wrap(err, "invalid id")
 61	}
 62
 63	ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
 64
 65	return read(def, repo, ref)
 66}
 67
 68// readRemote will read and decode a stored remote Entity from a repository
 69func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
 70	if err := id.Validate(); err != nil {
 71		return nil, errors.Wrap(err, "invalid id")
 72	}
 73
 74	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())
 75
 76	return read(def, repo, ref)
 77}
 78
 79// read fetch from git and decode an Entity at an arbitrary git reference.
 80func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
 81	rootHash, err := repo.ResolveRef(ref)
 82	if err != nil {
 83		return nil, err
 84	}
 85
 86	// Perform a breadth-first search to get a topological order of the DAG where we discover the
 87	// parents commit and go back in time up to the chronological root
 88
 89	queue := make([]repository.Hash, 0, 32)
 90	visited := make(map[repository.Hash]struct{})
 91	BFSOrder := make([]repository.Commit, 0, 32)
 92
 93	queue = append(queue, rootHash)
 94	visited[rootHash] = struct{}{}
 95
 96	for len(queue) > 0 {
 97		// pop
 98		hash := queue[0]
 99		queue = queue[1:]
100
101		commit, err := repo.ReadCommit(hash)
102		if err != nil {
103			return nil, err
104		}
105
106		BFSOrder = append(BFSOrder, commit)
107
108		for _, parent := range commit.Parents {
109			if _, ok := visited[parent]; !ok {
110				queue = append(queue, parent)
111				// mark as visited
112				visited[parent] = struct{}{}
113			}
114		}
115	}
116
117	// Now, we can reverse this topological order and read the commits in an order where
118	// we are sure to have read all the chronological ancestors when we read a commit.
119
120	// Next step is to:
121	// 1) read the operationPacks
122	// 2) make sure that the clocks causality respect the DAG topology.
123
124	oppMap := make(map[repository.Hash]*operationPack)
125	var opsCount int
126
127	for i := len(BFSOrder) - 1; i >= 0; i-- {
128		commit := BFSOrder[i]
129		isFirstCommit := i == len(BFSOrder)-1
130		isMerge := len(commit.Parents) > 1
131
132		// Verify DAG structure: single chronological root, so only the root
133		// can have no parents. Said otherwise, the DAG need to have exactly
134		// one leaf.
135		if !isFirstCommit && len(commit.Parents) == 0 {
136			return nil, fmt.Errorf("multiple leafs in the entity DAG")
137		}
138
139		opp, err := readOperationPack(def, repo, commit)
140		if err != nil {
141			return nil, err
142		}
143
144		err = opp.Validate()
145		if err != nil {
146			return nil, err
147		}
148
149		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
150		if isFirstCommit && opp.CreateTime <= 0 {
151			return nil, fmt.Errorf("creation lamport time not set")
152		}
153
154		// make sure that the lamport clocks causality match the DAG topology
155		for _, parentHash := range commit.Parents {
156			parentPack, ok := oppMap[parentHash]
157			if !ok {
158				panic("DFS failed")
159			}
160
161			if parentPack.EditTime >= opp.EditTime {
162				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
163			}
164
165			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
166			// that the clocks don't jump too far in the future
167			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
168			// as long as there is one valid chain of small hops, it's fine.
169			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
170				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
171			}
172		}
173
174		oppMap[commit.Hash] = opp
175		opsCount += len(opp.Operations)
176	}
177
178	// The clocks are fine, we witness them
179	for _, opp := range oppMap {
180		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
181		if err != nil {
182			return nil, err
183		}
184		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
185		if err != nil {
186			return nil, err
187		}
188	}
189
190	// Now that we know that the topological order and clocks are fine, we order the operationPacks
191	// based on the logical clocks, entirely ignoring the DAG topology
192
193	oppSlice := make([]*operationPack, 0, len(oppMap))
194	for _, pack := range oppMap {
195		oppSlice = append(oppSlice, pack)
196	}
197	sort.Slice(oppSlice, func(i, j int) bool {
198		// Primary ordering with the EditTime.
199		if oppSlice[i].EditTime != oppSlice[j].EditTime {
200			return oppSlice[i].EditTime < oppSlice[j].EditTime
201		}
202		// We have equal EditTime, which means we have concurrent edition over different machines and we
203		// can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
204		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
205		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
206		// This is a lexicographic ordering on the stringified ID.
207		return oppSlice[i].Id() < oppSlice[j].Id()
208	})
209
210	// Now that we ordered the operationPacks, we have the order of the Operations
211
212	ops := make([]Operation, 0, opsCount)
213	for _, pack := range oppSlice {
214		for _, operation := range pack.Operations {
215			ops = append(ops, operation)
216		}
217	}
218
219	return &Entity{
220		Definition: def,
221		ops:        ops,
222		lastCommit: rootHash,
223	}, nil
224}
225
// StreamedEntity is the unit sent over the channel returned by ReadAll:
// either a successfully read Entity, or the error that stopped the stream.
type StreamedEntity struct {
	// Entity holds a successfully read and decoded entity (nil on error).
	Entity *Entity
	// Err, if non-nil, is the error that terminated the stream.
	Err error
}
230
231// ReadAll read and parse all local Entity
232func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
233	out := make(chan StreamedEntity)
234
235	go func() {
236		defer close(out)
237
238		refPrefix := fmt.Sprintf("refs/%s/", def.namespace)
239
240		refs, err := repo.ListRefs(refPrefix)
241		if err != nil {
242			out <- StreamedEntity{Err: err}
243			return
244		}
245
246		for _, ref := range refs {
247			e, err := read(def, repo, ref)
248
249			if err != nil {
250				out <- StreamedEntity{Err: err}
251				return
252			}
253
254			out <- StreamedEntity{Entity: e}
255		}
256	}()
257
258	return out
259}
260
// Id return the Entity identifier
//
// NOTE(review): FirstOp() returns nil for an empty Entity, which would make
// this dereference panic — presumably callers only call Id() on entities
// holding at least one operation; confirm against callers.
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}
266
267// Validate check if the Entity data is valid
268func (e *Entity) Validate() error {
269	// non-empty
270	if len(e.ops) == 0 && len(e.staging) == 0 {
271		return fmt.Errorf("entity has no operations")
272	}
273
274	// check if each operations are valid
275	for _, op := range e.ops {
276		if err := op.Validate(); err != nil {
277			return err
278		}
279	}
280
281	// check if staging is valid if needed
282	for _, op := range e.staging {
283		if err := op.Validate(); err != nil {
284			return err
285		}
286	}
287
288	// Check that there is no colliding operation's ID
289	ids := make(map[entity.Id]struct{})
290	for _, op := range e.Operations() {
291		if _, ok := ids[op.Id()]; ok {
292			return fmt.Errorf("id collision: %s", op.Id())
293		}
294		ids[op.Id()] = struct{}{}
295	}
296
297	return nil
298}
299
300// Operations return the ordered operations
301func (e *Entity) Operations() []Operation {
302	return append(e.ops, e.staging...)
303}
304
305// FirstOp lookup for the very first operation of the Entity
306func (e *Entity) FirstOp() Operation {
307	for _, op := range e.ops {
308		return op
309	}
310	for _, op := range e.staging {
311		return op
312	}
313	return nil
314}
315
316// LastOp lookup for the very last operation of the Entity
317func (e *Entity) LastOp() Operation {
318	if len(e.staging) > 0 {
319		return e.staging[len(e.staging)-1]
320	}
321	if len(e.ops) > 0 {
322		return e.ops[len(e.ops)-1]
323	}
324	return nil
325}
326
// Append add a new Operation to the Entity
//
// The operation is only staged in memory; call Commit (or CommitAdNeeded)
// to persist it in the repository.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
331
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	// any staged operation means the entity is out of sync with the repository
	return len(e.staging) > 0
}
336
337// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
338// is already in sync with the repository.
339func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
340	if e.NeedCommit() {
341		return e.Commit(repo)
342	}
343	return nil
344}
345
// Commit write the appended operations in the repository
//
// Side effects, in order: the edit lamport clock is incremented (and the
// creation clock too, for the very first commit of the entity), a new
// operationPack commit is written (parented on lastCommit if any), the
// staged operations are folded into ops, and the entity's git ref is
// updated to point at the new commit.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// an operationPack carries a single Author, so all staged operations
	// must share the same author
	var author identity.Interface
	for _, op := range e.staging {
		if author != nil && op.Author() != author {
			return fmt.Errorf("operations with different author")
		}
		author = op.Author()
	}

	// tick the edit clock on every commit
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	// tick the creation clock only for the entity's first commit
	// (lastCommit is empty until the first Commit succeeds)
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	opp := &operationPack{
		Author:     author,
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
	}

	// the first commit has no parent; later commits chain on lastCommit
	var commitHash repository.Hash
	if e.lastCommit == "" {
		commitHash, err = opp.Write(e.Definition, repo)
	} else {
		commitHash, err = opp.Write(e.Definition, repo, e.lastCommit)
	}

	if err != nil {
		return err
	}

	// the staged operations are now stored
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}