1package entity
2
3import (
4 "encoding/json"
5 "fmt"
6 "sort"
7
8 "github.com/pkg/errors"
9
10 "github.com/MichaelMure/git-bug/repository"
11 "github.com/MichaelMure/git-bug/util/lamport"
12)
13
// refsPattern is the git ref where an entity is stored: refs/<namespace>/<id>
const refsPattern = "refs/%s/%s"

// creationClockPattern and editClockPattern name the per-namespace lamport
// clocks used to timestamp entity creation and edition.
const creationClockPattern = "%s-create"
const editClockPattern = "%s-edit"
17
// Operation is a piece of data, identified and validatable, that modifies the
// state of an Entity.
type Operation interface {
	// Id returns the identifier of the operation
	Id() Id
	// MarshalJSON() ([]byte, error)
	// Validate checks that the operation is well formed
	Validate() error
}
23
// OperationIterator is intended to iterate over an Entity's operations.
// NOTE(review): empty placeholder — no fields or methods are defined in this
// file; confirm whether it is implemented elsewhere or dead code.
type OperationIterator struct {
}
26
// Definition describes a concrete kind of entity: how it is named, where it
// lives in git, and how to decode and version its operations.
type Definition struct {
	// the name of the entity (bug, pull-request, ...)
	typename string
	// the namespace in git (bugs, prs, ...), used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(raw json.RawMessage) (Operation, error)
	// the expected format version number
	formatVersion uint
}
37
// Entity is a data structure holding an ordered series of Operations, stored
// in git as a DAG of commits each carrying an operationPack.
type Entity struct {
	Definition

	// operations already written in git
	ops []Operation
	// operations appended locally, not yet committed
	staging []Operation

	// logical clock ordering the operationPacks of this entity
	packClock lamport.Clock
	// hash of the last commit written for this entity, empty if none yet
	lastCommit repository.Hash
}
47
48func New(definition Definition) *Entity {
49 return &Entity{
50 Definition: definition,
51 packClock: lamport.NewMemClock(),
52 }
53}
54
55func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
56 if err := id.Validate(); err != nil {
57 return nil, errors.Wrap(err, "invalid id")
58 }
59
60 ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
61
62 rootHash, err := repo.ResolveRef(ref)
63 if err != nil {
64 return nil, err
65 }
66
67 // Perform a depth-first search to get a topological order of the DAG where we discover the
68 // parents commit and go back in time up to the chronological root
69
70 stack := make([]repository.Hash, 0, 32)
71 visited := make(map[repository.Hash]struct{})
72 DFSOrder := make([]repository.Commit, 0, 32)
73
74 stack = append(stack, rootHash)
75
76 for len(stack) > 0 {
77 // pop
78 hash := stack[len(stack)-1]
79 stack = stack[:len(stack)-1]
80
81 if _, ok := visited[hash]; ok {
82 continue
83 }
84
85 // mark as visited
86 visited[hash] = struct{}{}
87
88 commit, err := repo.ReadCommit(hash)
89 if err != nil {
90 return nil, err
91 }
92
93 DFSOrder = append(DFSOrder, commit)
94
95 for _, parent := range commit.Parents {
96 stack = append(stack, parent)
97 }
98 }
99
100 // Now, we can reverse this topological order and read the commits in an order where
101 // we are sure to have read all the chronological ancestors when we read a commit.
102
103 // Next step is to:
104 // 1) read the operationPacks
105 // 2) make sure that the clocks causality respect the DAG topology.
106
107 oppMap := make(map[repository.Hash]*operationPack)
108 var opsCount int
109 var packClock = lamport.NewMemClock()
110
111 for i := len(DFSOrder) - 1; i >= 0; i-- {
112 commit := DFSOrder[i]
113 firstCommit := i == len(DFSOrder)-1
114
115 // Verify DAG structure: single chronological root, so only the root
116 // can have no parents
117 if !firstCommit && len(commit.Parents) == 0 {
118 return nil, fmt.Errorf("multiple root in the entity DAG")
119 }
120
121 opp, err := readOperationPack(def, repo, commit.TreeHash)
122 if err != nil {
123 return nil, err
124 }
125
126 // Check that the lamport clocks are set
127 if firstCommit && opp.CreateTime <= 0 {
128 return nil, fmt.Errorf("creation lamport time not set")
129 }
130 if opp.EditTime <= 0 {
131 return nil, fmt.Errorf("edition lamport time not set")
132 }
133 if opp.PackTime <= 0 {
134 return nil, fmt.Errorf("pack lamport time not set")
135 }
136
137 // make sure that the lamport clocks causality match the DAG topology
138 for _, parentHash := range commit.Parents {
139 parentPack, ok := oppMap[parentHash]
140 if !ok {
141 panic("DFS failed")
142 }
143
144 if parentPack.EditTime >= opp.EditTime {
145 return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
146 }
147
148 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
149 // that the clocks don't jump too far in the future
150 if opp.EditTime-parentPack.EditTime > 10_000 {
151 return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
152 }
153 }
154
155 oppMap[commit.Hash] = opp
156 opsCount += len(opp.Operations)
157 }
158
159 // The clocks are fine, we witness them
160 for _, opp := range oppMap {
161 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
162 if err != nil {
163 return nil, err
164 }
165 err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
166 if err != nil {
167 return nil, err
168 }
169 err = packClock.Witness(opp.PackTime)
170 if err != nil {
171 return nil, err
172 }
173 }
174
175 // Now that we know that the topological order and clocks are fine, we order the operationPacks
176 // based on the logical clocks, entirely ignoring the DAG topology
177
178 oppSlice := make([]*operationPack, 0, len(oppMap))
179 for _, pack := range oppMap {
180 oppSlice = append(oppSlice, pack)
181 }
182 sort.Slice(oppSlice, func(i, j int) bool {
183 // TODO: no secondary ordering?
184 // might be useful for stable ordering
185 return oppSlice[i].PackTime < oppSlice[i].PackTime
186 })
187
188 // Now that we ordered the operationPacks, we have the order of the Operations
189
190 ops := make([]Operation, 0, opsCount)
191 for _, pack := range oppSlice {
192 for _, operation := range pack.Operations {
193 ops = append(ops, operation)
194 }
195 }
196
197 return &Entity{
198 Definition: def,
199 ops: ops,
200 lastCommit: rootHash,
201 }, nil
202}
203
204// Id return the Entity identifier
205func (e *Entity) Id() Id {
206 // id is the id of the first operation
207 return e.FirstOp().Id()
208}
209
210func (e *Entity) Validate() error {
211 // non-empty
212 if len(e.ops) == 0 && len(e.staging) == 0 {
213 return fmt.Errorf("entity has no operations")
214 }
215
216 // check if each operations are valid
217 for _, op := range e.ops {
218 if err := op.Validate(); err != nil {
219 return err
220 }
221 }
222
223 // check if staging is valid if needed
224 for _, op := range e.staging {
225 if err := op.Validate(); err != nil {
226 return err
227 }
228 }
229
230 // Check that there is no colliding operation's ID
231 ids := make(map[Id]struct{})
232 for _, op := range e.Operations() {
233 if _, ok := ids[op.Id()]; ok {
234 return fmt.Errorf("id collision: %s", op.Id())
235 }
236 ids[op.Id()] = struct{}{}
237 }
238
239 return nil
240}
241
242// return the ordered operations
243func (e *Entity) Operations() []Operation {
244 return append(e.ops, e.staging...)
245}
246
247// Lookup for the very first operation of the Entity.
248func (e *Entity) FirstOp() Operation {
249 for _, op := range e.ops {
250 return op
251 }
252 for _, op := range e.staging {
253 return op
254 }
255 return nil
256}
257
// Append adds a new operation in the staging area, to be written in git by the
// next Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
261
// NeedCommit returns true if the entity has staged operations not yet written
// in git.
func (e *Entity) NeedCommit() bool {
	return len(e.staging) > 0
}
265
266func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
267 if e.NeedCommit() {
268 return e.Commit(repo)
269 }
270 return nil
271}
272
// Commit writes the staging area in git as a single operationPack carried by a
// new commit chained to the previous one, then updates the entity's git ref.
//
// It returns an error if there is nothing staged or if the entity is invalid.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	if err := e.Validate(); err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// increment the various clocks for this new operationPack
	packTime, err := e.packClock.Increment()
	if err != nil {
		return err
	}
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}
	// the creation clock is only incremented for the very first commit,
	// signaled by the absence of a previous commit
	var creationTime lamport.Time
	if e.lastCommit == "" {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	// bundle the staged operations with their clocks
	opp := &operationPack{
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime: editTime,
		PackTime: packTime,
	}

	// write the git objects (blobs/tree) holding the pack
	treeHash, err := opp.write(e.Definition, repo)
	if err != nil {
		return err
	}

	// Write a Git commit referencing the tree, with the previous commit as parent
	var commitHash repository.Hash
	if e.lastCommit != "" {
		commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit)
	} else {
		commitHash, err = repo.StoreCommit(treeHash)
	}
	if err != nil {
		return err
	}

	// the staged operations are now part of the entity's committed history
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
	return repo.UpdateRef(ref, commitHash)
}