// Package dag contains the base common code to define an entity stored
// in a chain of git objects, supporting actions like Push, Pull and Merge.
package dag

import (
	"encoding/json"
	"fmt"
	"sort"

	"github.com/pkg/errors"

	"github.com/MichaelMure/git-bug/entities/identity"
	"github.com/MichaelMure/git-bug/entity"
	"github.com/MichaelMure/git-bug/repository"
	"github.com/MichaelMure/git-bug/util/lamport"
)

const refsPattern = "refs/%s/%s"
const creationClockPattern = "%s-create"
const editClockPattern = "%s-edit"
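
// As an illustration, for a hypothetical Namespace "bugs", entities live under git references
// like "refs/bugs/<id>", while the clocks are named "bugs-create" and "bugs-edit".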

type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)

// Definition holds the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), for human consumption
	Typename string
	// the Namespace in git references (bugs, prs, ...)
	Namespace string
	// a function decoding a JSON message into an Operation
	OperationUnmarshaler OperationUnmarshaler
	// the expected format version number, that can be used for data migration/upgrade
	FormatVersion uint
}

type Actions[EntityT entity.Interface] struct {
	Wrap             func(e *Entity) EntityT
	New              func() EntityT
	Read             func(repo repository.ClockedRepo, id entity.Id) (EntityT, error)
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	ReadAll          func(repo repository.ClockedRepo) <-chan StreamedEntity[EntityT]
	ListLocalIds     func(repo repository.Repo) ([]entity.Id, error)
	Fetch            func(repo repository.Repo, remote string) (string, error)
	Push             func(repo repository.Repo, remote string) (string, error)
	Pull             func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) error
	MergeAll         func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
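
// As an illustration only, and not something defined by this package: a specialization is
// expected to build its own Definition and expose its flavor of these Actions. A minimal
// sketch could look like the following, where entityBug, wrapBug and opUnmarshaler are
// hypothetical names on the caller's side:
//
//	var def = Definition{
//		Typename:             "bug",
//		Namespace:            "bugs",
//		OperationUnmarshaler: opUnmarshaler, // hypothetical JSON decoder for this entity's operations
//		FormatVersion:        1,             // hypothetical format version
//	}
//
//	var actions = Actions[entityBug]{
//		Wrap: wrapBug,
//		ReadWithResolver: func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (entityBug, error) {
//			return Read(def, wrapBug, repo, resolvers, id)
//		},
//	}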

// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allows ordering events
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time
	editTime   lamport.Time

	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	lastCommit repository.Hash
}
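
// For example, this is what keeps edits ordered across machines: when an entity is read, its
// clocks are witnessed (see repo.Witness in read below), so a subsequent local edit obtains a
// strictly greater editTime from repo.Increment in Commit, without any wall-clock agreement.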

// New creates an empty Entity
func New(definition Definition) *Entity {
	return &Entity{
		Definition: definition,
	}
}

// Read will read and decode a stored local Entity from a repository
func Read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
	if err := id.Validate(); err != nil {
		return *new(EntityT), errors.Wrap(err, "invalid id")
	}

	ref := fmt.Sprintf(refsPattern, def.Namespace, id.String())

	return read[EntityT](def, wrapper, repo, resolvers, ref)
}
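
// For example, a minimal read sketch (assuming a hypothetical wrapBug wrapper and resolvers
// obtained from the caller's side):
//
//	b, err := Read(def, wrapBug, repo, resolvers, id)
//	if err != nil {
//		// handle the error: invalid id, unknown reference, corrupted data, ...
//	}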

// readRemote will read and decode a stored remote Entity from a repository
func readRemote[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
	if err := id.Validate(); err != nil {
		return *new(EntityT), errors.Wrap(err, "invalid id")
	}

	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())

	return read[EntityT](def, wrapper, repo, resolvers, ref)
}

// read fetches from git and decodes an Entity at an arbitrary git reference.
func read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return *new(EntityT), err
	}

	// Perform a breadth-first search to get a topological order of the DAG where we discover the
	// parent commits and go back in time up to the chronological root

	queue := make([]repository.Hash, 0, 32)
	visited := make(map[repository.Hash]struct{})
	BFSOrder := make([]repository.Commit, 0, 32)

	queue = append(queue, rootHash)
	visited[rootHash] = struct{}{}

	for len(queue) > 0 {
		// pop
		hash := queue[0]
		queue = queue[1:]

		commit, err := repo.ReadCommit(hash)
		if err != nil {
			return *new(EntityT), err
		}

		BFSOrder = append(BFSOrder, commit)

		for _, parent := range commit.Parents {
			if _, ok := visited[parent]; !ok {
				queue = append(queue, parent)
				// mark as visited
				visited[parent] = struct{}{}
			}
		}
	}

	// Now, we can reverse this topological order and read the commits in an order where
	// we are sure to have read all the chronological ancestors when we read a commit.

	// The next step is to:
	// 1) read the operationPacks
	// 2) make sure that the clocks' causality respects the DAG topology.

	oppMap := make(map[repository.Hash]*operationPack)
	var opsCount int

	for i := len(BFSOrder) - 1; i >= 0; i-- {
		commit := BFSOrder[i]
		isFirstCommit := i == len(BFSOrder)-1
		isMerge := len(commit.Parents) > 1

		// Verify the DAG structure: there is a single chronological root, so only the root
		// can have no parents. Said otherwise, the DAG needs to have exactly one leaf.
		if !isFirstCommit && len(commit.Parents) == 0 {
			return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
		}

		opp, err := readOperationPack(def, repo, resolvers, commit)
		if err != nil {
			return *new(EntityT), err
		}

		err = opp.Validate()
		if err != nil {
			return *new(EntityT), err
		}

		if isMerge && len(opp.Operations) > 0 {
			return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
		}

		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
		if isFirstCommit && opp.CreateTime <= 0 {
			return *new(EntityT), fmt.Errorf("creation lamport time not set")
		}

		// make sure that the lamport clocks' causality matches the DAG topology
		for _, parentHash := range commit.Parents {
			parentPack, ok := oppMap[parentHash]
			if !ok {
				panic("BFS failed")
			}

			if parentPack.EditTime >= opp.EditTime {
				return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
			}

			// To avoid an attack where clocks are pushed toward the uint64 rollover, make sure
			// that the clocks don't jump too far into the future.
			// We ignore merge commits here to allow merging after a long time without breaking anything:
			// as long as there is one valid chain of small hops, it's fine.
			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
				return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
			}
		}

		oppMap[commit.Hash] = opp
		opsCount += len(opp.Operations)
	}

	// The clocks are fine; we witness them
	for _, opp := range oppMap {
		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
		if err != nil {
			return *new(EntityT), err
		}
		err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
		if err != nil {
			return *new(EntityT), err
		}
	}

	// Now that we know that the topological order and clocks are fine, we order the operationPacks
	// based on the logical clocks, entirely ignoring the DAG topology

	oppSlice := make([]*operationPack, 0, len(oppMap))
	for _, pack := range oppMap {
		oppSlice = append(oppSlice, pack)
	}
	sort.Slice(oppSlice, func(i, j int) bool {
		// Primary ordering with the EditTime.
		if oppSlice[i].EditTime != oppSlice[j].EditTime {
			return oppSlice[i].EditTime < oppSlice[j].EditTime
		}
		// We have an equal EditTime, which means we have concurrent edits on different machines, and we
		// can't tell which one came first. So, what now? We still need a total ordering, ideally the most stable possible.
		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
		// operationPack. It doesn't carry much meaning, but it's unbiased and hard to abuse.
		// This is a lexicographic ordering on the stringified ID.
		return oppSlice[i].Id() < oppSlice[j].Id()
	})
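	// For example, packs with EditTime 4 and 5 end up in that order, while two concurrent packs
	// both at EditTime 4 fall back to the lexicographic order of their Id, which is stable across
	// machines since it is derived from the serialized operations.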

	// Now that we ordered the operationPacks, we have the order of the Operations

	ops := make([]Operation, 0, opsCount)
	var createTime lamport.Time
	var editTime lamport.Time
	for _, pack := range oppSlice {
		ops = append(ops, pack.Operations...)
		if pack.CreateTime > createTime {
			createTime = pack.CreateTime
		}
		if pack.EditTime > editTime {
			editTime = pack.EditTime
		}
	}

	return wrapper(&Entity{
		Definition: def,
		ops:        ops,
		lastCommit: rootHash,
		createTime: createTime,
		editTime:   editTime,
	}), nil
}

// readClockNoCheck fetches from git, reads and witnesses the clocks of an Entity at an arbitrary git reference.
// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
// clocks if so. If a data integrity check is a requirement, a flow similar to read, without actually reading/decoding
// the operation blobs, can be implemented instead.
func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return err
	}

	commit, err := repo.ReadCommit(rootHash)
	if err != nil {
		return err
	}

	createTime, editTime, err := readOperationPackClock(repo, commit)
	if err != nil {
		return err
	}

	// if we have more than one commit, we need to find the root to have the create time
	if len(commit.Parents) > 0 {
		for len(commit.Parents) > 0 {
			// The path to the root is irrelevant.
			commit, err = repo.ReadCommit(commit.Parents[0])
			if err != nil {
				return err
			}
		}
		createTime, _, err = readOperationPackClock(repo, commit)
		if err != nil {
			return err
		}
	}

	if createTime <= 0 {
		return fmt.Errorf("creation lamport time not set")
	}
	if editTime <= 0 {
		return fmt.Errorf("edit lamport time not set")
	}
	err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
	if err != nil {
		return err
	}
	err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
	if err != nil {
		return err
	}
	return nil
}

type StreamedEntity[EntityT entity.Interface] struct {
	Entity EntityT
	Err    error
}

// ReadAll reads and parses all local Entities
func ReadAll[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity[EntityT] {
	out := make(chan StreamedEntity[EntityT])

	go func() {
		defer close(out)

		refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)

		refs, err := repo.ListRefs(refPrefix)
		if err != nil {
			out <- StreamedEntity[EntityT]{Err: err}
			return
		}

		for _, ref := range refs {
			e, err := read[EntityT](def, wrapper, repo, resolvers, ref)

			if err != nil {
				out <- StreamedEntity[EntityT]{Err: err}
				return
			}

			out <- StreamedEntity[EntityT]{Entity: e}
		}
	}()

	return out
}
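
// A minimal consumption sketch for ReadAll (assuming the same hypothetical wrapBug wrapper):
//
//	for streamed := range ReadAll(def, wrapBug, repo, resolvers) {
//		if streamed.Err != nil {
//			// handle the error; the channel is closed right after
//			break
//		}
//		// use streamed.Entity
//	}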

// ReadAllClocksNoCheck goes over all entities matching Definition and reads/witnesses the corresponding clocks so that the
// repo ends up with correct clocks for the next write.
func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
	refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)

	refs, err := repo.ListRefs(refPrefix)
	if err != nil {
		return err
	}

	for _, ref := range refs {
		err = readClockNoCheck(def, repo, ref)
		if err != nil {
			return err
		}
	}

	return nil
}

// Id returns the Entity identifier
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}

// Validate checks if the Entity data is valid
func (e *Entity) Validate() error {
	// non-empty
	if len(e.ops) == 0 && len(e.staging) == 0 {
		return fmt.Errorf("entity has no operations")
	}

	// check that each stored operation is valid
	for _, op := range e.ops {
		if err := op.Validate(); err != nil {
			return err
		}
	}

	// check that the staging operations are valid as well
	for _, op := range e.staging {
		if err := op.Validate(); err != nil {
			return err
		}
	}

	// Check that there are no colliding operation IDs
	ids := make(map[entity.Id]struct{})
	for _, op := range e.Operations() {
		if _, ok := ids[op.Id()]; ok {
			return fmt.Errorf("id collision: %s", op.Id())
		}
		ids[op.Id()] = struct{}{}
	}

	return nil
}

// Operations returns the ordered operations
func (e *Entity) Operations() []Operation {
	return append(e.ops, e.staging...)
}

// FirstOp returns the very first operation of the Entity
func (e *Entity) FirstOp() Operation {
	for _, op := range e.ops {
		return op
	}
	for _, op := range e.staging {
		return op
	}
	return nil
}

// LastOp returns the very last operation of the Entity
func (e *Entity) LastOp() Operation {
	if len(e.staging) > 0 {
		return e.staging[len(e.staging)-1]
	}
	if len(e.ops) > 0 {
		return e.ops[len(e.ops)-1]
	}
	return nil
}

// Append adds a new Operation to the Entity
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}

// NeedCommit indicates if the in-memory state has changed and needs to be committed in the repository
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}

// CommitAsNeeded executes a Commit only if necessary. This function is useful to avoid getting an error if the Entity
// is already in sync with the repository.
func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
	if e.NeedCommit() {
		return e.Commit(repo)
	}
	return nil
}

// Commit writes the appended operations into the repository
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary, but it makes equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity.
	// When pushing later, the remote will ensure that this ref update
	// is a fast-forward, that is, that no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
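
// An illustrative write flow (a sketch, not prescriptive): create an Entity, append one or
// more Operations, then commit to persist them and update the entity's git reference.
//
//	e := New(def)
//	e.Append(op) // op is a hypothetical Operation implementation
//	if err := e.Commit(repo); err != nil {
//		// handle the error
//	}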

// CreateLamportTime returns the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}

// EditLamportTime returns the Lamport time of the last edit
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}