1// Package dag contains the base common code to define an entity stored
2// in a chain of git objects, supporting actions like Push, Pull and Merge.
3package dag
4
5import (
6 "encoding/json"
7 "fmt"
8 "sort"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entity"
13 bootstrap "github.com/MichaelMure/git-bug/entity/boostrap"
14 "github.com/MichaelMure/git-bug/repository"
15 "github.com/MichaelMure/git-bug/util/lamport"
16)
17
const (
	// refsPattern is the format of a local entity git reference: refs/<namespace>/<id>
	refsPattern = "refs/%s/%s"
	// creationClockPattern is the format of the lamport clock name tracking entity creations
	creationClockPattern = "%s-create"
	// editClockPattern is the format of the lamport clock name tracking entity editions
	editClockPattern = "%s-edit"
)
21
// OperationUnmarshaler is a function decoding a single JSON message into an
// Operation, using the given resolvers to access other entities if needed.
type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
23
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// Typename is the name of the entity (bug, pull-request, ...), for human consumption
	Typename string
	// Namespace is the namespace used in git references (bugs, prs, ...)
	Namespace string
	// OperationUnmarshaler is a function decoding a JSON message into an Operation
	OperationUnmarshaler OperationUnmarshaler
	// FormatVersion is the expected format version number, that can be used for data migration/upgrade
	FormatVersion uint
}
35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	// createTime is the Lamport time at which the entity was created
	createTime lamport.Time
	// editTime is the Lamport time of the latest edition
	editTime lamport.Time

	// Definition describes this entity's specialization (typename, namespace, ...)
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// lastCommit is the hash of the last git commit written for this entity, empty if none yet
	lastCommit repository.Hash
}
53
54// New create an empty Entity
55func New(definition Definition) *Entity {
56 return &Entity{
57 Definition: definition,
58 }
59}
60
61// Read will read and decode a stored local Entity from a repository
62func Read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
63 if err := id.Validate(); err != nil {
64 return *new(EntityT), errors.Wrap(err, "invalid id")
65 }
66
67 ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
68
69 return read[EntityT](def, wrapper, repo, resolvers, ref)
70}
71
72// readRemote will read and decode a stored remote Entity from a repository
73func readRemote[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
74 if err := id.Validate(); err != nil {
75 return *new(EntityT), errors.Wrap(err, "invalid id")
76 }
77
78 ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
79
80 return read[EntityT](def, wrapper, repo, resolvers, ref)
81}
82
83// read fetch from git and decode an Entity at an arbitrary git reference.
84func read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
85 rootHash, err := repo.ResolveRef(ref)
86 if err == repository.ErrNotFound {
87 return *new(EntityT), entity.NewErrNotFound(def.Typename)
88 }
89 if err != nil {
90 return *new(EntityT), err
91 }
92
93 // Perform a breadth-first search to get a topological order of the DAG where we discover the
94 // parents commit and go back in time up to the chronological root
95
96 queue := make([]repository.Hash, 0, 32)
97 visited := make(map[repository.Hash]struct{})
98 BFSOrder := make([]repository.Commit, 0, 32)
99
100 queue = append(queue, rootHash)
101 visited[rootHash] = struct{}{}
102
103 for len(queue) > 0 {
104 // pop
105 hash := queue[0]
106 queue = queue[1:]
107
108 commit, err := repo.ReadCommit(hash)
109 if err != nil {
110 return *new(EntityT), err
111 }
112
113 BFSOrder = append(BFSOrder, commit)
114
115 for _, parent := range commit.Parents {
116 if _, ok := visited[parent]; !ok {
117 queue = append(queue, parent)
118 // mark as visited
119 visited[parent] = struct{}{}
120 }
121 }
122 }
123
124 // Now, we can reverse this topological order and read the commits in an order where
125 // we are sure to have read all the chronological ancestors when we read a commit.
126
127 // Next step is to:
128 // 1) read the operationPacks
129 // 2) make sure that clocks causality respect the DAG topology.
130
131 oppMap := make(map[repository.Hash]*operationPack)
132 var opsCount int
133
134 for i := len(BFSOrder) - 1; i >= 0; i-- {
135 commit := BFSOrder[i]
136 isFirstCommit := i == len(BFSOrder)-1
137 isMerge := len(commit.Parents) > 1
138
139 // Verify DAG structure: single chronological root, so only the root
140 // can have no parents. Said otherwise, the DAG need to have exactly
141 // one leaf.
142 if !isFirstCommit && len(commit.Parents) == 0 {
143 return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
144 }
145
146 opp, err := readOperationPack(def, repo, resolvers, commit)
147 if err != nil {
148 return *new(EntityT), err
149 }
150
151 err = opp.Validate()
152 if err != nil {
153 return *new(EntityT), err
154 }
155
156 if isMerge && len(opp.Operations) > 0 {
157 return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
158 }
159
160 // Check that the create lamport clock is set (not checked in Validate() as it's optional)
161 if isFirstCommit && opp.CreateTime <= 0 {
162 return *new(EntityT), fmt.Errorf("creation lamport time not set")
163 }
164
165 // make sure that the lamport clocks causality match the DAG topology
166 for _, parentHash := range commit.Parents {
167 parentPack, ok := oppMap[parentHash]
168 if !ok {
169 panic("DFS failed")
170 }
171
172 if parentPack.EditTime >= opp.EditTime {
173 return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
174 }
175
176 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
177 // that the clocks don't jump too far in the future
178 // we ignore merge commits here to allow merging after a loooong time without breaking anything,
179 // as long as there is one valid chain of small hops, it's fine.
180 if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
181 return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
182 }
183 }
184
185 oppMap[commit.Hash] = opp
186 opsCount += len(opp.Operations)
187 }
188
189 // The clocks are fine, we witness them
190 for _, opp := range oppMap {
191 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
192 if err != nil {
193 return *new(EntityT), err
194 }
195 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
196 if err != nil {
197 return *new(EntityT), err
198 }
199 }
200
201 // Now that we know that the topological order and clocks are fine, we order the operationPacks
202 // based on the logical clocks, entirely ignoring the DAG topology
203
204 oppSlice := make([]*operationPack, 0, len(oppMap))
205 for _, pack := range oppMap {
206 oppSlice = append(oppSlice, pack)
207 }
208 sort.Slice(oppSlice, func(i, j int) bool {
209 // Primary ordering with the EditTime.
210 if oppSlice[i].EditTime != oppSlice[j].EditTime {
211 return oppSlice[i].EditTime < oppSlice[j].EditTime
212 }
213 // We have equal EditTime, which means we have concurrent edition over different machines, and we
214 // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
215 // As a secondary ordering, we can order based on a hash of the serialized Operations in the
216 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
217 // This is a lexicographic ordering on the stringified ID.
218 return oppSlice[i].Id() < oppSlice[j].Id()
219 })
220
221 // Now that we ordered the operationPacks, we have the order of the Operations
222
223 ops := make([]Operation, 0, opsCount)
224 var createTime lamport.Time
225 var editTime lamport.Time
226 for _, pack := range oppSlice {
227 for _, operation := range pack.Operations {
228 ops = append(ops, operation)
229 }
230 if pack.CreateTime > createTime {
231 createTime = pack.CreateTime
232 }
233 if pack.EditTime > editTime {
234 editTime = pack.EditTime
235 }
236 }
237
238 return wrapper(&Entity{
239 Definition: def,
240 ops: ops,
241 lastCommit: rootHash,
242 createTime: createTime,
243 editTime: editTime,
244 }), nil
245}
246
247// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
248// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
249// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
250// operation blobs can be implemented instead.
251func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
252 rootHash, err := repo.ResolveRef(ref)
253 if err == repository.ErrNotFound {
254 return entity.NewErrNotFound(def.Typename)
255 }
256 if err != nil {
257 return err
258 }
259
260 commit, err := repo.ReadCommit(rootHash)
261 if err != nil {
262 return err
263 }
264
265 createTime, editTime, err := readOperationPackClock(repo, commit)
266 if err != nil {
267 return err
268 }
269
270 // if we have more than one commit, we need to find the root to have the create time
271 if len(commit.Parents) > 0 {
272 for len(commit.Parents) > 0 {
273 // The path to the root is irrelevant.
274 commit, err = repo.ReadCommit(commit.Parents[0])
275 if err != nil {
276 return err
277 }
278 }
279 createTime, _, err = readOperationPackClock(repo, commit)
280 if err != nil {
281 return err
282 }
283 }
284
285 if createTime <= 0 {
286 return fmt.Errorf("creation lamport time not set")
287 }
288 if editTime <= 0 {
289 return fmt.Errorf("creation lamport time not set")
290 }
291 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
292 if err != nil {
293 return err
294 }
295 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
296 if err != nil {
297 return err
298 }
299 return nil
300}
301
302// ReadAll read and parse all local Entity
303func ReadAll[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan bootstrap.StreamedEntity[EntityT] {
304 out := make(chan bootstrap.StreamedEntity[EntityT])
305
306 go func() {
307 defer close(out)
308
309 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
310
311 refs, err := repo.ListRefs(refPrefix)
312 if err != nil {
313 out <- bootstrap.StreamedEntity[EntityT]{Err: err}
314 return
315 }
316
317 total := int64(len(refs))
318 current := int64(1)
319
320 for _, ref := range refs {
321 e, err := read[EntityT](def, wrapper, repo, resolvers, ref)
322
323 if err != nil {
324 out <- bootstrap.StreamedEntity[EntityT]{Err: err}
325 return
326 }
327
328 out <- bootstrap.StreamedEntity[EntityT]{
329 Entity: e,
330 CurrentEntity: current,
331 TotalEntities: total,
332 }
333 current++
334 }
335 }()
336
337 return out
338}
339
340// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
341// repo end up with correct clocks for the next write.
342func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
343 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
344
345 refs, err := repo.ListRefs(refPrefix)
346 if err != nil {
347 return err
348 }
349
350 for _, ref := range refs {
351 err = readClockNoCheck(def, repo, ref)
352 if err != nil {
353 return err
354 }
355 }
356
357 return nil
358}
359
360// Id return the Entity identifier
361func (e *Entity) Id() entity.Id {
362 // id is the id of the first operation
363 return e.FirstOp().Id()
364}
365
366// Validate check if the Entity data is valid
367func (e *Entity) Validate() error {
368 // non-empty
369 if len(e.ops) == 0 && len(e.staging) == 0 {
370 return fmt.Errorf("entity has no operations")
371 }
372
373 // check if each operation are valid
374 for _, op := range e.ops {
375 if err := op.Validate(); err != nil {
376 return err
377 }
378 }
379
380 // check if staging is valid if needed
381 for _, op := range e.staging {
382 if err := op.Validate(); err != nil {
383 return err
384 }
385 }
386
387 // Check that there is no colliding operation's ID
388 ids := make(map[entity.Id]struct{})
389 for _, op := range e.Operations() {
390 if _, ok := ids[op.Id()]; ok {
391 return fmt.Errorf("id collision: %s", op.Id())
392 }
393 ids[op.Id()] = struct{}{}
394 }
395
396 return nil
397}
398
399// Operations return the ordered operations
400func (e *Entity) Operations() []Operation {
401 return append(e.ops, e.staging...)
402}
403
404// FirstOp lookup for the very first operation of the Entity
405func (e *Entity) FirstOp() Operation {
406 for _, op := range e.ops {
407 return op
408 }
409 for _, op := range e.staging {
410 return op
411 }
412 return nil
413}
414
415// LastOp lookup for the very last operation of the Entity
416func (e *Entity) LastOp() Operation {
417 if len(e.staging) > 0 {
418 return e.staging[len(e.staging)-1]
419 }
420 if len(e.ops) > 0 {
421 return e.ops[len(e.ops)-1]
422 }
423 return nil
424}
425
// Append add a new Operation to the Entity
//
// The operation is only staged in memory; call Commit to persist it in the repository.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
430
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	// any staged operation means there is something left to write
	return len(e.staging) > 0
}
435
436// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
437// is already in sync with the repository.
438func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
439 if e.NeedCommit() {
440 return e.Commit(repo)
441 }
442 return nil
443}
444
445// Commit write the appended operations in the repository
446func (e *Entity) Commit(repo repository.ClockedRepo) error {
447 if !e.NeedCommit() {
448 return fmt.Errorf("can't commit an entity with no pending operation")
449 }
450
451 err := e.Validate()
452 if err != nil {
453 return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
454 }
455
456 for len(e.staging) > 0 {
457 var author entity.Identity
458 var toCommit []Operation
459
460 // Split into chunks with the same author
461 for len(e.staging) > 0 {
462 op := e.staging[0]
463 if author != nil && op.Author().Id() != author.Id() {
464 break
465 }
466 author = e.staging[0].Author()
467 toCommit = append(toCommit, op)
468 e.staging = e.staging[1:]
469 }
470
471 e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
472 if err != nil {
473 return err
474 }
475
476 opp := &operationPack{
477 Author: author,
478 Operations: toCommit,
479 EditTime: e.editTime,
480 }
481
482 if e.lastCommit == "" {
483 e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
484 if err != nil {
485 return err
486 }
487 opp.CreateTime = e.createTime
488 }
489
490 var parentCommit []repository.Hash
491 if e.lastCommit != "" {
492 parentCommit = []repository.Hash{e.lastCommit}
493 }
494
495 commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
496 if err != nil {
497 return err
498 }
499
500 e.lastCommit = commitHash
501 e.ops = append(e.ops, toCommit...)
502 }
503
504 // not strictly necessary but make equality testing easier in tests
505 e.staging = nil
506
507 // Create or update the Git reference for this entity
508 // When pushing later, the remote will ensure that this ref update
509 // is fast-forward, that is no data has been overwritten.
510 ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
511 return repo.UpdateRef(ref, e.lastCommit)
512}
513
// CreateLamportTime return the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
518
// EditLamportTime return the Lamport time of the last edition
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}