persisted_lamport.go

 1package lamport
 2
import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)
 9
// Persisted is a Lamport clock whose value is stored in a file.
//
// It embeds Clock, and overrides Increment and Witness so that each call is
// followed by a Write of the clock to disk.
type Persisted struct {
	Clock
	// filePath is the file the clock value is written to and read from.
	filePath string
}
14
15// NewPersisted create a new persisted Lamport clock
16func NewPersisted(filePath string) (*Persisted, error) {
17	clock := &Persisted{
18		Clock:    NewClock(),
19		filePath: filePath,
20	}
21
22	dir := filepath.Dir(filePath)
23	err := os.MkdirAll(dir, 0777)
24	if err != nil {
25		return nil, err
26	}
27
28	return clock, nil
29}
30
31// LoadPersisted load a persisted Lamport clock from a file
32func LoadPersisted(filePath string) (*Persisted, error) {
33	clock := &Persisted{
34		filePath: filePath,
35	}
36
37	err := clock.read()
38	if err != nil {
39		return nil, err
40	}
41
42	return clock, nil
43}
44
45// Increment is used to return the value of the lamport clock and increment it afterwards
46func (c *Persisted) Increment() (Time, error) {
47	time := c.Clock.Increment()
48	return time, c.Write()
49}
50
51// Witness is called to update our local clock if necessary after
52// witnessing a clock value received from another process
53func (c *Persisted) Witness(time Time) error {
54	// TODO: rework so that we write only when the clock was actually updated
55	c.Clock.Witness(time)
56	return c.Write()
57}
58
59func (c *Persisted) read() error {
60	content, err := ioutil.ReadFile(c.filePath)
61	if err != nil {
62		return err
63	}
64
65	var value uint64
66	n, err := fmt.Sscanf(string(content), "%d", &value)
67
68	if err != nil {
69		return err
70	}
71
72	if n != 1 {
73		return fmt.Errorf("could not read the clock")
74	}
75
76	c.Clock = NewClockWithTime(value)
77
78	return nil
79}
80
81func (c *Persisted) Write() error {
82	data := []byte(fmt.Sprintf("%d", c.counter))
83	return ioutil.WriteFile(c.filePath, data, 0644)
84}