1// Package dag contains the base common code to define an entity stored
2// in a chain of git objects, supporting actions like Push, Pull and Merge.
3package dag
4
5import (
6 "encoding/json"
7 "fmt"
8 "sort"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entity"
13 "github.com/MichaelMure/git-bug/identity"
14 "github.com/MichaelMure/git-bug/repository"
15 "github.com/MichaelMure/git-bug/util/lamport"
16)
17
// refsPattern is the format of the git references storing entities: refs/<namespace>/<entity id>
const refsPattern = "refs/%s/%s"
// creationClockPattern is the name of the lamport clock incremented when a new entity is created: <namespace>-create
const creationClockPattern = "%s-create"
// editClockPattern is the name of the lamport clock incremented for each new operationPack: <namespace>-edit
const editClockPattern = "%s-edit"
21
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), used in error messages
	typename string
	// the namespace in git (bugs, prs, ...), used to construct the git refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// TODO: add here createTime and editTime

	// // TODO: doesn't seems to actually be useful over the topological sort ? Timestamp can be generated from graph depth
	// // TODO: maybe EditTime is better because it could spread ops in consecutive groups on the logical timeline --> avoid interleaving
	// packClock lamport.Clock

	// hash of the last git commit written for this entity;
	// the empty string means nothing has been committed yet (see Commit)
	lastCommit repository.Hash
}
52
53// New create an empty Entity
54func New(definition Definition) *Entity {
55 return &Entity{
56 Definition: definition,
57 // packClock: lamport.NewMemClock(),
58 }
59}
60
61// Read will read and decode a stored local Entity from a repository
62func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
63 if err := id.Validate(); err != nil {
64 return nil, errors.Wrap(err, "invalid id")
65 }
66
67 ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
68
69 return read(def, repo, ref)
70}
71
72// readRemote will read and decode a stored remote Entity from a repository
73func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
74 if err := id.Validate(); err != nil {
75 return nil, errors.Wrap(err, "invalid id")
76 }
77
78 ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())
79
80 return read(def, repo, ref)
81}
82
83// read fetch from git and decode an Entity at an arbitrary git reference.
84func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
85 rootHash, err := repo.ResolveRef(ref)
86 if err != nil {
87 return nil, err
88 }
89
90 // Perform a depth-first search to get a topological order of the DAG where we discover the
91 // parents commit and go back in time up to the chronological root
92
93 stack := make([]repository.Hash, 0, 32)
94 visited := make(map[repository.Hash]struct{})
95 DFSOrder := make([]repository.Commit, 0, 32)
96
97 stack = append(stack, rootHash)
98
99 for len(stack) > 0 {
100 // pop
101 hash := stack[len(stack)-1]
102 stack = stack[:len(stack)-1]
103
104 if _, ok := visited[hash]; ok {
105 continue
106 }
107
108 // mark as visited
109 visited[hash] = struct{}{}
110
111 commit, err := repo.ReadCommit(hash)
112 if err != nil {
113 return nil, err
114 }
115
116 DFSOrder = append(DFSOrder, commit)
117
118 for _, parent := range commit.Parents {
119 stack = append(stack, parent)
120 }
121 }
122
123 // Now, we can reverse this topological order and read the commits in an order where
124 // we are sure to have read all the chronological ancestors when we read a commit.
125
126 // Next step is to:
127 // 1) read the operationPacks
128 // 2) make sure that the clocks causality respect the DAG topology.
129
130 oppMap := make(map[repository.Hash]*operationPack)
131 var opsCount int
132 // var packClock = lamport.NewMemClock()
133
134 for i := len(DFSOrder) - 1; i >= 0; i-- {
135 commit := DFSOrder[i]
136 isFirstCommit := i == len(DFSOrder)-1
137 isMerge := len(commit.Parents) > 1
138
139 // Verify DAG structure: single chronological root, so only the root
140 // can have no parents. Said otherwise, the DAG need to have exactly
141 // one leaf.
142 if !isFirstCommit && len(commit.Parents) == 0 {
143 return nil, fmt.Errorf("multiple leafs in the entity DAG")
144 }
145
146 opp, err := readOperationPack(def, repo, commit)
147 if err != nil {
148 return nil, err
149 }
150
151 err = opp.Validate()
152 if err != nil {
153 return nil, err
154 }
155
156 // Check that the create lamport clock is set (not checked in Validate() as it's optional)
157 if isFirstCommit && opp.CreateTime <= 0 {
158 return nil, fmt.Errorf("creation lamport time not set")
159 }
160
161 // make sure that the lamport clocks causality match the DAG topology
162 for _, parentHash := range commit.Parents {
163 parentPack, ok := oppMap[parentHash]
164 if !ok {
165 panic("DFS failed")
166 }
167
168 if parentPack.EditTime >= opp.EditTime {
169 return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
170 }
171
172 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
173 // that the clocks don't jump too far in the future
174 // we ignore merge commits here to allow merging after a loooong time without breaking anything,
175 // as long as there is one valid chain of small hops, it's fine.
176 if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
177 return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
178 }
179
180 // TODO: PackTime is not checked
181 }
182
183 oppMap[commit.Hash] = opp
184 opsCount += len(opp.Operations)
185 }
186
187 // The clocks are fine, we witness them
188 for _, opp := range oppMap {
189 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
190 if err != nil {
191 return nil, err
192 }
193 err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
194 if err != nil {
195 return nil, err
196 }
197 // err = packClock.Witness(opp.PackTime)
198 // if err != nil {
199 // return nil, err
200 // }
201 }
202
203 // Now that we know that the topological order and clocks are fine, we order the operationPacks
204 // based on the logical clocks, entirely ignoring the DAG topology
205
206 oppSlice := make([]*operationPack, 0, len(oppMap))
207 for _, pack := range oppMap {
208 oppSlice = append(oppSlice, pack)
209 }
210 sort.Slice(oppSlice, func(i, j int) bool {
211 // Primary ordering with the dedicated "pack" Lamport time that encode causality
212 // within the entity
213 // if oppSlice[i].PackTime != oppSlice[j].PackTime {
214 // return oppSlice[i].PackTime < oppSlice[i].PackTime
215 // }
216 // We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly
217 // came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
218 // enough but it can give us an edge to approach what really happened.
219 if oppSlice[i].EditTime != oppSlice[j].EditTime {
220 return oppSlice[i].EditTime < oppSlice[j].EditTime
221 }
222 // Well, what now? We still need a total ordering and the most stable possible.
223 // As a last resort, we can order based on a hash of the serialized Operations in the
224 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
225 // This is a lexicographic ordering on the stringified ID.
226 return oppSlice[i].Id() < oppSlice[j].Id()
227 })
228
229 // Now that we ordered the operationPacks, we have the order of the Operations
230
231 ops := make([]Operation, 0, opsCount)
232 for _, pack := range oppSlice {
233 for _, operation := range pack.Operations {
234 ops = append(ops, operation)
235 }
236 }
237
238 return &Entity{
239 Definition: def,
240 ops: ops,
241 // packClock: packClock,
242 lastCommit: rootHash,
243 }, nil
244}
245
// StreamedEntity is the result of reading one Entity, sent over the channel
// returned by ReadAll. Exactly one of Entity or Err is set.
type StreamedEntity struct {
	Entity *Entity
	Err error
}
250
251// ReadAll read and parse all local Entity
252func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
253 out := make(chan StreamedEntity)
254
255 go func() {
256 defer close(out)
257
258 refPrefix := fmt.Sprintf("refs/%s/", def.namespace)
259
260 refs, err := repo.ListRefs(refPrefix)
261 if err != nil {
262 out <- StreamedEntity{Err: err}
263 return
264 }
265
266 for _, ref := range refs {
267 e, err := read(def, repo, ref)
268
269 if err != nil {
270 out <- StreamedEntity{Err: err}
271 return
272 }
273
274 out <- StreamedEntity{Entity: e}
275 }
276 }()
277
278 return out
279}
280
281// Id return the Entity identifier
282func (e *Entity) Id() entity.Id {
283 // id is the id of the first operation
284 return e.FirstOp().Id()
285}
286
287// Validate check if the Entity data is valid
288func (e *Entity) Validate() error {
289 // non-empty
290 if len(e.ops) == 0 && len(e.staging) == 0 {
291 return fmt.Errorf("entity has no operations")
292 }
293
294 // check if each operations are valid
295 for _, op := range e.ops {
296 if err := op.Validate(); err != nil {
297 return err
298 }
299 }
300
301 // check if staging is valid if needed
302 for _, op := range e.staging {
303 if err := op.Validate(); err != nil {
304 return err
305 }
306 }
307
308 // Check that there is no colliding operation's ID
309 ids := make(map[entity.Id]struct{})
310 for _, op := range e.Operations() {
311 if _, ok := ids[op.Id()]; ok {
312 return fmt.Errorf("id collision: %s", op.Id())
313 }
314 ids[op.Id()] = struct{}{}
315 }
316
317 return nil
318}
319
320// Operations return the ordered operations
321func (e *Entity) Operations() []Operation {
322 return append(e.ops, e.staging...)
323}
324
325// FirstOp lookup for the very first operation of the Entity
326func (e *Entity) FirstOp() Operation {
327 for _, op := range e.ops {
328 return op
329 }
330 for _, op := range e.staging {
331 return op
332 }
333 return nil
334}
335
336// LastOp lookup for the very last operation of the Entity
337func (e *Entity) LastOp() Operation {
338 if len(e.staging) > 0 {
339 return e.staging[len(e.staging)-1]
340 }
341 if len(e.ops) > 0 {
342 return e.ops[len(e.ops)-1]
343 }
344 return nil
345}
346
347// Append add a new Operation to the Entity
348func (e *Entity) Append(op Operation) {
349 e.staging = append(e.staging, op)
350}
351
352// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
353func (e *Entity) NeedCommit() bool {
354 return len(e.staging) > 0
355}
356
357// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
358// is already in sync with the repository.
359func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
360 if e.NeedCommit() {
361 return e.Commit(repo)
362 }
363 return nil
364}
365
366// Commit write the appended operations in the repository
367func (e *Entity) Commit(repo repository.ClockedRepo) error {
368 if !e.NeedCommit() {
369 return fmt.Errorf("can't commit an entity with no pending operation")
370 }
371
372 if err := e.Validate(); err != nil {
373 return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
374 }
375
376 var author identity.Interface
377 for _, op := range e.staging {
378 if author != nil && op.Author() != author {
379 return fmt.Errorf("operations with different author")
380 }
381 author = op.Author()
382 }
383
384 // increment the various clocks for this new operationPack
385 // packTime, err := e.packClock.Increment()
386 // if err != nil {
387 // return err
388 // }
389 editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
390 if err != nil {
391 return err
392 }
393 var creationTime lamport.Time
394 if e.lastCommit == "" {
395 creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
396 if err != nil {
397 return err
398 }
399 }
400
401 opp := &operationPack{
402 Author: author,
403 Operations: e.staging,
404 CreateTime: creationTime,
405 EditTime: editTime,
406 // PackTime: packTime,
407 }
408
409 var commitHash repository.Hash
410 if e.lastCommit == "" {
411 commitHash, err = opp.Write(e.Definition, repo)
412 } else {
413 commitHash, err = opp.Write(e.Definition, repo, e.lastCommit)
414 }
415
416 if err != nil {
417 return err
418 }
419
420 e.lastCommit = commitHash
421 e.ops = append(e.ops, e.staging...)
422 e.staging = nil
423
424 // Create or update the Git reference for this entity
425 // When pushing later, the remote will ensure that this ref update
426 // is fast-forward, that is no data has been overwritten.
427 ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
428 return repo.UpdateRef(ref, commitHash)
429}