1// Package dag contains the base common code to define an entity stored
2// in a chain of git objects, supporting actions like Push, Pull and Merge.
3package dag
4
5import (
6 "encoding/json"
7 "fmt"
8 "sort"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entities/identity"
13 "github.com/MichaelMure/git-bug/entity"
14 bootstrap "github.com/MichaelMure/git-bug/entity/boostrap"
15 "github.com/MichaelMure/git-bug/repository"
16 "github.com/MichaelMure/git-bug/util/lamport"
17)
18
// refsPattern is the format of the git references holding entities: namespace, then entity id.
const refsPattern = "refs/%s/%s"

// creationClockPattern and editClockPattern are the format of the per-namespace
// lamport clock names used with repository.ClockedRepo.
const creationClockPattern = "%s-create"
const editClockPattern = "%s-edit"

// OperationUnmarshaler is a function decoding a single JSON message into an
// Operation, using the given resolvers to link to other entities.
type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
24
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// Typename is the name of the entity (bug, pull-request, ...), for human consumption.
	Typename string
	// Namespace is the namespace used in git references (bugs, prs, ...).
	Namespace string
	// OperationUnmarshaler is the function decoding a JSON message into an Operation.
	OperationUnmarshaler OperationUnmarshaler
	// FormatVersion is the expected format version number, that can be used for data migration/upgrade.
	FormatVersion uint
}
36
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time
	editTime   lamport.Time

	// Definition describes this specialization of Entity (typename, namespace, decoder, format version).
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// lastCommit is the hash of the latest commit written for this entity; empty
	// before the first Commit. It is used as parent for the next written commit.
	lastCommit repository.Hash
}
54
55// New create an empty Entity
56func New(definition Definition) *Entity {
57 return &Entity{
58 Definition: definition,
59 }
60}
61
62// Read will read and decode a stored local Entity from a repository
63func Read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
64 if err := id.Validate(); err != nil {
65 return *new(EntityT), errors.Wrap(err, "invalid id")
66 }
67
68 ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
69
70 return read[EntityT](def, wrapper, repo, resolvers, ref)
71}
72
73// readRemote will read and decode a stored remote Entity from a repository
74func readRemote[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
75 if err := id.Validate(); err != nil {
76 return *new(EntityT), errors.Wrap(err, "invalid id")
77 }
78
79 ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
80
81 return read[EntityT](def, wrapper, repo, resolvers, ref)
82}
83
84// read fetch from git and decode an Entity at an arbitrary git reference.
85func read[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
86 rootHash, err := repo.ResolveRef(ref)
87 if err == repository.ErrNotFound {
88 return *new(EntityT), entity.NewErrNotFound(def.Typename)
89 }
90 if err != nil {
91 return *new(EntityT), err
92 }
93
94 // Perform a breadth-first search to get a topological order of the DAG where we discover the
95 // parents commit and go back in time up to the chronological root
96
97 queue := make([]repository.Hash, 0, 32)
98 visited := make(map[repository.Hash]struct{})
99 BFSOrder := make([]repository.Commit, 0, 32)
100
101 queue = append(queue, rootHash)
102 visited[rootHash] = struct{}{}
103
104 for len(queue) > 0 {
105 // pop
106 hash := queue[0]
107 queue = queue[1:]
108
109 commit, err := repo.ReadCommit(hash)
110 if err != nil {
111 return *new(EntityT), err
112 }
113
114 BFSOrder = append(BFSOrder, commit)
115
116 for _, parent := range commit.Parents {
117 if _, ok := visited[parent]; !ok {
118 queue = append(queue, parent)
119 // mark as visited
120 visited[parent] = struct{}{}
121 }
122 }
123 }
124
125 // Now, we can reverse this topological order and read the commits in an order where
126 // we are sure to have read all the chronological ancestors when we read a commit.
127
128 // Next step is to:
129 // 1) read the operationPacks
130 // 2) make sure that clocks causality respect the DAG topology.
131
132 oppMap := make(map[repository.Hash]*operationPack)
133 var opsCount int
134
135 for i := len(BFSOrder) - 1; i >= 0; i-- {
136 commit := BFSOrder[i]
137 isFirstCommit := i == len(BFSOrder)-1
138 isMerge := len(commit.Parents) > 1
139
140 // Verify DAG structure: single chronological root, so only the root
141 // can have no parents. Said otherwise, the DAG need to have exactly
142 // one leaf.
143 if !isFirstCommit && len(commit.Parents) == 0 {
144 return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
145 }
146
147 opp, err := readOperationPack(def, repo, resolvers, commit)
148 if err != nil {
149 return *new(EntityT), err
150 }
151
152 err = opp.Validate()
153 if err != nil {
154 return *new(EntityT), err
155 }
156
157 if isMerge && len(opp.Operations) > 0 {
158 return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
159 }
160
161 // Check that the create lamport clock is set (not checked in Validate() as it's optional)
162 if isFirstCommit && opp.CreateTime <= 0 {
163 return *new(EntityT), fmt.Errorf("creation lamport time not set")
164 }
165
166 // make sure that the lamport clocks causality match the DAG topology
167 for _, parentHash := range commit.Parents {
168 parentPack, ok := oppMap[parentHash]
169 if !ok {
170 panic("DFS failed")
171 }
172
173 if parentPack.EditTime >= opp.EditTime {
174 return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
175 }
176
177 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
178 // that the clocks don't jump too far in the future
179 // we ignore merge commits here to allow merging after a loooong time without breaking anything,
180 // as long as there is one valid chain of small hops, it's fine.
181 if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
182 return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
183 }
184 }
185
186 oppMap[commit.Hash] = opp
187 opsCount += len(opp.Operations)
188 }
189
190 // The clocks are fine, we witness them
191 for _, opp := range oppMap {
192 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
193 if err != nil {
194 return *new(EntityT), err
195 }
196 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
197 if err != nil {
198 return *new(EntityT), err
199 }
200 }
201
202 // Now that we know that the topological order and clocks are fine, we order the operationPacks
203 // based on the logical clocks, entirely ignoring the DAG topology
204
205 oppSlice := make([]*operationPack, 0, len(oppMap))
206 for _, pack := range oppMap {
207 oppSlice = append(oppSlice, pack)
208 }
209 sort.Slice(oppSlice, func(i, j int) bool {
210 // Primary ordering with the EditTime.
211 if oppSlice[i].EditTime != oppSlice[j].EditTime {
212 return oppSlice[i].EditTime < oppSlice[j].EditTime
213 }
214 // We have equal EditTime, which means we have concurrent edition over different machines, and we
215 // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
216 // As a secondary ordering, we can order based on a hash of the serialized Operations in the
217 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
218 // This is a lexicographic ordering on the stringified ID.
219 return oppSlice[i].Id() < oppSlice[j].Id()
220 })
221
222 // Now that we ordered the operationPacks, we have the order of the Operations
223
224 ops := make([]Operation, 0, opsCount)
225 var createTime lamport.Time
226 var editTime lamport.Time
227 for _, pack := range oppSlice {
228 for _, operation := range pack.Operations {
229 ops = append(ops, operation)
230 }
231 if pack.CreateTime > createTime {
232 createTime = pack.CreateTime
233 }
234 if pack.EditTime > editTime {
235 editTime = pack.EditTime
236 }
237 }
238
239 return wrapper(&Entity{
240 Definition: def,
241 ops: ops,
242 lastCommit: rootHash,
243 createTime: createTime,
244 editTime: editTime,
245 }), nil
246}
247
248// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
249// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
250// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
251// operation blobs can be implemented instead.
252func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
253 rootHash, err := repo.ResolveRef(ref)
254 if err == repository.ErrNotFound {
255 return entity.NewErrNotFound(def.Typename)
256 }
257 if err != nil {
258 return err
259 }
260
261 commit, err := repo.ReadCommit(rootHash)
262 if err != nil {
263 return err
264 }
265
266 createTime, editTime, err := readOperationPackClock(repo, commit)
267 if err != nil {
268 return err
269 }
270
271 // if we have more than one commit, we need to find the root to have the create time
272 if len(commit.Parents) > 0 {
273 for len(commit.Parents) > 0 {
274 // The path to the root is irrelevant.
275 commit, err = repo.ReadCommit(commit.Parents[0])
276 if err != nil {
277 return err
278 }
279 }
280 createTime, _, err = readOperationPackClock(repo, commit)
281 if err != nil {
282 return err
283 }
284 }
285
286 if createTime <= 0 {
287 return fmt.Errorf("creation lamport time not set")
288 }
289 if editTime <= 0 {
290 return fmt.Errorf("creation lamport time not set")
291 }
292 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
293 if err != nil {
294 return err
295 }
296 err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
297 if err != nil {
298 return err
299 }
300 return nil
301}
302
303// ReadAll read and parse all local Entity
304func ReadAll[EntityT entity.Bare](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan bootstrap.StreamedEntity[EntityT] {
305 out := make(chan bootstrap.StreamedEntity[EntityT])
306
307 go func() {
308 defer close(out)
309
310 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
311
312 refs, err := repo.ListRefs(refPrefix)
313 if err != nil {
314 out <- bootstrap.StreamedEntity[EntityT]{Err: err}
315 return
316 }
317
318 total := int64(len(refs))
319 current := int64(1)
320
321 for _, ref := range refs {
322 e, err := read[EntityT](def, wrapper, repo, resolvers, ref)
323
324 if err != nil {
325 out <- bootstrap.StreamedEntity[EntityT]{Err: err}
326 return
327 }
328
329 out <- bootstrap.StreamedEntity[EntityT]{
330 Entity: e,
331 CurrentEntity: current,
332 TotalEntities: total,
333 }
334 current++
335 }
336 }()
337
338 return out
339}
340
341// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
342// repo end up with correct clocks for the next write.
343func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
344 refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
345
346 refs, err := repo.ListRefs(refPrefix)
347 if err != nil {
348 return err
349 }
350
351 for _, ref := range refs {
352 err = readClockNoCheck(def, repo, ref)
353 if err != nil {
354 return err
355 }
356 }
357
358 return nil
359}
360
// Id return the Entity identifier.
//
// The identifier is defined as the id of the very first operation.
// NOTE(review): FirstOp() returns nil for an entity with no operation at all;
// calling Id() on such an entity would panic — callers are expected to only
// use it on populated entities.
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}
366
367// Validate check if the Entity data is valid
368func (e *Entity) Validate() error {
369 // non-empty
370 if len(e.ops) == 0 && len(e.staging) == 0 {
371 return fmt.Errorf("entity has no operations")
372 }
373
374 // check if each operation are valid
375 for _, op := range e.ops {
376 if err := op.Validate(); err != nil {
377 return err
378 }
379 }
380
381 // check if staging is valid if needed
382 for _, op := range e.staging {
383 if err := op.Validate(); err != nil {
384 return err
385 }
386 }
387
388 // Check that there is no colliding operation's ID
389 ids := make(map[entity.Id]struct{})
390 for _, op := range e.Operations() {
391 if _, ok := ids[op.Id()]; ok {
392 return fmt.Errorf("id collision: %s", op.Id())
393 }
394 ids[op.Id()] = struct{}{}
395 }
396
397 return nil
398}
399
400// Operations return the ordered operations
401func (e *Entity) Operations() []Operation {
402 return append(e.ops, e.staging...)
403}
404
405// FirstOp lookup for the very first operation of the Entity
406func (e *Entity) FirstOp() Operation {
407 for _, op := range e.ops {
408 return op
409 }
410 for _, op := range e.staging {
411 return op
412 }
413 return nil
414}
415
416// LastOp lookup for the very last operation of the Entity
417func (e *Entity) LastOp() Operation {
418 if len(e.staging) > 0 {
419 return e.staging[len(e.staging)-1]
420 }
421 if len(e.ops) > 0 {
422 return e.ops[len(e.ops)-1]
423 }
424 return nil
425}
426
// Append add a new Operation to the Entity. The operation is only staged in
// memory; it is persisted by a later call to Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
431
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository,
// that is, if at least one operation is staged.
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}
436
437// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
438// is already in sync with the repository.
439func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
440 if e.NeedCommit() {
441 return e.Commit(repo)
442 }
443 return nil
444}
445
// Commit write the appended operations in the repository.
//
// The staged operations are validated, split into chunks of consecutive
// operations sharing the same author, and each chunk is written as one
// operationPack/commit, chained on the previous one. The edit lamport clock is
// incremented per written pack; the creation clock is incremented only for the
// very first commit of the entity. Finally the entity's git reference is
// created or updated to point at the last commit.
//
// Returns an error if there is nothing staged, if validation fails, or if any
// repository operation fails (the entity may then be partially committed).
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			// stop the chunk at the first operation from a different author
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// one edit clock tick per written pack
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// first ever commit for this entity: also set the creation clock
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// chain the new commit on the previous one, if any
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		// the chunk is now persisted: move it from staging to ops
		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
514
// CreateLamportTime return the Lamport time of creation.
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
519
// EditLamportTime return the Lamport time of the last edition.
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}