1// Package dag contains the base common code to define an entity stored
2// in a chain of git objects, supporting actions like Push, Pull and Merge.
3package dag
4
5import (
6 "encoding/json"
7 "fmt"
8 "sort"
9
10 "github.com/pkg/errors"
11
12 "github.com/MichaelMure/git-bug/entity"
13 "github.com/MichaelMure/git-bug/identity"
14 "github.com/MichaelMure/git-bug/repository"
15 "github.com/MichaelMure/git-bug/util/lamport"
16)
17
const (
	// refsPattern is the local git reference of an entity: refs/<namespace>/<id>.
	refsPattern = "refs/%s/%s"
	// creationClockPattern names the lamport clock tracking entity creations
	// within a namespace.
	creationClockPattern = "%s-create"
	// editClockPattern names the lamport clock tracking entity edits within a
	// namespace.
	editClockPattern = "%s-edit"
)
21
// Definition holds the details defining one specialization of an Entity
// (for example bugs or pull-requests). It is embedded in every Entity and
// passed to the read and write functions of this package.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), used in error messages
	typename string
	// the namespace in git (bugs, prs, ...); it is used to build the git
	// references of the entities and the names of the lamport clocks
	namespace string
	// a function decoding a JSON message into an Operation
	// NOTE(review): not called in this file; presumably used when decoding
	// operationPacks — confirm.
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}
35
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
//
// Its content is an ordered list of Operations: the ones already written in the
// repository, followed by the ones appended in memory and waiting for a Commit.
type Entity struct {
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// TODO: add here createTime and editTime

	// hash of the most recent commit of this entity known in memory; empty for
	// a new entity that has never been committed
	lastCommit repository.Hash
}
49
50// New create an empty Entity
51func New(definition Definition) *Entity {
52 return &Entity{
53 Definition: definition,
54 }
55}
56
57// Read will read and decode a stored local Entity from a repository
58func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
59 if err := id.Validate(); err != nil {
60 return nil, errors.Wrap(err, "invalid id")
61 }
62
63 ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
64
65 return read(def, repo, ref)
66}
67
68// readRemote will read and decode a stored remote Entity from a repository
69func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
70 if err := id.Validate(); err != nil {
71 return nil, errors.Wrap(err, "invalid id")
72 }
73
74 ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())
75
76 return read(def, repo, ref)
77}
78
79// read fetch from git and decode an Entity at an arbitrary git reference.
80func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
81 rootHash, err := repo.ResolveRef(ref)
82 if err != nil {
83 return nil, err
84 }
85
86 // Perform a breadth-first search to get a topological order of the DAG where we discover the
87 // parents commit and go back in time up to the chronological root
88
89 queue := make([]repository.Hash, 0, 32)
90 visited := make(map[repository.Hash]struct{})
91 BFSOrder := make([]repository.Commit, 0, 32)
92
93 queue = append(queue, rootHash)
94 visited[rootHash] = struct{}{}
95
96 for len(queue) > 0 {
97 // pop
98 hash := queue[0]
99 queue = queue[1:]
100
101 commit, err := repo.ReadCommit(hash)
102 if err != nil {
103 return nil, err
104 }
105
106 BFSOrder = append(BFSOrder, commit)
107
108 for _, parent := range commit.Parents {
109 if _, ok := visited[parent]; !ok {
110 queue = append(queue, parent)
111 // mark as visited
112 visited[parent] = struct{}{}
113 }
114 }
115 }
116
117 // Now, we can reverse this topological order and read the commits in an order where
118 // we are sure to have read all the chronological ancestors when we read a commit.
119
120 // Next step is to:
121 // 1) read the operationPacks
122 // 2) make sure that the clocks causality respect the DAG topology.
123
124 oppMap := make(map[repository.Hash]*operationPack)
125 var opsCount int
126
127 for i := len(BFSOrder) - 1; i >= 0; i-- {
128 commit := BFSOrder[i]
129 isFirstCommit := i == len(BFSOrder)-1
130 isMerge := len(commit.Parents) > 1
131
132 // Verify DAG structure: single chronological root, so only the root
133 // can have no parents. Said otherwise, the DAG need to have exactly
134 // one leaf.
135 if !isFirstCommit && len(commit.Parents) == 0 {
136 return nil, fmt.Errorf("multiple leafs in the entity DAG")
137 }
138
139 opp, err := readOperationPack(def, repo, commit)
140 if err != nil {
141 return nil, err
142 }
143
144 err = opp.Validate()
145 if err != nil {
146 return nil, err
147 }
148
149 // Check that the create lamport clock is set (not checked in Validate() as it's optional)
150 if isFirstCommit && opp.CreateTime <= 0 {
151 return nil, fmt.Errorf("creation lamport time not set")
152 }
153
154 // make sure that the lamport clocks causality match the DAG topology
155 for _, parentHash := range commit.Parents {
156 parentPack, ok := oppMap[parentHash]
157 if !ok {
158 panic("DFS failed")
159 }
160
161 if parentPack.EditTime >= opp.EditTime {
162 return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
163 }
164
165 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
166 // that the clocks don't jump too far in the future
167 // we ignore merge commits here to allow merging after a loooong time without breaking anything,
168 // as long as there is one valid chain of small hops, it's fine.
169 if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
170 return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
171 }
172 }
173
174 oppMap[commit.Hash] = opp
175 opsCount += len(opp.Operations)
176 }
177
178 // The clocks are fine, we witness them
179 for _, opp := range oppMap {
180 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
181 if err != nil {
182 return nil, err
183 }
184 err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
185 if err != nil {
186 return nil, err
187 }
188 }
189
190 // Now that we know that the topological order and clocks are fine, we order the operationPacks
191 // based on the logical clocks, entirely ignoring the DAG topology
192
193 oppSlice := make([]*operationPack, 0, len(oppMap))
194 for _, pack := range oppMap {
195 oppSlice = append(oppSlice, pack)
196 }
197 sort.Slice(oppSlice, func(i, j int) bool {
198 // Primary ordering with the EditTime.
199 if oppSlice[i].EditTime != oppSlice[j].EditTime {
200 return oppSlice[i].EditTime < oppSlice[j].EditTime
201 }
202 // We have equal EditTime, which means we have concurrent edition over different machines and we
203 // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
204 // As a secondary ordering, we can order based on a hash of the serialized Operations in the
205 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
206 // This is a lexicographic ordering on the stringified ID.
207 return oppSlice[i].Id() < oppSlice[j].Id()
208 })
209
210 // Now that we ordered the operationPacks, we have the order of the Operations
211
212 ops := make([]Operation, 0, opsCount)
213 for _, pack := range oppSlice {
214 for _, operation := range pack.Operations {
215 ops = append(ops, operation)
216 }
217 }
218
219 return &Entity{
220 Definition: def,
221 ops: ops,
222 lastCommit: rootHash,
223 }, nil
224}
225
// StreamedEntity is one item sent on the channel returned by ReadAll: either a
// successfully read Entity, or the error that stopped the reading.
type StreamedEntity struct {
	Entity *Entity
	Err    error
}
230
231// ReadAll read and parse all local Entity
232func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
233 out := make(chan StreamedEntity)
234
235 go func() {
236 defer close(out)
237
238 refPrefix := fmt.Sprintf("refs/%s/", def.namespace)
239
240 refs, err := repo.ListRefs(refPrefix)
241 if err != nil {
242 out <- StreamedEntity{Err: err}
243 return
244 }
245
246 for _, ref := range refs {
247 e, err := read(def, repo, ref)
248
249 if err != nil {
250 out <- StreamedEntity{Err: err}
251 return
252 }
253
254 out <- StreamedEntity{Entity: e}
255 }
256 }()
257
258 return out
259}
260
261// Id return the Entity identifier
262func (e *Entity) Id() entity.Id {
263 // id is the id of the first operation
264 return e.FirstOp().Id()
265}
266
267// Validate check if the Entity data is valid
268func (e *Entity) Validate() error {
269 // non-empty
270 if len(e.ops) == 0 && len(e.staging) == 0 {
271 return fmt.Errorf("entity has no operations")
272 }
273
274 // check if each operations are valid
275 for _, op := range e.ops {
276 if err := op.Validate(); err != nil {
277 return err
278 }
279 }
280
281 // check if staging is valid if needed
282 for _, op := range e.staging {
283 if err := op.Validate(); err != nil {
284 return err
285 }
286 }
287
288 // Check that there is no colliding operation's ID
289 ids := make(map[entity.Id]struct{})
290 for _, op := range e.Operations() {
291 if _, ok := ids[op.Id()]; ok {
292 return fmt.Errorf("id collision: %s", op.Id())
293 }
294 ids[op.Id()] = struct{}{}
295 }
296
297 return nil
298}
299
300// Operations return the ordered operations
301func (e *Entity) Operations() []Operation {
302 return append(e.ops, e.staging...)
303}
304
305// FirstOp lookup for the very first operation of the Entity
306func (e *Entity) FirstOp() Operation {
307 for _, op := range e.ops {
308 return op
309 }
310 for _, op := range e.staging {
311 return op
312 }
313 return nil
314}
315
316// LastOp lookup for the very last operation of the Entity
317func (e *Entity) LastOp() Operation {
318 if len(e.staging) > 0 {
319 return e.staging[len(e.staging)-1]
320 }
321 if len(e.ops) > 0 {
322 return e.ops[len(e.ops)-1]
323 }
324 return nil
325}
326
327// Append add a new Operation to the Entity
328func (e *Entity) Append(op Operation) {
329 e.staging = append(e.staging, op)
330}
331
332// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
333func (e *Entity) NeedCommit() bool {
334 return len(e.staging) > 0
335}
336
337// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
338// is already in sync with the repository.
339func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
340 if e.NeedCommit() {
341 return e.Commit(repo)
342 }
343 return nil
344}
345
346// Commit write the appended operations in the repository
347func (e *Entity) Commit(repo repository.ClockedRepo) error {
348 if !e.NeedCommit() {
349 return fmt.Errorf("can't commit an entity with no pending operation")
350 }
351
352 if err := e.Validate(); err != nil {
353 return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
354 }
355
356 var author identity.Interface
357 for _, op := range e.staging {
358 if author != nil && op.Author() != author {
359 return fmt.Errorf("operations with different author")
360 }
361 author = op.Author()
362 }
363
364 editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
365 if err != nil {
366 return err
367 }
368 var creationTime lamport.Time
369 if e.lastCommit == "" {
370 creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
371 if err != nil {
372 return err
373 }
374 }
375
376 opp := &operationPack{
377 Author: author,
378 Operations: e.staging,
379 CreateTime: creationTime,
380 EditTime: editTime,
381 }
382
383 var commitHash repository.Hash
384 if e.lastCommit == "" {
385 commitHash, err = opp.Write(e.Definition, repo)
386 } else {
387 commitHash, err = opp.Write(e.Definition, repo, e.lastCommit)
388 }
389
390 if err != nil {
391 return err
392 }
393
394 e.lastCommit = commitHash
395 e.ops = append(e.ops, e.staging...)
396 e.staging = nil
397
398 // Create or update the Git reference for this entity
399 // When pushing later, the remote will ensure that this ref update
400 // is fast-forward, that is no data has been overwritten.
401 ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
402 return repo.UpdateRef(ref, commitHash)
403}