1// Package dag contains the base common code to define an entity stored
2// in a chain of git objects, supporting actions like Push, Pull and Merge.
3package dag
4
5import (
6 "encoding/json"
7 "fmt"
8 "sort"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entity"
13 "github.com/MichaelMure/git-bug/identity"
14 "github.com/MichaelMure/git-bug/repository"
15 "github.com/MichaelMure/git-bug/util/lamport"
16)
17
18const refsPattern = "refs/%s/%s"
19const creationClockPattern = "%s-create"
20const editClockPattern = "%s-edit"
21
// Definition hold the details defining one specialization of an Entity.
// It acts as a vtable: the generic dag code reads these fields to know how to
// name, store and decode a concrete entity type (bug, pull-request, ...).
type Definition struct {
	// the name of the entity (bug, pull-request, ...), used in error messages
	typename string
	// the namespace in git (bugs, prs, ...), used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	// (64-bit alignment requirement on 32-bit platforms — do not move these fields).
	createTime lamport.Time
	editTime lamport.Time

	// embedded so the entity carries its own type description (namespace, decoder, ...)
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// hash of the last commit written for this entity; empty until the first Commit
	lastCommit repository.Hash
}
53
54// New create an empty Entity
55func New(definition Definition) *Entity {
56 return &Entity{
57 Definition: definition,
58 }
59}
60
61// Read will read and decode a stored local Entity from a repository
62func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
63 if err := id.Validate(); err != nil {
64 return nil, errors.Wrap(err, "invalid id")
65 }
66
67 ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
68
69 return read(def, repo, ref)
70}
71
72// readRemote will read and decode a stored remote Entity from a repository
73func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
74 if err := id.Validate(); err != nil {
75 return nil, errors.Wrap(err, "invalid id")
76 }
77
78 ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())
79
80 return read(def, repo, ref)
81}
82
83// read fetch from git and decode an Entity at an arbitrary git reference.
84func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
85 rootHash, err := repo.ResolveRef(ref)
86 if err != nil {
87 return nil, err
88 }
89
90 // Perform a breadth-first search to get a topological order of the DAG where we discover the
91 // parents commit and go back in time up to the chronological root
92
93 queue := make([]repository.Hash, 0, 32)
94 visited := make(map[repository.Hash]struct{})
95 BFSOrder := make([]repository.Commit, 0, 32)
96
97 queue = append(queue, rootHash)
98 visited[rootHash] = struct{}{}
99
100 for len(queue) > 0 {
101 // pop
102 hash := queue[0]
103 queue = queue[1:]
104
105 commit, err := repo.ReadCommit(hash)
106 if err != nil {
107 return nil, err
108 }
109
110 BFSOrder = append(BFSOrder, commit)
111
112 for _, parent := range commit.Parents {
113 if _, ok := visited[parent]; !ok {
114 queue = append(queue, parent)
115 // mark as visited
116 visited[parent] = struct{}{}
117 }
118 }
119 }
120
121 // Now, we can reverse this topological order and read the commits in an order where
122 // we are sure to have read all the chronological ancestors when we read a commit.
123
124 // Next step is to:
125 // 1) read the operationPacks
126 // 2) make sure that the clocks causality respect the DAG topology.
127
128 oppMap := make(map[repository.Hash]*operationPack)
129 var opsCount int
130
131 for i := len(BFSOrder) - 1; i >= 0; i-- {
132 commit := BFSOrder[i]
133 isFirstCommit := i == len(BFSOrder)-1
134 isMerge := len(commit.Parents) > 1
135
136 // Verify DAG structure: single chronological root, so only the root
137 // can have no parents. Said otherwise, the DAG need to have exactly
138 // one leaf.
139 if !isFirstCommit && len(commit.Parents) == 0 {
140 return nil, fmt.Errorf("multiple leafs in the entity DAG")
141 }
142
143 opp, err := readOperationPack(def, repo, commit)
144 if err != nil {
145 return nil, err
146 }
147
148 err = opp.Validate()
149 if err != nil {
150 return nil, err
151 }
152
153 // Check that the create lamport clock is set (not checked in Validate() as it's optional)
154 if isFirstCommit && opp.CreateTime <= 0 {
155 return nil, fmt.Errorf("creation lamport time not set")
156 }
157
158 // make sure that the lamport clocks causality match the DAG topology
159 for _, parentHash := range commit.Parents {
160 parentPack, ok := oppMap[parentHash]
161 if !ok {
162 panic("DFS failed")
163 }
164
165 if parentPack.EditTime >= opp.EditTime {
166 return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
167 }
168
169 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
170 // that the clocks don't jump too far in the future
171 // we ignore merge commits here to allow merging after a loooong time without breaking anything,
172 // as long as there is one valid chain of small hops, it's fine.
173 if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
174 return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
175 }
176 }
177
178 oppMap[commit.Hash] = opp
179 opsCount += len(opp.Operations)
180 }
181
182 // The clocks are fine, we witness them
183 for _, opp := range oppMap {
184 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
185 if err != nil {
186 return nil, err
187 }
188 err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
189 if err != nil {
190 return nil, err
191 }
192 }
193
194 // Now that we know that the topological order and clocks are fine, we order the operationPacks
195 // based on the logical clocks, entirely ignoring the DAG topology
196
197 oppSlice := make([]*operationPack, 0, len(oppMap))
198 for _, pack := range oppMap {
199 oppSlice = append(oppSlice, pack)
200 }
201 sort.Slice(oppSlice, func(i, j int) bool {
202 // Primary ordering with the EditTime.
203 if oppSlice[i].EditTime != oppSlice[j].EditTime {
204 return oppSlice[i].EditTime < oppSlice[j].EditTime
205 }
206 // We have equal EditTime, which means we have concurrent edition over different machines and we
207 // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
208 // As a secondary ordering, we can order based on a hash of the serialized Operations in the
209 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
210 // This is a lexicographic ordering on the stringified ID.
211 return oppSlice[i].Id() < oppSlice[j].Id()
212 })
213
214 // Now that we ordered the operationPacks, we have the order of the Operations
215
216 ops := make([]Operation, 0, opsCount)
217 var createTime lamport.Time
218 var editTime lamport.Time
219 for _, pack := range oppSlice {
220 for _, operation := range pack.Operations {
221 ops = append(ops, operation)
222 }
223 if pack.CreateTime > createTime {
224 createTime = pack.CreateTime
225 }
226 if pack.EditTime > editTime {
227 editTime = pack.EditTime
228 }
229 }
230
231 return &Entity{
232 Definition: def,
233 ops: ops,
234 lastCommit: rootHash,
235 createTime: createTime,
236 editTime: editTime,
237 }, nil
238}
239
// StreamedEntity is the result type sent over the channel returned by ReadAll:
// either a successfully decoded Entity or the error that stopped the stream.
type StreamedEntity struct {
	// the decoded entity; nil if Err is set
	Entity *Entity
	// the error that interrupted the stream, if any
	Err error
}
244
245// ReadAll read and parse all local Entity
246func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
247 out := make(chan StreamedEntity)
248
249 go func() {
250 defer close(out)
251
252 refPrefix := fmt.Sprintf("refs/%s/", def.namespace)
253
254 refs, err := repo.ListRefs(refPrefix)
255 if err != nil {
256 out <- StreamedEntity{Err: err}
257 return
258 }
259
260 for _, ref := range refs {
261 e, err := read(def, repo, ref)
262
263 if err != nil {
264 out <- StreamedEntity{Err: err}
265 return
266 }
267
268 out <- StreamedEntity{Entity: e}
269 }
270 }()
271
272 return out
273}
274
// Id return the Entity identifier
//
// NOTE(review): if the entity holds no operation at all, FirstOp returns nil
// and this call panics — callers are expected to only use Id() on a
// non-empty (validated) entity.
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}
280
281// Validate check if the Entity data is valid
282func (e *Entity) Validate() error {
283 // non-empty
284 if len(e.ops) == 0 && len(e.staging) == 0 {
285 return fmt.Errorf("entity has no operations")
286 }
287
288 // check if each operations are valid
289 for _, op := range e.ops {
290 if err := op.Validate(); err != nil {
291 return err
292 }
293 }
294
295 // check if staging is valid if needed
296 for _, op := range e.staging {
297 if err := op.Validate(); err != nil {
298 return err
299 }
300 }
301
302 // Check that there is no colliding operation's ID
303 ids := make(map[entity.Id]struct{})
304 for _, op := range e.Operations() {
305 if _, ok := ids[op.Id()]; ok {
306 return fmt.Errorf("id collision: %s", op.Id())
307 }
308 ids[op.Id()] = struct{}{}
309 }
310
311 return nil
312}
313
314// Operations return the ordered operations
315func (e *Entity) Operations() []Operation {
316 return append(e.ops, e.staging...)
317}
318
319// FirstOp lookup for the very first operation of the Entity
320func (e *Entity) FirstOp() Operation {
321 for _, op := range e.ops {
322 return op
323 }
324 for _, op := range e.staging {
325 return op
326 }
327 return nil
328}
329
330// LastOp lookup for the very last operation of the Entity
331func (e *Entity) LastOp() Operation {
332 if len(e.staging) > 0 {
333 return e.staging[len(e.staging)-1]
334 }
335 if len(e.ops) > 0 {
336 return e.ops[len(e.ops)-1]
337 }
338 return nil
339}
340
// Append add a new Operation to the Entity.
// The operation is only staged in memory; call Commit to persist it.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
345
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}
350
// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
// is already in sync with the repository.
//
// NOTE(review): the name looks like a typo of "CommitAsNeeded"; it is exported,
// so renaming would break callers — kept as-is for compatibility.
func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
	if e.NeedCommit() {
		return e.Commit(repo)
	}
	return nil
}
359
// Commit write the appended operations in the repository.
//
// The sequence is order-dependent: validate, increment the edit clock (and the
// creation clock on first commit), write the operationPack as a git commit
// chained on lastCommit, then move the entity's git ref to the new commit.
// On success the staged operations are moved to the stored ones.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// an operationPack has a single Author field, so all staged operations
	// must come from the same author
	var author identity.Interface
	for _, op := range e.staging {
		if author != nil && op.Author() != author {
			return fmt.Errorf("operations with different author")
		}
		author = op.Author()
	}

	// increment the edit clock for this namespace before writing, so the new
	// pack's EditTime is strictly greater than any previously witnessed one
	e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}

	opp := &operationPack{
		Author: author,
		Operations: e.staging,
		EditTime: e.editTime,
	}

	// first commit of this entity: also set the creation clock
	if e.lastCommit == "" {
		e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
		opp.CreateTime = e.createTime
	}

	// chain the new commit on the previous one, if any
	var commitHash repository.Hash
	if e.lastCommit == "" {
		commitHash, err = opp.Write(e.Definition, repo)
	} else {
		commitHash, err = opp.Write(e.Definition, repo, e.lastCommit)
	}

	if err != nil {
		return err
	}

	// the write succeeded: promote staged operations to stored ones
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}
419
// CreateLamportTime return the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
424
// EditLamportTime return the Lamport time of the last edition
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}