1// Package dag contains the base common code to define an entity stored
2// in a chain of git objects, supporting actions like Push, Pull and Merge.
3package dag
4
5import (
6 "encoding/json"
7 "fmt"
8 "sort"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entity"
13 "github.com/MichaelMure/git-bug/identity"
14 "github.com/MichaelMure/git-bug/repository"
15 "github.com/MichaelMure/git-bug/util/lamport"
16)
17
// Format patterns used to build git reference names and Lamport clock names.
const (
	// refsPattern is the local git reference of an entity; it is formatted with
	// the namespace of the Definition and the id of the entity.
	refsPattern = "refs/%s/%s"
	// creationClockPattern is the name of the creation clock; it is formatted
	// with the namespace of the Definition.
	creationClockPattern = "%s-create"
	// editClockPattern is the name of the edition clock; it is formatted with
	// the namespace of the Definition.
	editClockPattern = "%s-edit"
)
21
// Definition holds the details defining one specialization of an Entity.
type Definition struct {
	// Typename is the name of the entity (bug, pull-request, ...), for human consumption.
	Typename string
	// Namespace is the namespace used in git references (bugs, prs, ...).
	// It is also used to build the names of the Lamport clocks.
	Namespace string
	// OperationUnmarshaler is a function decoding a JSON message into an Operation.
	OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
	// FormatVersion is the expected format version number, that can be used for data migration/upgrade.
	FormatVersion uint
}
33
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allows ordering events
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	//
	// createTime is the Lamport time of creation, as returned by CreateLamportTime.
	createTime lamport.Time
	// editTime is the Lamport time of the last edition, as returned by EditLamportTime.
	editTime lamport.Time

	// Definition describes which specialization of Entity this is.
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// lastCommit is the hash of the latest commit known for this entity. It is
	// empty until the entity has been read from, or committed to, a repository.
	lastCommit repository.Hash
}
51
52// New create an empty Entity
53func New(definition Definition) *Entity {
54 return &Entity{
55 Definition: definition,
56 }
57}
58
59// Read will read and decode a stored local Entity from a repository
60func Read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Entity, error) {
61 if err := id.Validate(); err != nil {
62 return nil, errors.Wrap(err, "invalid id")
63 }
64
65 ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
66
67 return read(def, repo, resolvers, ref)
68}
69
70// readRemote will read and decode a stored remote Entity from a repository
71func readRemote(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (*Entity, error) {
72 if err := id.Validate(); err != nil {
73 return nil, errors.Wrap(err, "invalid id")
74 }
75
76 ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
77
78 return read(def, repo, resolvers, ref)
79}
80
81// read fetch from git and decode an Entity at an arbitrary git reference.
82func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (*Entity, error) {
83 rootHash, err := repo.ResolveRef(ref)
84 if err != nil {
85 return nil, err
86 }
87
88 // Perform a breadth-first search to get a topological order of the DAG where we discover the
89 // parents commit and go back in time up to the chronological root
90
91 queue := make([]repository.Hash, 0, 32)
92 visited := make(map[repository.Hash]struct{})
93 BFSOrder := make([]repository.Commit, 0, 32)
94
95 queue = append(queue, rootHash)
96 visited[rootHash] = struct{}{}
97
98 for len(queue) > 0 {
99 // pop
100 hash := queue[0]
101 queue = queue[1:]
102
103 commit, err := repo.ReadCommit(hash)
104 if err != nil {
105 return nil, err
106 }
107
108 BFSOrder = append(BFSOrder, commit)
109
110 for _, parent := range commit.Parents {
111 if _, ok := visited[parent]; !ok {
112 queue = append(queue, parent)
113 // mark as visited
114 visited[parent] = struct{}{}
115 }
116 }
117 }
118
119 // Now, we can reverse this topological order and read the commits in an order where
120 // we are sure to have read all the chronological ancestors when we read a commit.
121
122 // Next step is to:
123 // 1) read the operationPacks
124 // 2) make sure that clocks causality respect the DAG topology.
125
126 oppMap := make(map[repository.Hash]*operationPack)
127 var opsCount int
128
129 for i := len(BFSOrder) - 1; i >= 0; i-- {
130 commit := BFSOrder[i]
131 isFirstCommit := i == len(BFSOrder)-1
132 isMerge := len(commit.Parents) > 1
133
134 // Verify DAG structure: single chronological root, so only the root
135 // can have no parents. Said otherwise, the DAG need to have exactly
136 // one leaf.
137 if !isFirstCommit && len(commit.Parents) == 0 {
138 return nil, fmt.Errorf("multiple leafs in the entity DAG")
139 }
140
141 opp, err := readOperationPack(def, repo, resolvers, commit)
142 if err != nil {
143 return nil, err
144 }
145
146 err = opp.Validate()
147 if err != nil {
148 return nil, err
149 }
150
151 if isMerge && len(opp.Operations) > 0 {
152 return nil, fmt.Errorf("merge commit cannot have operations")
153 }
154
155 // Check that the create lamport clock is set (not checked in Validate() as it's optional)
156 if isFirstCommit && opp.CreateTime <= 0 {
157 return nil, fmt.Errorf("creation lamport time not set")
158 }
159
160 // make sure that the lamport clocks causality match the DAG topology
161 for _, parentHash := range commit.Parents {
162 parentPack, ok := oppMap[parentHash]
163 if !ok {
164 panic("DFS failed")
165 }
166
167 if parentPack.EditTime >= opp.EditTime {
168 return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
169 }
170
171 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
172 // that the clocks don't jump too far in the future
173 // we ignore merge commits here to allow merging after a loooong time without breaking anything,
174 // as long as there is one valid chain of small hops, it's fine.
175 if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
176 return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
177 }
178 }
179
180 oppMap[commit.Hash] = opp
181 opsCount += len(opp.Operations)
182 }
183
184 // The clocks are fine, we witness them
185 for _, opp := range oppMap {
186 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
187 if err != nil {
188 return nil, err
189 }
190 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
191 if err != nil {
192 return nil, err
193 }
194 }
195
196 // Now that we know that the topological order and clocks are fine, we order the operationPacks
197 // based on the logical clocks, entirely ignoring the DAG topology
198
199 oppSlice := make([]*operationPack, 0, len(oppMap))
200 for _, pack := range oppMap {
201 oppSlice = append(oppSlice, pack)
202 }
203 sort.Slice(oppSlice, func(i, j int) bool {
204 // Primary ordering with the EditTime.
205 if oppSlice[i].EditTime != oppSlice[j].EditTime {
206 return oppSlice[i].EditTime < oppSlice[j].EditTime
207 }
208 // We have equal EditTime, which means we have concurrent edition over different machines, and we
209 // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
210 // As a secondary ordering, we can order based on a hash of the serialized Operations in the
211 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
212 // This is a lexicographic ordering on the stringified ID.
213 return oppSlice[i].Id() < oppSlice[j].Id()
214 })
215
216 // Now that we ordered the operationPacks, we have the order of the Operations
217
218 ops := make([]Operation, 0, opsCount)
219 var createTime lamport.Time
220 var editTime lamport.Time
221 for _, pack := range oppSlice {
222 for _, operation := range pack.Operations {
223 ops = append(ops, operation)
224 }
225 if pack.CreateTime > createTime {
226 createTime = pack.CreateTime
227 }
228 if pack.EditTime > editTime {
229 editTime = pack.EditTime
230 }
231 }
232
233 return &Entity{
234 Definition: def,
235 ops: ops,
236 lastCommit: rootHash,
237 createTime: createTime,
238 editTime: editTime,
239 }, nil
240}
241
242// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
243// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
244// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
245// operation blobs can be implemented instead.
246func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
247 rootHash, err := repo.ResolveRef(ref)
248 if err != nil {
249 return err
250 }
251
252 commit, err := repo.ReadCommit(rootHash)
253 if err != nil {
254 return err
255 }
256
257 createTime, editTime, err := readOperationPackClock(repo, commit)
258 if err != nil {
259 return err
260 }
261
262 // if we have more than one commit, we need to find the root to have the create time
263 if len(commit.Parents) > 0 {
264 for len(commit.Parents) > 0 {
265 // The path to the root is irrelevant.
266 commit, err = repo.ReadCommit(commit.Parents[0])
267 if err != nil {
268 return err
269 }
270 }
271 createTime, _, err = readOperationPackClock(repo, commit)
272 if err != nil {
273 return err
274 }
275 }
276
277 if createTime <= 0 {
278 return fmt.Errorf("creation lamport time not set")
279 }
280 if editTime <= 0 {
281 return fmt.Errorf("creation lamport time not set")
282 }
283 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
284 if err != nil {
285 return err
286 }
287 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
288 if err != nil {
289 return err
290 }
291 return nil
292}
293
// StreamedEntity is the element sent on the channel returned by ReadAll. It carries
// either an Entity that was read successfully, or the error that interrupted the reading.
type StreamedEntity struct {
	// Entity is the decoded Entity; ReadAll leaves it nil when it reports an error.
	Entity *Entity
	// Err is the error encountered; ReadAll leaves it nil when it sends an Entity.
	Err error
}
298
299// ReadAll read and parse all local Entity
300func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity {
301 out := make(chan StreamedEntity)
302
303 go func() {
304 defer close(out)
305
306 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
307
308 refs, err := repo.ListRefs(refPrefix)
309 if err != nil {
310 out <- StreamedEntity{Err: err}
311 return
312 }
313
314 for _, ref := range refs {
315 e, err := read(def, repo, resolvers, ref)
316
317 if err != nil {
318 out <- StreamedEntity{Err: err}
319 return
320 }
321
322 out <- StreamedEntity{Entity: e}
323 }
324 }()
325
326 return out
327}
328
329// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
330// repo end up with correct clocks for the next write.
331func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
332 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
333
334 refs, err := repo.ListRefs(refPrefix)
335 if err != nil {
336 return err
337 }
338
339 for _, ref := range refs {
340 err = readClockNoCheck(def, repo, ref)
341 if err != nil {
342 return err
343 }
344 }
345
346 return nil
347}
348
349// Id return the Entity identifier
350func (e *Entity) Id() entity.Id {
351 // id is the id of the first operation
352 return e.FirstOp().Id()
353}
354
355// Validate check if the Entity data is valid
356func (e *Entity) Validate() error {
357 // non-empty
358 if len(e.ops) == 0 && len(e.staging) == 0 {
359 return fmt.Errorf("entity has no operations")
360 }
361
362 // check if each operations are valid
363 for _, op := range e.ops {
364 if err := op.Validate(); err != nil {
365 return err
366 }
367 }
368
369 // check if staging is valid if needed
370 for _, op := range e.staging {
371 if err := op.Validate(); err != nil {
372 return err
373 }
374 }
375
376 // Check that there is no colliding operation's ID
377 ids := make(map[entity.Id]struct{})
378 for _, op := range e.Operations() {
379 if _, ok := ids[op.Id()]; ok {
380 return fmt.Errorf("id collision: %s", op.Id())
381 }
382 ids[op.Id()] = struct{}{}
383 }
384
385 return nil
386}
387
388// Operations return the ordered operations
389func (e *Entity) Operations() []Operation {
390 return append(e.ops, e.staging...)
391}
392
393// FirstOp lookup for the very first operation of the Entity
394func (e *Entity) FirstOp() Operation {
395 for _, op := range e.ops {
396 return op
397 }
398 for _, op := range e.staging {
399 return op
400 }
401 return nil
402}
403
404// LastOp lookup for the very last operation of the Entity
405func (e *Entity) LastOp() Operation {
406 if len(e.staging) > 0 {
407 return e.staging[len(e.staging)-1]
408 }
409 if len(e.ops) > 0 {
410 return e.ops[len(e.ops)-1]
411 }
412 return nil
413}
414
// Append adds a new Operation to the Entity.
// The operation is only staged in memory; it is written to the repository by Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
419
420// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
421func (e *Entity) NeedCommit() bool {
422 return len(e.staging) > 0
423}
424
425// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
426// is already in sync with the repository.
427func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
428 if e.NeedCommit() {
429 return e.Commit(repo)
430 }
431 return nil
432}
433
434// Commit write the appended operations in the repository
435func (e *Entity) Commit(repo repository.ClockedRepo) error {
436 if !e.NeedCommit() {
437 return fmt.Errorf("can't commit an entity with no pending operation")
438 }
439
440 err := e.Validate()
441 if err != nil {
442 return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
443 }
444
445 for len(e.staging) > 0 {
446 var author identity.Interface
447 var toCommit []Operation
448
449 // Split into chunks with the same author
450 for len(e.staging) > 0 {
451 op := e.staging[0]
452 if author != nil && op.Author().Id() != author.Id() {
453 break
454 }
455 author = e.staging[0].Author()
456 toCommit = append(toCommit, op)
457 e.staging = e.staging[1:]
458 }
459
460 e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
461 if err != nil {
462 return err
463 }
464
465 opp := &operationPack{
466 Author: author,
467 Operations: toCommit,
468 EditTime: e.editTime,
469 }
470
471 if e.lastCommit == "" {
472 e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
473 if err != nil {
474 return err
475 }
476 opp.CreateTime = e.createTime
477 }
478
479 var parentCommit []repository.Hash
480 if e.lastCommit != "" {
481 parentCommit = []repository.Hash{e.lastCommit}
482 }
483
484 commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
485 if err != nil {
486 return err
487 }
488
489 e.lastCommit = commitHash
490 e.ops = append(e.ops, toCommit...)
491 }
492
493 // not strictly necessary but make equality testing easier in tests
494 e.staging = nil
495
496 // Create or update the Git reference for this entity
497 // When pushing later, the remote will ensure that this ref update
498 // is fast-forward, that is no data has been overwritten.
499 ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
500 return repo.UpdateRef(ref, e.lastCommit)
501}
502
// CreateLamportTime returns the Lamport time of creation of the Entity.
// It is the zero value until the Entity has been read from, or committed to, a repository.
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
507
// EditLamportTime returns the Lamport time of the last edition of the Entity.
// It is the zero value until the Entity has been read from, or committed to, a repository.
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}