entity.go

  1// Package dag contains the base common code to define an entity stored
  2// in a chain of git objects, supporting actions like Push, Pull and Merge.
  3package dag
  4
  5import (
  6	"encoding/json"
  7	"fmt"
  8	"sort"
  9
 10	"github.com/pkg/errors"
 11
 12	"github.com/MichaelMure/git-bug/entity"
 13	"github.com/MichaelMure/git-bug/identity"
 14	"github.com/MichaelMure/git-bug/repository"
 15	"github.com/MichaelMure/git-bug/util/lamport"
 16)
 17
 18const refsPattern = "refs/%s/%s"
 19const creationClockPattern = "%s-create"
 20const editClockPattern = "%s-edit"
 21
// Definition holds the details defining one specialization of an Entity
// (what a "bug" or a "pull-request" is, and how to decode it).
type Definition struct {
	// the name of the entity (bug, pull-request, ...), used in error messages
	typename string
	// the namespace in git (bugs, prs, ...), used to construct the git refs
	// and clock names for this entity type
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
 35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	Definition

	// operations that are already stored in the repository, in their resolved order
	ops []Operation
	// operations added in memory but not yet stored in the repository;
	// flushed to git by Commit()
	staging []Operation

	// TODO: add here createTime and editTime

	// // TODO: doesn't seems to actually be useful over the topological sort ? Timestamp can be generated from graph depth
	// // TODO: maybe EditTime is better because it could spread ops in consecutive groups on the logical timeline --> avoid interleaving
	// packClock  lamport.Clock

	// hash of the last git commit written for this entity; empty ("") until
	// the first Commit(), used as the parent of the next commit
	lastCommit repository.Hash
}
 52
 53// New create an empty Entity
 54func New(definition Definition) *Entity {
 55	return &Entity{
 56		Definition: definition,
 57		// packClock:  lamport.NewMemClock(),
 58	}
 59}
 60
 61// Read will read and decode a stored local Entity from a repository
 62func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
 63	if err := id.Validate(); err != nil {
 64		return nil, errors.Wrap(err, "invalid id")
 65	}
 66
 67	ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
 68
 69	return read(def, repo, ref)
 70}
 71
 72// readRemote will read and decode a stored remote Entity from a repository
 73func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
 74	if err := id.Validate(); err != nil {
 75		return nil, errors.Wrap(err, "invalid id")
 76	}
 77
 78	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())
 79
 80	return read(def, repo, ref)
 81}
 82
 83// read fetch from git and decode an Entity at an arbitrary git reference.
 84func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
 85	rootHash, err := repo.ResolveRef(ref)
 86	if err != nil {
 87		return nil, err
 88	}
 89
 90	// Perform a breadth-first search to get a topological order of the DAG where we discover the
 91	// parents commit and go back in time up to the chronological root
 92
 93	queue := make([]repository.Hash, 0, 32)
 94	visited := make(map[repository.Hash]struct{})
 95	BFSOrder := make([]repository.Commit, 0, 32)
 96
 97	queue = append(queue, rootHash)
 98	visited[rootHash] = struct{}{}
 99
100	for len(queue) > 0 {
101		// pop
102		hash := queue[0]
103		queue = queue[1:]
104
105		commit, err := repo.ReadCommit(hash)
106		if err != nil {
107			return nil, err
108		}
109
110		BFSOrder = append(BFSOrder, commit)
111
112		for _, parent := range commit.Parents {
113			if _, ok := visited[parent]; !ok {
114				queue = append(queue, parent)
115				// mark as visited
116				visited[parent] = struct{}{}
117			}
118		}
119	}
120
121	// Now, we can reverse this topological order and read the commits in an order where
122	// we are sure to have read all the chronological ancestors when we read a commit.
123
124	// Next step is to:
125	// 1) read the operationPacks
126	// 2) make sure that the clocks causality respect the DAG topology.
127
128	oppMap := make(map[repository.Hash]*operationPack)
129	var opsCount int
130	// var packClock = lamport.NewMemClock()
131
132	for i := len(BFSOrder) - 1; i >= 0; i-- {
133		commit := BFSOrder[i]
134		isFirstCommit := i == len(BFSOrder)-1
135		isMerge := len(commit.Parents) > 1
136
137		// Verify DAG structure: single chronological root, so only the root
138		// can have no parents. Said otherwise, the DAG need to have exactly
139		// one leaf.
140		if !isFirstCommit && len(commit.Parents) == 0 {
141			return nil, fmt.Errorf("multiple leafs in the entity DAG")
142		}
143
144		opp, err := readOperationPack(def, repo, commit)
145		if err != nil {
146			return nil, err
147		}
148
149		err = opp.Validate()
150		if err != nil {
151			return nil, err
152		}
153
154		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
155		if isFirstCommit && opp.CreateTime <= 0 {
156			return nil, fmt.Errorf("creation lamport time not set")
157		}
158
159		// make sure that the lamport clocks causality match the DAG topology
160		for _, parentHash := range commit.Parents {
161			parentPack, ok := oppMap[parentHash]
162			if !ok {
163				panic("DFS failed")
164			}
165
166			if parentPack.EditTime >= opp.EditTime {
167				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
168			}
169
170			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
171			// that the clocks don't jump too far in the future
172			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
173			// as long as there is one valid chain of small hops, it's fine.
174			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
175				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
176			}
177
178			// TODO: PackTime is not checked
179		}
180
181		oppMap[commit.Hash] = opp
182		opsCount += len(opp.Operations)
183	}
184
185	// The clocks are fine, we witness them
186	for _, opp := range oppMap {
187		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
188		if err != nil {
189			return nil, err
190		}
191		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
192		if err != nil {
193			return nil, err
194		}
195		// err = packClock.Witness(opp.PackTime)
196		// if err != nil {
197		// 	return nil, err
198		// }
199	}
200
201	// Now that we know that the topological order and clocks are fine, we order the operationPacks
202	// based on the logical clocks, entirely ignoring the DAG topology
203
204	oppSlice := make([]*operationPack, 0, len(oppMap))
205	for _, pack := range oppMap {
206		oppSlice = append(oppSlice, pack)
207	}
208	sort.Slice(oppSlice, func(i, j int) bool {
209		// Primary ordering with the dedicated "pack" Lamport time that encode causality
210		// within the entity
211		// if oppSlice[i].PackTime != oppSlice[j].PackTime {
212		// 	return oppSlice[i].PackTime < oppSlice[i].PackTime
213		// }
214		// We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly
215		// came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
216		// enough but it can give us an edge to approach what really happened.
217		if oppSlice[i].EditTime != oppSlice[j].EditTime {
218			return oppSlice[i].EditTime < oppSlice[j].EditTime
219		}
220		// Well, what now? We still need a total ordering and the most stable possible.
221		// As a last resort, we can order based on a hash of the serialized Operations in the
222		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
223		// This is a lexicographic ordering on the stringified ID.
224		return oppSlice[i].Id() < oppSlice[j].Id()
225	})
226
227	// Now that we ordered the operationPacks, we have the order of the Operations
228
229	ops := make([]Operation, 0, opsCount)
230	for _, pack := range oppSlice {
231		for _, operation := range pack.Operations {
232			ops = append(ops, operation)
233		}
234	}
235
236	return &Entity{
237		Definition: def,
238		ops:        ops,
239		// packClock:  packClock,
240		lastCommit: rootHash,
241	}, nil
242}
243
// StreamedEntity is the result type streamed by ReadAll: either a decoded
// Entity or the error that interrupted the enumeration, never both.
type StreamedEntity struct {
	Entity *Entity
	Err    error
}
248
249// ReadAll read and parse all local Entity
250func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
251	out := make(chan StreamedEntity)
252
253	go func() {
254		defer close(out)
255
256		refPrefix := fmt.Sprintf("refs/%s/", def.namespace)
257
258		refs, err := repo.ListRefs(refPrefix)
259		if err != nil {
260			out <- StreamedEntity{Err: err}
261			return
262		}
263
264		for _, ref := range refs {
265			e, err := read(def, repo, ref)
266
267			if err != nil {
268				out <- StreamedEntity{Err: err}
269				return
270			}
271
272			out <- StreamedEntity{Entity: e}
273		}
274	}()
275
276	return out
277}
278
279// Id return the Entity identifier
280func (e *Entity) Id() entity.Id {
281	// id is the id of the first operation
282	return e.FirstOp().Id()
283}
284
285// Validate check if the Entity data is valid
286func (e *Entity) Validate() error {
287	// non-empty
288	if len(e.ops) == 0 && len(e.staging) == 0 {
289		return fmt.Errorf("entity has no operations")
290	}
291
292	// check if each operations are valid
293	for _, op := range e.ops {
294		if err := op.Validate(); err != nil {
295			return err
296		}
297	}
298
299	// check if staging is valid if needed
300	for _, op := range e.staging {
301		if err := op.Validate(); err != nil {
302			return err
303		}
304	}
305
306	// Check that there is no colliding operation's ID
307	ids := make(map[entity.Id]struct{})
308	for _, op := range e.Operations() {
309		if _, ok := ids[op.Id()]; ok {
310			return fmt.Errorf("id collision: %s", op.Id())
311		}
312		ids[op.Id()] = struct{}{}
313	}
314
315	return nil
316}
317
318// Operations return the ordered operations
319func (e *Entity) Operations() []Operation {
320	return append(e.ops, e.staging...)
321}
322
323// FirstOp lookup for the very first operation of the Entity
324func (e *Entity) FirstOp() Operation {
325	for _, op := range e.ops {
326		return op
327	}
328	for _, op := range e.staging {
329		return op
330	}
331	return nil
332}
333
334// LastOp lookup for the very last operation of the Entity
335func (e *Entity) LastOp() Operation {
336	if len(e.staging) > 0 {
337		return e.staging[len(e.staging)-1]
338	}
339	if len(e.ops) > 0 {
340		return e.ops[len(e.ops)-1]
341	}
342	return nil
343}
344
345// Append add a new Operation to the Entity
346func (e *Entity) Append(op Operation) {
347	e.staging = append(e.staging, op)
348}
349
350// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
351func (e *Entity) NeedCommit() bool {
352	return len(e.staging) > 0
353}
354
355// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
356// is already in sync with the repository.
357func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
358	if e.NeedCommit() {
359		return e.Commit(repo)
360	}
361	return nil
362}
363
// Commit write the appended operations in the repository.
//
// It validates the entity, checks that all staged operations share a single
// author, increments the edit clock (and the creation clock on the very first
// commit), writes a new operationPack as a git commit chained on lastCommit,
// then moves staging into ops and updates the entity's git ref.
// Returns an error if there is nothing staged or if any step fails.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// all staged operations must come from the same author, as the
	// operationPack stores a single Author
	// NOTE(review): this compares identity.Interface values with != — presumably
	// implementations are comparable (e.g. same pointer); confirm that two
	// equivalent identities can't compare unequal here.
	var author identity.Interface
	for _, op := range e.staging {
		if author != nil && op.Author() != author {
			return fmt.Errorf("operations with different author")
		}
		author = op.Author()
	}

	// increment the various clocks for this new operationPack
	// packTime, err := e.packClock.Increment()
	// if err != nil {
	// 	return err
	// }
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	// the creation clock only ticks on the entity's first commit
	// (lastCommit is empty until the first successful Commit)
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	opp := &operationPack{
		Author:     author,
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
		// PackTime:   packTime,
	}

	// first commit has no parent; subsequent commits chain on lastCommit
	var commitHash repository.Hash
	if e.lastCommit == "" {
		commitHash, err = opp.Write(e.Definition, repo)
	} else {
		commitHash, err = opp.Write(e.Definition, repo, e.lastCommit)
	}

	if err != nil {
		return err
	}

	// the write succeeded: promote staging into the stored operations
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}