1// Package dag contains the base common code to define an entity stored
2// in a chain of git objects, supporting actions like Push, Pull and Merge.
3package dag
4
5import (
6 "encoding/json"
7 "fmt"
8 "sort"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entities/identity"
13 "github.com/MichaelMure/git-bug/entity"
14 "github.com/MichaelMure/git-bug/repository"
15 "github.com/MichaelMure/git-bug/util/lamport"
16)
17
// Patterns used to build the git reference of an entity and the names of
// its creation/edition lamport clocks.
const (
	refsPattern          = "refs/%s/%s"
	creationClockPattern = "%s-create"
	editClockPattern     = "%s-edit"
)
21
22type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
23
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// Typename is the name of the entity (bug, pull-request, ...), for human consumption.
	Typename string
	// Namespace is the namespace in git references (bugs, prs, ...), used to build refs
	// and clock names.
	Namespace string
	// OperationUnmarshaler is a function decoding a JSON message into an Operation.
	OperationUnmarshaler OperationUnmarshaler
	// FormatVersion is the expected format version number, that can be used for
	// data migration/upgrade.
	FormatVersion uint
}
35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	// (presumably to guarantee 64-bit alignment for atomic access — TODO confirm
	// lamport.Time is accessed atomically somewhere).
	createTime lamport.Time
	editTime   lamport.Time

	// Definition is embedded so the entity carries its own typename/namespace/format.
	Definition

	// ops holds operations that are already stored in the repository.
	ops []Operation
	// staging holds operations not yet stored in the repository; written by Commit.
	staging []Operation

	// lastCommit is the hash of the last git commit written for this entity,
	// empty until the first Commit.
	lastCommit repository.Hash
}
53
54// New create an empty Entity
55func New(definition Definition) *Entity {
56 return &Entity{
57 Definition: definition,
58 }
59}
60
61// Read will read and decode a stored local Entity from a repository
62func Read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Entity, error) {
63 if err := id.Validate(); err != nil {
64 return nil, errors.Wrap(err, "invalid id")
65 }
66
67 ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
68
69 return read(def, repo, resolvers, ref)
70}
71
72// readRemote will read and decode a stored remote Entity from a repository
73func readRemote(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (*Entity, error) {
74 if err := id.Validate(); err != nil {
75 return nil, errors.Wrap(err, "invalid id")
76 }
77
78 ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
79
80 return read(def, repo, resolvers, ref)
81}
82
83// read fetch from git and decode an Entity at an arbitrary git reference.
84func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (*Entity, error) {
85 rootHash, err := repo.ResolveRef(ref)
86 if err != nil {
87 return nil, err
88 }
89
90 // Perform a breadth-first search to get a topological order of the DAG where we discover the
91 // parents commit and go back in time up to the chronological root
92
93 queue := make([]repository.Hash, 0, 32)
94 visited := make(map[repository.Hash]struct{})
95 BFSOrder := make([]repository.Commit, 0, 32)
96
97 queue = append(queue, rootHash)
98 visited[rootHash] = struct{}{}
99
100 for len(queue) > 0 {
101 // pop
102 hash := queue[0]
103 queue = queue[1:]
104
105 commit, err := repo.ReadCommit(hash)
106 if err != nil {
107 return nil, err
108 }
109
110 BFSOrder = append(BFSOrder, commit)
111
112 for _, parent := range commit.Parents {
113 if _, ok := visited[parent]; !ok {
114 queue = append(queue, parent)
115 // mark as visited
116 visited[parent] = struct{}{}
117 }
118 }
119 }
120
121 // Now, we can reverse this topological order and read the commits in an order where
122 // we are sure to have read all the chronological ancestors when we read a commit.
123
124 // Next step is to:
125 // 1) read the operationPacks
126 // 2) make sure that clocks causality respect the DAG topology.
127
128 oppMap := make(map[repository.Hash]*operationPack)
129 var opsCount int
130
131 for i := len(BFSOrder) - 1; i >= 0; i-- {
132 commit := BFSOrder[i]
133 isFirstCommit := i == len(BFSOrder)-1
134 isMerge := len(commit.Parents) > 1
135
136 // Verify DAG structure: single chronological root, so only the root
137 // can have no parents. Said otherwise, the DAG need to have exactly
138 // one leaf.
139 if !isFirstCommit && len(commit.Parents) == 0 {
140 return nil, fmt.Errorf("multiple leafs in the entity DAG")
141 }
142
143 opp, err := readOperationPack(def, repo, resolvers, commit)
144 if err != nil {
145 return nil, err
146 }
147
148 err = opp.Validate()
149 if err != nil {
150 return nil, err
151 }
152
153 if isMerge && len(opp.Operations) > 0 {
154 return nil, fmt.Errorf("merge commit cannot have operations")
155 }
156
157 // Check that the create lamport clock is set (not checked in Validate() as it's optional)
158 if isFirstCommit && opp.CreateTime <= 0 {
159 return nil, fmt.Errorf("creation lamport time not set")
160 }
161
162 // make sure that the lamport clocks causality match the DAG topology
163 for _, parentHash := range commit.Parents {
164 parentPack, ok := oppMap[parentHash]
165 if !ok {
166 panic("DFS failed")
167 }
168
169 if parentPack.EditTime >= opp.EditTime {
170 return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
171 }
172
173 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
174 // that the clocks don't jump too far in the future
175 // we ignore merge commits here to allow merging after a loooong time without breaking anything,
176 // as long as there is one valid chain of small hops, it's fine.
177 if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
178 return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
179 }
180 }
181
182 oppMap[commit.Hash] = opp
183 opsCount += len(opp.Operations)
184 }
185
186 // The clocks are fine, we witness them
187 for _, opp := range oppMap {
188 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
189 if err != nil {
190 return nil, err
191 }
192 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
193 if err != nil {
194 return nil, err
195 }
196 }
197
198 // Now that we know that the topological order and clocks are fine, we order the operationPacks
199 // based on the logical clocks, entirely ignoring the DAG topology
200
201 oppSlice := make([]*operationPack, 0, len(oppMap))
202 for _, pack := range oppMap {
203 oppSlice = append(oppSlice, pack)
204 }
205 sort.Slice(oppSlice, func(i, j int) bool {
206 // Primary ordering with the EditTime.
207 if oppSlice[i].EditTime != oppSlice[j].EditTime {
208 return oppSlice[i].EditTime < oppSlice[j].EditTime
209 }
210 // We have equal EditTime, which means we have concurrent edition over different machines, and we
211 // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
212 // As a secondary ordering, we can order based on a hash of the serialized Operations in the
213 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
214 // This is a lexicographic ordering on the stringified ID.
215 return oppSlice[i].Id() < oppSlice[j].Id()
216 })
217
218 // Now that we ordered the operationPacks, we have the order of the Operations
219
220 ops := make([]Operation, 0, opsCount)
221 var createTime lamport.Time
222 var editTime lamport.Time
223 for _, pack := range oppSlice {
224 for _, operation := range pack.Operations {
225 ops = append(ops, operation)
226 }
227 if pack.CreateTime > createTime {
228 createTime = pack.CreateTime
229 }
230 if pack.EditTime > editTime {
231 editTime = pack.EditTime
232 }
233 }
234
235 return &Entity{
236 Definition: def,
237 ops: ops,
238 lastCommit: rootHash,
239 createTime: createTime,
240 editTime: editTime,
241 }, nil
242}
243
244// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
245// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
246// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
247// operation blobs can be implemented instead.
248func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
249 rootHash, err := repo.ResolveRef(ref)
250 if err != nil {
251 return err
252 }
253
254 commit, err := repo.ReadCommit(rootHash)
255 if err != nil {
256 return err
257 }
258
259 createTime, editTime, err := readOperationPackClock(repo, commit)
260 if err != nil {
261 return err
262 }
263
264 // if we have more than one commit, we need to find the root to have the create time
265 if len(commit.Parents) > 0 {
266 for len(commit.Parents) > 0 {
267 // The path to the root is irrelevant.
268 commit, err = repo.ReadCommit(commit.Parents[0])
269 if err != nil {
270 return err
271 }
272 }
273 createTime, _, err = readOperationPackClock(repo, commit)
274 if err != nil {
275 return err
276 }
277 }
278
279 if createTime <= 0 {
280 return fmt.Errorf("creation lamport time not set")
281 }
282 if editTime <= 0 {
283 return fmt.Errorf("creation lamport time not set")
284 }
285 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
286 if err != nil {
287 return err
288 }
289 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
290 if err != nil {
291 return err
292 }
293 return nil
294}
295
// StreamedEntity is the element type of the channel returned by ReadAll:
// either a successfully read Entity, or the error that terminated the stream.
type StreamedEntity struct {
	// Entity is the decoded entity; nil when Err is set.
	Entity *Entity
	// Err is the error that stopped the stream, if any.
	Err error
}
300
301// ReadAll read and parse all local Entity
302func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity {
303 out := make(chan StreamedEntity)
304
305 go func() {
306 defer close(out)
307
308 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
309
310 refs, err := repo.ListRefs(refPrefix)
311 if err != nil {
312 out <- StreamedEntity{Err: err}
313 return
314 }
315
316 for _, ref := range refs {
317 e, err := read(def, repo, resolvers, ref)
318
319 if err != nil {
320 out <- StreamedEntity{Err: err}
321 return
322 }
323
324 out <- StreamedEntity{Entity: e}
325 }
326 }()
327
328 return out
329}
330
331// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
332// repo end up with correct clocks for the next write.
333func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
334 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
335
336 refs, err := repo.ListRefs(refPrefix)
337 if err != nil {
338 return err
339 }
340
341 for _, ref := range refs {
342 err = readClockNoCheck(def, repo, ref)
343 if err != nil {
344 return err
345 }
346 }
347
348 return nil
349}
350
// Id return the Entity identifier
//
// The identifier of an Entity is the identifier of its very first operation.
// NOTE(review): if the Entity holds no operation at all, FirstOp returns nil
// and this call panics — callers are expected to only use Id() on a non-empty
// Entity.
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}
356
357// Validate check if the Entity data is valid
358func (e *Entity) Validate() error {
359 // non-empty
360 if len(e.ops) == 0 && len(e.staging) == 0 {
361 return fmt.Errorf("entity has no operations")
362 }
363
364 // check if each operation are valid
365 for _, op := range e.ops {
366 if err := op.Validate(); err != nil {
367 return err
368 }
369 }
370
371 // check if staging is valid if needed
372 for _, op := range e.staging {
373 if err := op.Validate(); err != nil {
374 return err
375 }
376 }
377
378 // Check that there is no colliding operation's ID
379 ids := make(map[entity.Id]struct{})
380 for _, op := range e.Operations() {
381 if _, ok := ids[op.Id()]; ok {
382 return fmt.Errorf("id collision: %s", op.Id())
383 }
384 ids[op.Id()] = struct{}{}
385 }
386
387 return nil
388}
389
390// Operations return the ordered operations
391func (e *Entity) Operations() []Operation {
392 return append(e.ops, e.staging...)
393}
394
395// FirstOp lookup for the very first operation of the Entity
396func (e *Entity) FirstOp() Operation {
397 for _, op := range e.ops {
398 return op
399 }
400 for _, op := range e.staging {
401 return op
402 }
403 return nil
404}
405
406// LastOp lookup for the very last operation of the Entity
407func (e *Entity) LastOp() Operation {
408 if len(e.staging) > 0 {
409 return e.staging[len(e.staging)-1]
410 }
411 if len(e.ops) > 0 {
412 return e.ops[len(e.ops)-1]
413 }
414 return nil
415}
416
// Append add a new Operation to the Entity
//
// The operation is only staged in memory; it is written to the repository by
// the next call to Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
421
422// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
423func (e *Entity) NeedCommit() bool {
424 return len(e.staging) > 0
425}
426
427// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
428// is already in sync with the repository.
429func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
430 if e.NeedCommit() {
431 return e.Commit(repo)
432 }
433 return nil
434}
435
// Commit write the appended operations in the repository
//
// Staged operations are written as a chain of git commits, one commit per run
// of consecutive operations sharing the same author, each commit carrying one
// operationPack. The edit clock is incremented once per commit written; the
// creation clock is incremented only for the very first commit of the entity.
// Finally, the entity's git reference is updated to point at the last commit.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// one edit clock tick per commit written
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// no previous commit means this is the entity's first commit: it also
		// records the creation clock
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// chain the new commit onto the previous one, if any
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		e.lastCommit = commitHash
		// the just-written operations are now stored, no longer staged
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
504
// CreateLamportTime return the Lamport time of creation
// (the createTime recorded when the entity was read or first committed).
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
509
// EditLamportTime return the Lamport time of the last edition
// (the editTime recorded when the entity was read or last committed).
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}