1package entity
2
3import (
4 "encoding/json"
5 "fmt"
6 "sort"
7
8 "github.com/pkg/errors"
9
10 "github.com/MichaelMure/git-bug/repository"
11 "github.com/MichaelMure/git-bug/util/lamport"
12)
13
// refsPattern is the git reference naming scheme: refs/<namespace>/<entity id>.
const refsPattern = "refs/%s/%s"

// creationClockPattern names the per-namespace lamport clock witnessing entity creations.
const creationClockPattern = "%s-create"

// editClockPattern names the per-namespace lamport clock witnessing entity edits.
const editClockPattern = "%s-edit"
17
// Operation is the smallest unit of change that can be applied to an Entity.
type Operation interface {
	// Id returns the identifier of this operation.
	Id() Id
	// MarshalJSON() ([]byte, error)
	// Validate checks the operation's internal consistency and returns an
	// error if the operation is malformed.
	Validate() error
}
23
// Definition holds the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...)
	typename string
	// the namespace in git (bugs, prs, ...), used to build refs and clock names
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(raw json.RawMessage) (Operation, error)
	// the expected format version number
	formatVersion uint
}
35
// Entity is a data structure stored in git, defined by an ordered sequence
// of Operations.
type Entity struct {
	Definition

	// ops holds the operations already committed to the repository
	ops []Operation
	// staging holds the operations appended but not yet committed
	staging []Operation

	// packClock is the lamport clock ordering operationPacks within this entity
	packClock lamport.Clock
	// lastCommit is the git hash of the entity's most recent commit,
	// or the empty string if the entity was never committed
	lastCommit repository.Hash
}
45
46func New(definition Definition) *Entity {
47 return &Entity{
48 Definition: definition,
49 packClock: lamport.NewMemClock(),
50 }
51}
52
53func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
54 if err := id.Validate(); err != nil {
55 return nil, errors.Wrap(err, "invalid id")
56 }
57
58 ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
59
60 return read(def, repo, ref)
61}
62
63// read fetch from git and decode an Entity at an arbitrary git reference.
64func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
65 rootHash, err := repo.ResolveRef(ref)
66 if err != nil {
67 return nil, err
68 }
69
70 // Perform a depth-first search to get a topological order of the DAG where we discover the
71 // parents commit and go back in time up to the chronological root
72
73 stack := make([]repository.Hash, 0, 32)
74 visited := make(map[repository.Hash]struct{})
75 DFSOrder := make([]repository.Commit, 0, 32)
76
77 stack = append(stack, rootHash)
78
79 for len(stack) > 0 {
80 // pop
81 hash := stack[len(stack)-1]
82 stack = stack[:len(stack)-1]
83
84 if _, ok := visited[hash]; ok {
85 continue
86 }
87
88 // mark as visited
89 visited[hash] = struct{}{}
90
91 commit, err := repo.ReadCommit(hash)
92 if err != nil {
93 return nil, err
94 }
95
96 DFSOrder = append(DFSOrder, commit)
97
98 for _, parent := range commit.Parents {
99 stack = append(stack, parent)
100 }
101 }
102
103 // Now, we can reverse this topological order and read the commits in an order where
104 // we are sure to have read all the chronological ancestors when we read a commit.
105
106 // Next step is to:
107 // 1) read the operationPacks
108 // 2) make sure that the clocks causality respect the DAG topology.
109
110 oppMap := make(map[repository.Hash]*operationPack)
111 var opsCount int
112 var packClock = lamport.NewMemClock()
113
114 for i := len(DFSOrder) - 1; i >= 0; i-- {
115 commit := DFSOrder[i]
116 firstCommit := i == len(DFSOrder)-1
117
118 // Verify DAG structure: single chronological root, so only the root
119 // can have no parents
120 if !firstCommit && len(commit.Parents) == 0 {
121 return nil, fmt.Errorf("multiple root in the entity DAG")
122 }
123
124 opp, err := readOperationPack(def, repo, commit.TreeHash)
125 if err != nil {
126 return nil, err
127 }
128
129 // Check that the lamport clocks are set
130 if firstCommit && opp.CreateTime <= 0 {
131 return nil, fmt.Errorf("creation lamport time not set")
132 }
133 if opp.EditTime <= 0 {
134 return nil, fmt.Errorf("edition lamport time not set")
135 }
136 if opp.PackTime <= 0 {
137 return nil, fmt.Errorf("pack lamport time not set")
138 }
139
140 // make sure that the lamport clocks causality match the DAG topology
141 for _, parentHash := range commit.Parents {
142 parentPack, ok := oppMap[parentHash]
143 if !ok {
144 panic("DFS failed")
145 }
146
147 if parentPack.EditTime >= opp.EditTime {
148 return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
149 }
150
151 // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
152 // that the clocks don't jump too far in the future
153 if opp.EditTime-parentPack.EditTime > 10_000 {
154 return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
155 }
156 }
157
158 oppMap[commit.Hash] = opp
159 opsCount += len(opp.Operations)
160 }
161
162 // The clocks are fine, we witness them
163 for _, opp := range oppMap {
164 err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
165 if err != nil {
166 return nil, err
167 }
168 err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
169 if err != nil {
170 return nil, err
171 }
172 err = packClock.Witness(opp.PackTime)
173 if err != nil {
174 return nil, err
175 }
176 }
177
178 // Now that we know that the topological order and clocks are fine, we order the operationPacks
179 // based on the logical clocks, entirely ignoring the DAG topology
180
181 oppSlice := make([]*operationPack, 0, len(oppMap))
182 for _, pack := range oppMap {
183 oppSlice = append(oppSlice, pack)
184 }
185 sort.Slice(oppSlice, func(i, j int) bool {
186 // Primary ordering with the dedicated "pack" Lamport time that encode causality
187 // within the entity
188 if oppSlice[i].PackTime != oppSlice[j].PackTime {
189 return oppSlice[i].PackTime < oppSlice[i].PackTime
190 }
191 // We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly
192 // came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
193 // enough but it can give us an edge to approach what really happened.
194 if oppSlice[i].EditTime != oppSlice[j].EditTime {
195 return oppSlice[i].EditTime < oppSlice[j].EditTime
196 }
197 // Well, what now? We still need a total ordering, the most stable possible.
198 // As a last resort, we can order based on a hash of the serialized Operations in the
199 // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
200 // This is a lexicographic ordering.
201 return oppSlice[i].Id < oppSlice[j].Id
202 })
203
204 // Now that we ordered the operationPacks, we have the order of the Operations
205
206 ops := make([]Operation, 0, opsCount)
207 for _, pack := range oppSlice {
208 for _, operation := range pack.Operations {
209 ops = append(ops, operation)
210 }
211 }
212
213 return &Entity{
214 Definition: def,
215 ops: ops,
216 lastCommit: rootHash,
217 }, nil
218}
219
// Id return the Entity identifier
//
// The identifier is the id of the entity's very first operation.
// NOTE(review): FirstOp returns nil for an entity with no operations, in
// which case this call panics — callers must only use Id on a non-empty
// entity.
func (e *Entity) Id() Id {
	// id is the id of the first operation
	return e.FirstOp().Id()
}
225
226func (e *Entity) Validate() error {
227 // non-empty
228 if len(e.ops) == 0 && len(e.staging) == 0 {
229 return fmt.Errorf("entity has no operations")
230 }
231
232 // check if each operations are valid
233 for _, op := range e.ops {
234 if err := op.Validate(); err != nil {
235 return err
236 }
237 }
238
239 // check if staging is valid if needed
240 for _, op := range e.staging {
241 if err := op.Validate(); err != nil {
242 return err
243 }
244 }
245
246 // Check that there is no colliding operation's ID
247 ids := make(map[Id]struct{})
248 for _, op := range e.Operations() {
249 if _, ok := ids[op.Id()]; ok {
250 return fmt.Errorf("id collision: %s", op.Id())
251 }
252 ids[op.Id()] = struct{}{}
253 }
254
255 return nil
256}
257
258// return the ordered operations
259func (e *Entity) Operations() []Operation {
260 return append(e.ops, e.staging...)
261}
262
263// Lookup for the very first operation of the Entity.
264func (e *Entity) FirstOp() Operation {
265 for _, op := range e.ops {
266 return op
267 }
268 for _, op := range e.staging {
269 return op
270 }
271 return nil
272}
273
// Append adds a new operation to the staging area. The operation is only
// persisted on the next call to Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
277
278func (e *Entity) NeedCommit() bool {
279 return len(e.staging) > 0
280}
281
282func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
283 if e.NeedCommit() {
284 return e.Commit(repo)
285 }
286 return nil
287}
288
289// TODO: support commit signature
290func (e *Entity) Commit(repo repository.ClockedRepo) error {
291 if !e.NeedCommit() {
292 return fmt.Errorf("can't commit an entity with no pending operation")
293 }
294
295 if err := e.Validate(); err != nil {
296 return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
297 }
298
299 // increment the various clocks for this new operationPack
300 packTime, err := e.packClock.Increment()
301 if err != nil {
302 return err
303 }
304 editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
305 if err != nil {
306 return err
307 }
308 var creationTime lamport.Time
309 if e.lastCommit == "" {
310 creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
311 if err != nil {
312 return err
313 }
314 }
315
316 opp := &operationPack{
317 Operations: e.staging,
318 CreateTime: creationTime,
319 EditTime: editTime,
320 PackTime: packTime,
321 }
322
323 treeHash, err := opp.write(e.Definition, repo)
324 if err != nil {
325 return err
326 }
327
328 // Write a Git commit referencing the tree, with the previous commit as parent
329 var commitHash repository.Hash
330 if e.lastCommit != "" {
331 commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit)
332 } else {
333 commitHash, err = repo.StoreCommit(treeHash)
334 }
335 if err != nil {
336 return err
337 }
338
339 e.lastCommit = commitHash
340 e.ops = append(e.ops, e.staging...)
341 e.staging = nil
342
343 // Create or update the Git reference for this entity
344 // When pushing later, the remote will ensure that this ref update
345 // is fast-forward, that is no data has been overwritten.
346 ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
347 return repo.UpdateRef(ref, commitHash)
348}