// Package dag contains the base common code to define an entity stored
// in a chain of git objects, supporting actions like Push, Pull and Merge.
package dag

import (
	"encoding/json"
	"fmt"
	"sort"

	"github.com/pkg/errors"

	"github.com/MichaelMure/git-bug/entity"
	"github.com/MichaelMure/git-bug/identity"
	"github.com/MichaelMure/git-bug/repository"
	"github.com/MichaelMure/git-bug/util/lamport"
)

const refsPattern = "refs/%s/%s"
const creationClockPattern = "%s-create"
const editClockPattern = "%s-edit"
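
// For a "bugs" namespace, for example, these patterns expand to a ref like
// "refs/bugs/<id>" and to the clock names "bugs-create" and "bugs-edit".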

// Definition holds the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...)
	typename string
	// the namespace in git (bugs, prs, ...)
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
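
// For illustration only, a hypothetical "bug" specialization could fill a
// Definition like this (bugOperationUnmarshaler and resolver are assumed to
// exist elsewhere, they are not defined in this package):
//
//	def := Definition{
//		typename:             "bug",
//		namespace:            "bugs",
//		operationUnmarshaler: bugOperationUnmarshaler,
//		identityResolver:     resolver,
//		formatVersion:        1,
//	}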

// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// TODO: add here createTime and editTime

	// // TODO: doesn't seem to actually be useful over the topological sort? Timestamp can be generated from graph depth
	// // TODO: maybe EditTime is better because it could spread ops in consecutive groups on the logical timeline --> avoid interleaving
	// packClock lamport.Clock
	lastCommit repository.Hash
}

// New creates an empty Entity
func New(definition Definition) *Entity {
	return &Entity{
		Definition: definition,
		// packClock: lamport.NewMemClock(),
	}
}

// Read will read and decode a stored local Entity from a repository
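//
// A minimal usage sketch, assuming def, repo and id are already in scope:
//
//	e, err := Read(def, repo, id)
//	if err != nil {
//		// handle the error
//	}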
func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
	if err := id.Validate(); err != nil {
		return nil, errors.Wrap(err, "invalid id")
	}

	ref := fmt.Sprintf(refsPattern, def.namespace, id.String())

	return read(def, repo, ref)
}

// readRemote will read and decode a stored remote Entity from a repository
func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
	if err := id.Validate(); err != nil {
		return nil, errors.Wrap(err, "invalid id")
	}

	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())

	return read(def, repo, ref)
}

// read fetches from git and decodes an Entity at an arbitrary git reference.
func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return nil, err
	}

	// Perform a breadth-first search to get a topological order of the DAG where we discover the
	// parent commits and go back in time up to the chronological root

	queue := make([]repository.Hash, 0, 32)
	visited := make(map[repository.Hash]struct{})
	BFSOrder := make([]repository.Commit, 0, 32)

	queue = append(queue, rootHash)
	visited[rootHash] = struct{}{}

	for len(queue) > 0 {
		// pop the next commit from the front of the queue
		hash := queue[0]
		queue = queue[1:]

		commit, err := repo.ReadCommit(hash)
		if err != nil {
			return nil, err
		}

		BFSOrder = append(BFSOrder, commit)

		for _, parent := range commit.Parents {
			if _, ok := visited[parent]; !ok {
				queue = append(queue, parent)
				// mark as visited
				visited[parent] = struct{}{}
			}
		}
	}

	// Now, we can reverse this topological order and read the commits in an order where
	// we are sure to have read all the chronological ancestors when we read a commit.

	// Next step is to:
	// 1) read the operationPacks
	// 2) make sure that the clock causality respects the DAG topology.

	oppMap := make(map[repository.Hash]*operationPack)
	var opsCount int
	// var packClock = lamport.NewMemClock()

	for i := len(BFSOrder) - 1; i >= 0; i-- {
		commit := BFSOrder[i]
		isFirstCommit := i == len(BFSOrder)-1
		isMerge := len(commit.Parents) > 1

		// Verify the DAG structure: there is a single chronological root, so only the root
		// can have no parents. Said otherwise, the DAG needs to have exactly one leaf.
		if !isFirstCommit && len(commit.Parents) == 0 {
			return nil, fmt.Errorf("multiple leaves in the entity DAG")
		}

		opp, err := readOperationPack(def, repo, commit)
		if err != nil {
			return nil, err
		}

		err = opp.Validate()
		if err != nil {
			return nil, err
		}

		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
		if isFirstCommit && opp.CreateTime <= 0 {
			return nil, fmt.Errorf("creation lamport time not set")
		}

		// make sure that the lamport clock causality matches the DAG topology
		for _, parentHash := range commit.Parents {
			parentPack, ok := oppMap[parentHash]
			if !ok {
				panic("BFS failed")
			}

			if parentPack.EditTime >= opp.EditTime {
				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
			}

			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
			// that the clocks don't jump too far in the future
			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
			// as long as there is one valid chain of small hops, it's fine.
			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
			}

			// TODO: PackTime is not checked
		}

		oppMap[commit.Hash] = opp
		opsCount += len(opp.Operations)
	}

	// The clocks are fine, we witness them: this moves the local clocks forward
	// so that future increments are causally after everything read in this DAG
	for _, opp := range oppMap {
		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
		if err != nil {
			return nil, err
		}
		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
		if err != nil {
			return nil, err
		}
		// err = packClock.Witness(opp.PackTime)
		// if err != nil {
		// 	return nil, err
		// }
	}

	// Now that we know that the topological order and clocks are fine, we order the operationPacks
	// based on the logical clocks, entirely ignoring the DAG topology

	oppSlice := make([]*operationPack, 0, len(oppMap))
	for _, pack := range oppMap {
		oppSlice = append(oppSlice, pack)
	}
	sort.Slice(oppSlice, func(i, j int) bool {
		// Primary ordering with the dedicated "pack" Lamport time that encodes causality
		// within the entity
		// if oppSlice[i].PackTime != oppSlice[j].PackTime {
		// 	return oppSlice[i].PackTime < oppSlice[j].PackTime
		// }
		// With an equal PackTime we had a concurrent edit and can't tell which exactly
		// came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
		// enough but it can give us an edge to approach what really happened.
		if oppSlice[i].EditTime != oppSlice[j].EditTime {
			return oppSlice[i].EditTime < oppSlice[j].EditTime
		}
		// Well, what now? We still need a total ordering, and the most stable possible.
		// As a last resort, we can order based on a hash of the serialized Operations in the
		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
		// This is a lexicographic ordering on the stringified ID.
		return oppSlice[i].Id() < oppSlice[j].Id()
	})

	// Now that we ordered the operationPacks, we have the order of the Operations

	ops := make([]Operation, 0, opsCount)
	for _, pack := range oppSlice {
		ops = append(ops, pack.Operations...)
	}

	return &Entity{
		Definition: def,
		ops:        ops,
		// packClock:  packClock,
		lastCommit: rootHash,
	}, nil
}

type StreamedEntity struct {
	Entity *Entity
	Err    error
}

// ReadAll reads and parses all local Entities
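//
// Errors are streamed in-band, so the caller is expected to drain the channel
// and check every item. A sketch (process is a hypothetical consumer):
//
//	for streamed := range ReadAll(def, repo) {
//		if streamed.Err != nil {
//			// handle the error
//			continue
//		}
//		process(streamed.Entity)
//	}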
func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
	out := make(chan StreamedEntity)

	go func() {
		defer close(out)

		refPrefix := fmt.Sprintf("refs/%s/", def.namespace)

		refs, err := repo.ListRefs(refPrefix)
		if err != nil {
			out <- StreamedEntity{Err: err}
			return
		}

		for _, ref := range refs {
			e, err := read(def, repo, ref)

			if err != nil {
				out <- StreamedEntity{Err: err}
				return
			}

			out <- StreamedEntity{Entity: e}
		}
	}()

	return out
}

// Id returns the Entity identifier
func (e *Entity) Id() entity.Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}

// Validate checks that the Entity data is valid
func (e *Entity) Validate() error {
	// non-empty
	if len(e.ops) == 0 && len(e.staging) == 0 {
		return fmt.Errorf("entity has no operations")
	}

	// check that each stored operation is valid
	for _, op := range e.ops {
		if err := op.Validate(); err != nil {
			return err
		}
	}

	// check that each staging operation is valid
	for _, op := range e.staging {
		if err := op.Validate(); err != nil {
			return err
		}
	}

	// check that there are no colliding operation IDs
	ids := make(map[entity.Id]struct{})
	for _, op := range e.Operations() {
		if _, ok := ids[op.Id()]; ok {
			return fmt.Errorf("id collision: %s", op.Id())
		}
		ids[op.Id()] = struct{}{}
	}

	return nil
}

// Operations returns the ordered operations
func (e *Entity) Operations() []Operation {
	// append into a fresh slice so the caller can't mutate the entity's
	// internal state through the returned slice's backing array
	result := make([]Operation, 0, len(e.ops)+len(e.staging))
	result = append(result, e.ops...)
	return append(result, e.staging...)
}

// FirstOp returns the very first operation of the Entity
func (e *Entity) FirstOp() Operation {
	if len(e.ops) > 0 {
		return e.ops[0]
	}
	if len(e.staging) > 0 {
		return e.staging[0]
	}
	return nil
}

// LastOp returns the very last operation of the Entity
func (e *Entity) LastOp() Operation {
	if len(e.staging) > 0 {
		return e.staging[len(e.staging)-1]
	}
	if len(e.ops) > 0 {
		return e.ops[len(e.ops)-1]
	}
	return nil
}

// Append adds a new Operation to the Entity
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}

// NeedCommit indicates if the in-memory state has changed and needs to be committed in the repository
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}

// CommitAsNeeded executes a Commit only if necessary. This function is useful to avoid getting an error if the Entity
// is already in sync with the repository.
func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
	if e.NeedCommit() {
		return e.Commit(repo)
	}
	return nil
}

// Commit writes the staged operations in the repository
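//
// A typical write path, as a sketch (building op is entity-specific and assumed
// to happen elsewhere):
//
//	e := New(def)
//	e.Append(op)
//	if err := e.Commit(repo); err != nil {
//		// handle the error
//	}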
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// an operationPack has a single author, so all staging operations must share one
	var author identity.Interface
	for _, op := range e.staging {
		if author != nil && op.Author() != author {
			return fmt.Errorf("operations with different authors")
		}
		author = op.Author()
	}

	// increment the various clocks for this new operationPack
	// packTime, err := e.packClock.Increment()
	// if err != nil {
	// 	return err
	// }
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	opp := &operationPack{
		Author:     author,
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
		// PackTime:   packTime,
	}

	// the first commit of an entity has no parent
	var commitHash repository.Hash
	if e.lastCommit == "" {
		commitHash, err = opp.Write(e.Definition, repo)
	} else {
		commitHash, err = opp.Write(e.Definition, repo, e.lastCommit)
	}

	if err != nil {
		return err
	}

	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity.
	// When pushing later, the remote will ensure that this ref update
	// is a fast-forward, that is, that no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}