1package eventstream
2
3import (
4 "encoding/base64"
5 "encoding/binary"
6 "encoding/hex"
7 "fmt"
8 "io"
9 "strconv"
10 "time"
11)
12
13const maxHeaderValueLen = 1<<15 - 1 // 2^15-1 or 32KB - 1
14
15// valueType is the EventStream header value type.
16type valueType uint8
17
18// Header value types
19const (
20 trueValueType valueType = iota
21 falseValueType
22 int8ValueType // Byte
23 int16ValueType // Short
24 int32ValueType // Integer
25 int64ValueType // Long
26 bytesValueType
27 stringValueType
28 timestampValueType
29 uuidValueType
30)
31
32func (t valueType) String() string {
33 switch t {
34 case trueValueType:
35 return "bool"
36 case falseValueType:
37 return "bool"
38 case int8ValueType:
39 return "int8"
40 case int16ValueType:
41 return "int16"
42 case int32ValueType:
43 return "int32"
44 case int64ValueType:
45 return "int64"
46 case bytesValueType:
47 return "byte_array"
48 case stringValueType:
49 return "string"
50 case timestampValueType:
51 return "timestamp"
52 case uuidValueType:
53 return "uuid"
54 default:
55 return fmt.Sprintf("unknown value type %d", uint8(t))
56 }
57}
58
59type rawValue struct {
60 Type valueType
61 Len uint16 // Only set for variable length slices
62 Value []byte // byte representation of value, BigEndian encoding.
63}
64
65func (r rawValue) encodeScalar(w io.Writer, v interface{}) error {
66 return binaryWriteFields(w, binary.BigEndian,
67 r.Type,
68 v,
69 )
70}
71
72func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error {
73 binary.Write(w, binary.BigEndian, r.Type)
74
75 _, err := w.Write(v)
76 return err
77}
78
79func (r rawValue) encodeBytes(w io.Writer, v []byte) error {
80 if len(v) > maxHeaderValueLen {
81 return LengthError{
82 Part: "header value",
83 Want: maxHeaderValueLen, Have: len(v),
84 Value: v,
85 }
86 }
87 r.Len = uint16(len(v))
88
89 err := binaryWriteFields(w, binary.BigEndian,
90 r.Type,
91 r.Len,
92 )
93 if err != nil {
94 return err
95 }
96
97 _, err = w.Write(v)
98 return err
99}
100
101func (r rawValue) encodeString(w io.Writer, v string) error {
102 if len(v) > maxHeaderValueLen {
103 return LengthError{
104 Part: "header value",
105 Want: maxHeaderValueLen, Have: len(v),
106 Value: v,
107 }
108 }
109 r.Len = uint16(len(v))
110
111 type stringWriter interface {
112 WriteString(string) (int, error)
113 }
114
115 err := binaryWriteFields(w, binary.BigEndian,
116 r.Type,
117 r.Len,
118 )
119 if err != nil {
120 return err
121 }
122
123 if sw, ok := w.(stringWriter); ok {
124 _, err = sw.WriteString(v)
125 } else {
126 _, err = w.Write([]byte(v))
127 }
128
129 return err
130}
131
132func decodeFixedBytesValue(r io.Reader, buf []byte) error {
133 _, err := io.ReadFull(r, buf)
134 return err
135}
136
137func decodeBytesValue(r io.Reader) ([]byte, error) {
138 var raw rawValue
139 var err error
140 raw.Len, err = decodeUint16(r)
141 if err != nil {
142 return nil, err
143 }
144
145 buf := make([]byte, raw.Len)
146 _, err = io.ReadFull(r, buf)
147 if err != nil {
148 return nil, err
149 }
150
151 return buf, nil
152}
153
154func decodeStringValue(r io.Reader) (string, error) {
155 v, err := decodeBytesValue(r)
156 return string(v), err
157}
158
159// Value represents the abstract header value.
160type Value interface {
161 Get() interface{}
162 String() string
163 valueType() valueType
164 encode(io.Writer) error
165}
166
167// An BoolValue provides eventstream encoding, and representation
168// of a Go bool value.
169type BoolValue bool
170
171// Get returns the underlying type
172func (v BoolValue) Get() interface{} {
173 return bool(v)
174}
175
176// valueType returns the EventStream header value type value.
177func (v BoolValue) valueType() valueType {
178 if v {
179 return trueValueType
180 }
181 return falseValueType
182}
183
184func (v BoolValue) String() string {
185 return strconv.FormatBool(bool(v))
186}
187
188// encode encodes the BoolValue into an eventstream binary value
189// representation.
190func (v BoolValue) encode(w io.Writer) error {
191 return binary.Write(w, binary.BigEndian, v.valueType())
192}
193
194// An Int8Value provides eventstream encoding, and representation of a Go
195// int8 value.
196type Int8Value int8
197
198// Get returns the underlying value.
199func (v Int8Value) Get() interface{} {
200 return int8(v)
201}
202
203// valueType returns the EventStream header value type value.
204func (Int8Value) valueType() valueType {
205 return int8ValueType
206}
207
208func (v Int8Value) String() string {
209 return fmt.Sprintf("0x%02x", int8(v))
210}
211
212// encode encodes the Int8Value into an eventstream binary value
213// representation.
214func (v Int8Value) encode(w io.Writer) error {
215 raw := rawValue{
216 Type: v.valueType(),
217 }
218
219 return raw.encodeScalar(w, v)
220}
221
222func (v *Int8Value) decode(r io.Reader) error {
223 n, err := decodeUint8(r)
224 if err != nil {
225 return err
226 }
227
228 *v = Int8Value(n)
229 return nil
230}
231
232// An Int16Value provides eventstream encoding, and representation of a Go
233// int16 value.
234type Int16Value int16
235
236// Get returns the underlying value.
237func (v Int16Value) Get() interface{} {
238 return int16(v)
239}
240
241// valueType returns the EventStream header value type value.
242func (Int16Value) valueType() valueType {
243 return int16ValueType
244}
245
246func (v Int16Value) String() string {
247 return fmt.Sprintf("0x%04x", int16(v))
248}
249
250// encode encodes the Int16Value into an eventstream binary value
251// representation.
252func (v Int16Value) encode(w io.Writer) error {
253 raw := rawValue{
254 Type: v.valueType(),
255 }
256 return raw.encodeScalar(w, v)
257}
258
259func (v *Int16Value) decode(r io.Reader) error {
260 n, err := decodeUint16(r)
261 if err != nil {
262 return err
263 }
264
265 *v = Int16Value(n)
266 return nil
267}
268
269// An Int32Value provides eventstream encoding, and representation of a Go
270// int32 value.
271type Int32Value int32
272
273// Get returns the underlying value.
274func (v Int32Value) Get() interface{} {
275 return int32(v)
276}
277
278// valueType returns the EventStream header value type value.
279func (Int32Value) valueType() valueType {
280 return int32ValueType
281}
282
283func (v Int32Value) String() string {
284 return fmt.Sprintf("0x%08x", int32(v))
285}
286
287// encode encodes the Int32Value into an eventstream binary value
288// representation.
289func (v Int32Value) encode(w io.Writer) error {
290 raw := rawValue{
291 Type: v.valueType(),
292 }
293 return raw.encodeScalar(w, v)
294}
295
296func (v *Int32Value) decode(r io.Reader) error {
297 n, err := decodeUint32(r)
298 if err != nil {
299 return err
300 }
301
302 *v = Int32Value(n)
303 return nil
304}
305
306// An Int64Value provides eventstream encoding, and representation of a Go
307// int64 value.
308type Int64Value int64
309
310// Get returns the underlying value.
311func (v Int64Value) Get() interface{} {
312 return int64(v)
313}
314
315// valueType returns the EventStream header value type value.
316func (Int64Value) valueType() valueType {
317 return int64ValueType
318}
319
320func (v Int64Value) String() string {
321 return fmt.Sprintf("0x%016x", int64(v))
322}
323
324// encode encodes the Int64Value into an eventstream binary value
325// representation.
326func (v Int64Value) encode(w io.Writer) error {
327 raw := rawValue{
328 Type: v.valueType(),
329 }
330 return raw.encodeScalar(w, v)
331}
332
333func (v *Int64Value) decode(r io.Reader) error {
334 n, err := decodeUint64(r)
335 if err != nil {
336 return err
337 }
338
339 *v = Int64Value(n)
340 return nil
341}
342
343// An BytesValue provides eventstream encoding, and representation of a Go
344// byte slice.
345type BytesValue []byte
346
347// Get returns the underlying value.
348func (v BytesValue) Get() interface{} {
349 return []byte(v)
350}
351
352// valueType returns the EventStream header value type value.
353func (BytesValue) valueType() valueType {
354 return bytesValueType
355}
356
357func (v BytesValue) String() string {
358 return base64.StdEncoding.EncodeToString([]byte(v))
359}
360
361// encode encodes the BytesValue into an eventstream binary value
362// representation.
363func (v BytesValue) encode(w io.Writer) error {
364 raw := rawValue{
365 Type: v.valueType(),
366 }
367
368 return raw.encodeBytes(w, []byte(v))
369}
370
371func (v *BytesValue) decode(r io.Reader) error {
372 buf, err := decodeBytesValue(r)
373 if err != nil {
374 return err
375 }
376
377 *v = BytesValue(buf)
378 return nil
379}
380
381// An StringValue provides eventstream encoding, and representation of a Go
382// string.
383type StringValue string
384
385// Get returns the underlying value.
386func (v StringValue) Get() interface{} {
387 return string(v)
388}
389
390// valueType returns the EventStream header value type value.
391func (StringValue) valueType() valueType {
392 return stringValueType
393}
394
395func (v StringValue) String() string {
396 return string(v)
397}
398
399// encode encodes the StringValue into an eventstream binary value
400// representation.
401func (v StringValue) encode(w io.Writer) error {
402 raw := rawValue{
403 Type: v.valueType(),
404 }
405
406 return raw.encodeString(w, string(v))
407}
408
409func (v *StringValue) decode(r io.Reader) error {
410 s, err := decodeStringValue(r)
411 if err != nil {
412 return err
413 }
414
415 *v = StringValue(s)
416 return nil
417}
418
419// An TimestampValue provides eventstream encoding, and representation of a Go
420// timestamp.
421type TimestampValue time.Time
422
423// Get returns the underlying value.
424func (v TimestampValue) Get() interface{} {
425 return time.Time(v)
426}
427
428// valueType returns the EventStream header value type value.
429func (TimestampValue) valueType() valueType {
430 return timestampValueType
431}
432
433func (v TimestampValue) epochMilli() int64 {
434 nano := time.Time(v).UnixNano()
435 msec := nano / int64(time.Millisecond)
436 return msec
437}
438
439func (v TimestampValue) String() string {
440 msec := v.epochMilli()
441 return strconv.FormatInt(msec, 10)
442}
443
444// encode encodes the TimestampValue into an eventstream binary value
445// representation.
446func (v TimestampValue) encode(w io.Writer) error {
447 raw := rawValue{
448 Type: v.valueType(),
449 }
450
451 msec := v.epochMilli()
452 return raw.encodeScalar(w, msec)
453}
454
455func (v *TimestampValue) decode(r io.Reader) error {
456 n, err := decodeUint64(r)
457 if err != nil {
458 return err
459 }
460
461 *v = TimestampValue(timeFromEpochMilli(int64(n)))
462 return nil
463}
464
465// MarshalJSON implements the json.Marshaler interface
466func (v TimestampValue) MarshalJSON() ([]byte, error) {
467 return []byte(v.String()), nil
468}
469
470func timeFromEpochMilli(t int64) time.Time {
471 secs := t / 1e3
472 msec := t % 1e3
473 return time.Unix(secs, msec*int64(time.Millisecond)).UTC()
474}
475
476// An UUIDValue provides eventstream encoding, and representation of a UUID
477// value.
478type UUIDValue [16]byte
479
480// Get returns the underlying value.
481func (v UUIDValue) Get() interface{} {
482 return v[:]
483}
484
485// valueType returns the EventStream header value type value.
486func (UUIDValue) valueType() valueType {
487 return uuidValueType
488}
489
490func (v UUIDValue) String() string {
491 var scratch [36]byte
492
493 const dash = '-'
494
495 hex.Encode(scratch[:8], v[0:4])
496 scratch[8] = dash
497 hex.Encode(scratch[9:13], v[4:6])
498 scratch[13] = dash
499 hex.Encode(scratch[14:18], v[6:8])
500 scratch[18] = dash
501 hex.Encode(scratch[19:23], v[8:10])
502 scratch[23] = dash
503 hex.Encode(scratch[24:], v[10:])
504
505 return string(scratch[:])
506}
507
508// encode encodes the UUIDValue into an eventstream binary value
509// representation.
510func (v UUIDValue) encode(w io.Writer) error {
511 raw := rawValue{
512 Type: v.valueType(),
513 }
514
515 return raw.encodeFixedSlice(w, v[:])
516}
517
518func (v *UUIDValue) decode(r io.Reader) error {
519 tv := (*v)[:]
520 return decodeFixedBytesValue(r, tv)
521}