entity: clocks and write

Michael Muré created

Change summary

entity/entity.go         | 196 +++++++++++++++++++++++++++++++++++++----
entity/entity_actions.go |   4 
entity/entity_test.go    | 107 ++++++++++++++++++++++
entity/operation_pack.go | 132 ++++++++++++++++++++++------
4 files changed, 387 insertions(+), 52 deletions(-)

Detailed changes

entity/entity.go 🔗

@@ -8,34 +8,47 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/MichaelMure/git-bug/repository"
+	"github.com/MichaelMure/git-bug/util/lamport"
 )
 
+// refsPattern is the format of the git reference of an entity, filled with
+// the namespace and then the entity Id.
+const refsPattern = "refs/%s/%s"
+// creationClockPattern is the format of the creation clock name passed to the
+// repository, filled with the namespace.
+const creationClockPattern = "%s-create"
+// editClockPattern is the format of the edit clock name passed to the
+// repository, filled with the namespace.
+const editClockPattern = "%s-edit"
+
+// Operation is a single change stored in an Entity. Each Operation exposes its
+// own identifier through Id and can check its own data through Validate.
 type Operation interface {
-	// Id() Id
+	Id() Id
 	// MarshalJSON() ([]byte, error)
 	Validate() error
-
-	base() *OpBase
 }
 
+// OperationIterator is an empty placeholder type: it has no field yet.
 type OperationIterator struct {
 }
 
+// Definition holds the parameters describing one kind of Entity.
 type Definition struct {
-	namespace            string
+	// the name of the entity (bug, pull-request, ...)
+	typename string
+	// the namespace in git (bugs, prs, ...)
+	namespace string
+	// a function decoding a JSON message into an Operation
 	operationUnmarshaler func(raw json.RawMessage) (Operation, error)
-	formatVersion        uint
+	// the expected format version number
+	formatVersion uint
 }
 
+// Entity holds the ordered Operations of one entity: the ones already
+// committed (ops) and the ones still pending (staging).
 type Entity struct {
 	Definition
 
-	ops []Operation
+	ops     []Operation // operations already written by Commit or loaded by Read
+	staging []Operation // operations added by Append, not committed yet
+
+	packClock  lamport.Clock   // clock incremented for each operationPack written by Commit
+	lastCommit repository.Hash // hash of the last commit known for this entity, empty if none
 }
 
+// New creates an Entity without any operation for the given Definition, with
+// a pack clock freshly created by lamport.NewMemClock.
 func New(definition Definition) *Entity {
 	return &Entity{
 		Definition: definition,
+		packClock:  lamport.NewMemClock(),
 	}
 }
 
@@ -93,19 +106,15 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
 
 	oppMap := make(map[repository.Hash]*operationPack)
 	var opsCount int
+	var packClock = lamport.NewMemClock()
 
-	rootCommit := DFSOrder[len(DFSOrder)-1]
-	rootOpp, err := readOperationPack(def, repo, rootCommit.TreeHash)
-	if err != nil {
-		return nil, err
-	}
-	oppMap[rootCommit.Hash] = rootOpp
-
-	for i := len(DFSOrder) - 2; i >= 0; i-- {
+	for i := len(DFSOrder) - 1; i >= 0; i-- {
 		commit := DFSOrder[i]
+		firstCommit := i == len(DFSOrder)-1
 
-		// Verify DAG structure: single chronological root
-		if len(commit.Parents) == 0 {
+		// Verify DAG structure: single chronological root, so only the root
+		// can have no parents
+		if !firstCommit && len(commit.Parents) == 0 {
 			return nil, fmt.Errorf("multiple root in the entity DAG")
 		}
 
@@ -114,6 +123,17 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
 			return nil, err
 		}
 
+		// Check that the lamport clocks are set
+		if firstCommit && opp.CreateTime <= 0 {
+			return nil, fmt.Errorf("creation lamport time not set")
+		}
+		if opp.EditTime <= 0 {
+			return nil, fmt.Errorf("edition lamport time not set")
+		}
+		if opp.PackTime <= 0 {
+			return nil, fmt.Errorf("pack lamport time not set")
+		}
+
 		// make sure that the lamport clocks causality match the DAG topology
 		for _, parentHash := range commit.Parents {
 			parentPack, ok := oppMap[parentHash]
@@ -136,6 +156,22 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
 		opsCount += len(opp.Operations)
 	}
 
+	// The clocks are fine, we witness them
+	for _, opp := range oppMap {
+		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
+		if err != nil {
+			return nil, err
+		}
+		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
+		if err != nil {
+			return nil, err
+		}
+		err = packClock.Witness(opp.PackTime)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	// Now that we know that the topological order and clocks are fine, we order the operationPacks
 	// based on the logical clocks, entirely ignoring the DAG topology
 
@@ -145,7 +181,8 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
 	}
 	sort.Slice(oppSlice, func(i, j int) bool {
 		// TODO: no secondary ordering?
-		return oppSlice[i].EditTime < oppSlice[i].EditTime
+		// might be useful for stable ordering
+		// Compare element i against element j: comparing i with itself is
+		// always false and would leave the operationPacks unsorted.
+		return oppSlice[i].PackTime < oppSlice[j].PackTime
 	})
 
 	// Now that we ordered the operationPacks, we have the order of the Operations
@@ -160,22 +197,135 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) {
 	return &Entity{
 		Definition: def,
 		ops:        ops,
+		// Keep the pack clock that witnessed every operationPack, so that a
+		// later Commit on this Entity continues from the right pack time
+		// instead of using a zero-valued clock.
+		packClock:  packClock,
+		lastCommit: rootHash,
 	}, nil
 }
 
-func Remove() error {
-	panic("")
+// Id returns the Entity identifier, which is the Id of its first operation.
+//
+// It panics if the Entity holds no operation at all, because FirstOp then
+// returns nil.
+func (e *Entity) Id() Id {
+	// id is the id of the first operation
+	return e.FirstOp().Id()
 }
 
-func (e *Entity) Id() {
+// Validate checks that the Entity is in a consistent state: it holds at least
+// one operation, every committed and staged operation is itself valid, and no
+// two operations share the same Id. It returns the first problem found, or nil.
+func (e *Entity) Validate() error {
+	// non-empty
+	if len(e.ops) == 0 && len(e.staging) == 0 {
+		return fmt.Errorf("entity has no operations")
+	}
+
+	// check that each committed operation is valid
+	for _, op := range e.ops {
+		if err := op.Validate(); err != nil {
+			return err
+		}
+	}
+
+	// check that each staged operation is valid
+	for _, op := range e.staging {
+		if err := op.Validate(); err != nil {
+			return err
+		}
+	}
+
+	// Check that no two operations (committed or staged) have the same Id
+	ids := make(map[Id]struct{})
+	for _, op := range e.Operations() {
+		if _, ok := ids[op.Id()]; ok {
+			return fmt.Errorf("id collision: %s", op.Id())
+		}
+		ids[op.Id()] = struct{}{}
+	}
 
+	return nil
 }
 
 // return the ordered operations
 func (e *Entity) Operations() []Operation {
-	return e.ops
+	// Committed operations come first, followed by the staged ones.
+	//
+	// e.ops is capped at its own length so that append has to allocate a new
+	// backing array: appending directly onto e.ops could write the staged
+	// operations into its spare capacity and return a slice that shares
+	// memory with the Entity beyond len(e.ops).
+	return append(e.ops[:len(e.ops):len(e.ops)], e.staging...)
 }
 
-func (e *Entity) Commit() error {
-	panic("")
+// FirstOp returns the very first operation of the Entity: the first committed
+// operation if there is one, otherwise the first staged operation. It returns
+// nil when the Entity holds no operation at all.
+func (e *Entity) FirstOp() Operation {
+	if len(e.ops) > 0 {
+		return e.ops[0]
+	}
+	if len(e.staging) > 0 {
+		return e.staging[0]
+	}
+	return nil
+}
+
+// Append adds an operation to the staging area. The operation is only written
+// to git by a later call to Commit.
+func (e *Entity) Append(op Operation) {
+	e.staging = append(e.staging, op)
+}
+
+// NeedCommit reports whether the Entity has staged operations that have not
+// been committed yet.
+func (e *Entity) NeedCommit() bool {
+	return len(e.staging) > 0
+}
+
+// CommitAdNeeded commits the staged operations if there are any, and does
+// nothing (returning nil) otherwise.
+//
+// NOTE(review): the name looks like a typo of "CommitAsNeeded"; it is kept
+// as is because callers use it.
+func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
+	if !e.NeedCommit() {
+		return nil
+	}
+	return e.Commit(repo)
+}
+
+// Commit writes the staged operations in git as a new operationPack, then
+// moves them to the committed operations.
+//
+// It returns an error if there is nothing staged or if the Entity does not
+// validate. The new git commit has the previous commit as parent when there
+// is one; otherwise it is the first commit of the entity and a creation time
+// is also recorded. Finally the git reference of the entity is updated to
+// point to the new commit.
+//
+// Note that the in-memory state (lastCommit, ops, staging) is updated before
+// the reference: if UpdateRef fails, the staged operations have already been
+// moved to the committed ones.
+func (e *Entity) Commit(repo repository.ClockedRepo) error {
+	if !e.NeedCommit() {
+		return fmt.Errorf("can't commit an entity with no pending operation")
+	}
+
+	if err := e.Validate(); err != nil {
+		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
+	}
+
+	// increment the various clocks for this new operationPack
+	packTime, err := e.packClock.Increment()
+	if err != nil {
+		return err
+	}
+	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
+	if err != nil {
+		return err
+	}
+	// the creation clock is only incremented for the very first commit;
+	// creationTime stays at zero otherwise
+	var creationTime lamport.Time
+	if e.lastCommit == "" {
+		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
+		if err != nil {
+			return err
+		}
+	}
+
+	opp := &operationPack{
+		Operations: e.staging,
+		CreateTime: creationTime,
+		EditTime:   editTime,
+		PackTime:   packTime,
+	}
+
+	treeHash, err := opp.write(e.Definition, repo)
+	if err != nil {
+		return err
+	}
+
+	// Write a Git commit referencing the tree, with the previous commit as parent
+	var commitHash repository.Hash
+	if e.lastCommit != "" {
+		commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit)
+	} else {
+		commitHash, err = repo.StoreCommit(treeHash)
+	}
+	if err != nil {
+		return err
+	}
+
+	// the staged operations are now committed
+	e.lastCommit = commitHash
+	e.ops = append(e.ops, e.staging...)
+	e.staging = nil
+
+	// Create or update the Git reference for this entity
+	// When pushing later, the remote will ensure that this ref update
+	// is fast-forward, that is no data has been overwritten.
+	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
+	return repo.UpdateRef(ref, commitHash)
 }

entity/entity_test.go 🔗

@@ -0,0 +1,107 @@
+package entity
+
+import (
+	"encoding/json"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/MichaelMure/git-bug/repository"
+)
+
+// func TestFoo(t *testing.T) {
+// 	repo, err := repository.OpenGoGitRepo("~/dev/git-bug", nil)
+// 	require.NoError(t, err)
+//
+// 	b, err := ReadBug(repo, Id("8b22e548c93a6ed23c31fd4e337c6286c3d1e5c9cae5537bc8e5842e11bd1099"))
+// 	require.NoError(t, err)
+//
+// 	fmt.Println(b)
+// }
+
+// op1 is a first dummy operation type, used to exercise Entity in the tests.
+type op1 struct {
+	OperationType int    `json:"type"`
+	Field1        string `json:"field_1"`
+}
+
+// newOp1 builds an op1 carrying the given value, with its type set to 1.
+func newOp1(field1 string) *op1 {
+	return &op1{OperationType: 1, Field1: field1}
+}
+
+// Id derives the identifier of the operation from its JSON encoding.
+func (o op1) Id() Id {
+	// the Marshal error is deliberately ignored in this test helper
+	data, _ := json.Marshal(o)
+	return DeriveId(data)
+}
+
+// Validate always succeeds: op1 has nothing to check.
+func (o op1) Validate() error { return nil }
+
+// op2 is a second dummy operation type, used to exercise Entity in the tests.
+type op2 struct {
+	OperationType int    `json:"type"`
+	Field2        string `json:"field_2"`
+}
+
+// newOp2 builds an op2 carrying the given value, with its type set to 2.
+func newOp2(field2 string) *op2 {
+	return &op2{OperationType: 2, Field2: field2}
+}
+
+// Id derives the identifier of the operation from its JSON encoding.
+func (o op2) Id() Id {
+	// the Marshal error is deliberately ignored in this test helper
+	data, _ := json.Marshal(o)
+	return DeriveId(data)
+}
+
+// Validate always succeeds: op2 has nothing to check.
+func (o op2) Validate() error { return nil }
+
+// def is the Definition of the dummy "foo" entity used by the tests; its
+// operations are decoded by unmarshaller.
+var def = Definition{
+	typename:             "foo",
+	namespace:            "foos",
+	operationUnmarshaler: unmarshaller,
+	formatVersion:        1,
+}
+
+// unmarshaller decodes a JSON message into the matching test operation.
+//
+// It first reads the "type" field only, then decodes the whole message into
+// an op1 (type 1) or an op2 (type 2). Any other type is an error. On error,
+// the returned Operation is nil.
+func unmarshaller(raw json.RawMessage) (Operation, error) {
+	var t struct {
+		OperationType int `json:"type"`
+	}
+
+	if err := json.Unmarshal(raw, &t); err != nil {
+		return nil, err
+	}
+
+	switch t.OperationType {
+	case 1:
+		// op is already a pointer: pass it as is, not a pointer to it
+		op := &op1{}
+		if err := json.Unmarshal(raw, op); err != nil {
+			return nil, err
+		}
+		return op, nil
+	case 2:
+		op := &op2{}
+		if err := json.Unmarshal(raw, op); err != nil {
+			return nil, err
+		}
+		return op, nil
+	default:
+		return nil, fmt.Errorf("unknown operation type %v", t.OperationType)
+	}
+}
+
+// TestWriteRead stages and commits operations in two rounds on a mock
+// repository, then reads the entity back and expects no error.
+func TestWriteRead(t *testing.T) {
+	repo := repository.NewMockRepo()
+
+	// a new entity has nothing to commit
+	entity := New(def)
+	require.False(t, entity.NeedCommit())
+
+	// first round: two operations in a single commit
+	entity.Append(newOp1("foo"))
+	entity.Append(newOp2("bar"))
+
+	require.True(t, entity.NeedCommit())
+	require.NoError(t, entity.CommitAdNeeded(repo))
+	require.False(t, entity.NeedCommit())
+
+	// second round: one more operation in a second commit
+	entity.Append(newOp2("foobar"))
+	require.True(t, entity.NeedCommit())
+	require.NoError(t, entity.CommitAdNeeded(repo))
+	require.False(t, entity.NeedCommit())
+
+	read, err := Read(def, repo, entity.Id())
+	require.NoError(t, err)
+
+	// the read entity is only printed, its content is not asserted
+	fmt.Println(*read)
+}

entity/operation_pack.go 🔗

@@ -2,6 +2,7 @@ package entity
 
 import (
 	"encoding/json"
+	"fmt"
 	"strconv"
 	"strings"
 
@@ -11,25 +12,82 @@ import (
 	"github.com/MichaelMure/git-bug/util/lamport"
 )
 
+// TODO: extra data tree
+const extraEntryName = "extra"
+
+// Names, or name prefixes, of the entries of the git tree encoding an
+// operationPack. The version and clock values are appended to their prefix.
 const opsEntryName = "ops"
 const versionEntryPrefix = "version-"
 const createClockEntryPrefix = "create-clock-"
 const editClockEntryPrefix = "edit-clock-"
+const packClockEntryPrefix = "pack-clock-"
 
+// operationPack is a group of Operations written together in git, along with
+// the lamport times attached to it.
 type operationPack struct {
 	Operations []Operation
+	// Encode the entity's logical time of creation across all entities of the same type.
+	// Only exists on the root operationPack.
 	CreateTime lamport.Time
-	EditTime   lamport.Time
+	// Encode the entity's logical time of last edition across all entities of the same type.
+	// Exists on all operationPacks.
+	EditTime lamport.Time
+	// Encode the operationPack's logical time of creation within this entity.
+	// Exists on all operationPacks.
+	PackTime lamport.Time
 }
 
-// func (opp *operationPack) MarshalJSON() ([]byte, error) {
-// 	return json.Marshal(struct {
-// 		Operations []Operation `json:"ops"`
-// 	}{
-// 		Operations: opp.Operations,
-// 	})
-// }
+// write stores the operationPack in git and returns the hash of the resulting tree.
+//
+// The tree holds one blob with the JSON-serialized operations, plus marker
+// entries whose names alone carry metadata: the format version, the edit and
+// pack lamport times and, when CreateTime is set, the creation lamport time.
+func (opp operationPack) write(def Definition, repo repository.RepoData) (repository.Hash, error) {
+	// The format version and the clocks live in the git tree itself rather than in the blob:
+	// the version has to be readable before any attempt to decode, so that a mismatch is
+	// reported early with a unique error, and it keeps data and metadata separated.
+	//
+	// Those marker entries still need a valid target, so they all point to the empty blob.
+	emptyBlobHash, err := repo.StoreData([]byte{})
+	if err != nil {
+		return "", err
+	}
+	marker := func(prefix string, value interface{}) repository.TreeEntry {
+		return repository.TreeEntry{
+			ObjectType: repository.Blob,
+			Hash:       emptyBlobHash,
+			Name:       fmt.Sprintf(prefix+"%d", value),
+		}
+	}
+
+	// Serialize the operations as a JSON object and store it as a Git blob
+	type opsWrapper struct {
+		Operations []Operation `json:"ops"`
+	}
+	payload, err := json.Marshal(opsWrapper{Operations: opp.Operations})
+	if err != nil {
+		return "", err
+	}
+	opsHash, err := repo.StoreData(payload)
+	if err != nil {
+		return "", err
+	}
+
+	// Build the Git tree: the operations blob, the format version and the clocks
+	entries := []repository.TreeEntry{
+		marker(versionEntryPrefix, def.formatVersion),
+		{ObjectType: repository.Blob, Hash: opsHash, Name: opsEntryName},
+		marker(editClockEntryPrefix, opp.EditTime),
+		marker(packClockEntryPrefix, opp.PackTime),
+	}
+	// the creation clock is only written when it is set
+	if opp.CreateTime > 0 {
+		entries = append(entries, marker(createClockEntryPrefix, opp.CreateTime))
+	}
+
+	// Store the tree
+	return repo.StoreTree(entries)
+}
+
+// readOperationPack read the operationPack encoded in git at the given Tree hash.
+//
+// Validity of the Lamport clocks is left for the caller to decide.
 func readOperationPack(def Definition, repo repository.RepoData, treeHash repository.Hash) (*operationPack, error) {
 	entries, err := repo.ReadTree(treeHash)
 	if err != nil {
@@ -37,30 +95,31 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi
 	}
 
 	// check the format version first, fail early instead of trying to read something
-	// var version uint
-	// for _, entry := range entries {
-	// 	if strings.HasPrefix(entry.Name, versionEntryPrefix) {
-	// 		v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64)
-	// 		if err != nil {
-	// 			return nil, errors.Wrap(err, "can't read format version")
-	// 		}
-	// 		if v > 1<<12 {
-	// 			return nil, fmt.Errorf("format version too big")
-	// 		}
-	// 		version = uint(v)
-	// 		break
-	// 	}
-	// }
-	// if version == 0 {
-	// 	return nil, NewErrUnknowFormat(def.formatVersion)
-	// }
-	// if version != def.formatVersion {
-	// 	return nil, NewErrInvalidFormat(version, def.formatVersion)
-	// }
+	var version uint
+	for _, entry := range entries {
+		if strings.HasPrefix(entry.Name, versionEntryPrefix) {
+			v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64)
+			if err != nil {
+				return nil, errors.Wrap(err, "can't read format version")
+			}
+			if v > 1<<12 {
+				return nil, fmt.Errorf("format version too big")
+			}
+			version = uint(v)
+			break
+		}
+	}
+	if version == 0 {
+		return nil, NewErrUnknowFormat(def.formatVersion)
+	}
+	if version != def.formatVersion {
+		return nil, NewErrInvalidFormat(version, def.formatVersion)
+	}
 
 	var ops []Operation
 	var createTime lamport.Time
 	var editTime lamport.Time
+	var packTime lamport.Time
 
 	for _, entry := range entries {
 		if entry.Name == opsEntryName {
@@ -73,7 +132,7 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi
 			if err != nil {
 				return nil, err
 			}
-			break
+			continue
 		}
 
 		if strings.HasPrefix(entry.Name, createClockEntryPrefix) {
@@ -82,6 +141,7 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi
 				return nil, errors.Wrap(err, "can't read creation lamport time")
 			}
 			createTime = lamport.Time(v)
+			continue
 		}
 
 		if strings.HasPrefix(entry.Name, editClockEntryPrefix) {
@@ -90,6 +150,16 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi
 				return nil, errors.Wrap(err, "can't read edit lamport time")
 			}
 			editTime = lamport.Time(v)
+			continue
+		}
+
+		if strings.HasPrefix(entry.Name, packClockEntryPrefix) {
+			v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, packClockEntryPrefix), 10, 64)
+			if err != nil {
+				return nil, errors.Wrap(err, "can't read pack lamport time")
+			}
+			packTime = lamport.Time(v)
+			continue
 		}
 	}
 
@@ -97,9 +167,13 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi
 		Operations: ops,
 		CreateTime: createTime,
 		EditTime:   editTime,
+		PackTime:   packTime,
 	}, nil
 }
 
+// unmarshallOperations delegate the unmarshalling of the Operation's JSON to the decoding
+// function provided by the concrete entity. This gives access to the concrete type of each
+// Operation.
 func unmarshallOperations(def Definition, data []byte) ([]Operation, error) {
 	aux := struct {
 		Operations []json.RawMessage `json:"ops"`