decode.go

  1package eventstream
  2
  3import (
  4	"bytes"
  5	"encoding/binary"
  6	"encoding/hex"
  7	"encoding/json"
  8	"fmt"
  9	"github.com/aws/smithy-go/logging"
 10	"hash"
 11	"hash/crc32"
 12	"io"
 13)
 14
// DecoderOptions is the Decoder configuration options.
//
// Logger and LogMessages work together: Decode only produces log output when
// Logger is non-nil and LogMessages is true. With either one unset, messages
// are decoded without any logging.
type DecoderOptions struct {
	Logger      logging.Logger
	LogMessages bool
}
 20
// Decoder provides decoding of Event Stream messages. It stores only its
// DecoderOptions; the stream to read from is supplied on each call to Decode.
type Decoder struct {
	options DecoderOptions // configuration consulted by Decode for logging
}
 25
 26// NewDecoder initializes and returns a Decoder for decoding event
 27// stream messages from the reader provided.
 28func NewDecoder(optFns ...func(*DecoderOptions)) *Decoder {
 29	options := DecoderOptions{}
 30
 31	for _, fn := range optFns {
 32		fn(&options)
 33	}
 34
 35	return &Decoder{
 36		options: options,
 37	}
 38}
 39
 40// Decode attempts to decode a single message from the event stream reader.
 41// Will return the event stream message, or error if decodeMessage fails to read
 42// the message from the stream.
 43//
 44// payloadBuf is a byte slice that will be used in the returned Message.Payload. Callers
 45// must ensure that the Message.Payload from a previous decode has been consumed before passing in the same underlying
 46// payloadBuf byte slice.
 47func (d *Decoder) Decode(reader io.Reader, payloadBuf []byte) (m Message, err error) {
 48	if d.options.Logger != nil && d.options.LogMessages {
 49		debugMsgBuf := bytes.NewBuffer(nil)
 50		reader = io.TeeReader(reader, debugMsgBuf)
 51		defer func() {
 52			logMessageDecode(d.options.Logger, debugMsgBuf, m, err)
 53		}()
 54	}
 55
 56	m, err = decodeMessage(reader, payloadBuf)
 57
 58	return m, err
 59}
 60
 61// decodeMessage attempts to decode a single message from the event stream reader.
 62// Will return the event stream message, or error if decodeMessage fails to read
 63// the message from the reader.
 64func decodeMessage(reader io.Reader, payloadBuf []byte) (m Message, err error) {
 65	crc := crc32.New(crc32IEEETable)
 66	hashReader := io.TeeReader(reader, crc)
 67
 68	prelude, err := decodePrelude(hashReader, crc)
 69	if err != nil {
 70		return Message{}, err
 71	}
 72
 73	if prelude.HeadersLen > 0 {
 74		lr := io.LimitReader(hashReader, int64(prelude.HeadersLen))
 75		m.Headers, err = decodeHeaders(lr)
 76		if err != nil {
 77			return Message{}, err
 78		}
 79	}
 80
 81	if payloadLen := prelude.PayloadLen(); payloadLen > 0 {
 82		buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen)))
 83		if err != nil {
 84			return Message{}, err
 85		}
 86		m.Payload = buf
 87	}
 88
 89	msgCRC := crc.Sum32()
 90	if err := validateCRC(reader, msgCRC); err != nil {
 91		return Message{}, err
 92	}
 93
 94	return m, nil
 95}
 96
 97func logMessageDecode(logger logging.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) {
 98	w := bytes.NewBuffer(nil)
 99	defer func() { logger.Logf(logging.Debug, w.String()) }()
100
101	fmt.Fprintf(w, "Raw message:\n%s\n",
102		hex.Dump(msgBuf.Bytes()))
103
104	if decodeErr != nil {
105		fmt.Fprintf(w, "decodeMessage error: %v\n", decodeErr)
106		return
107	}
108
109	rawMsg, err := msg.rawMessage()
110	if err != nil {
111		fmt.Fprintf(w, "failed to create raw message, %v\n", err)
112		return
113	}
114
115	decodedMsg := decodedMessage{
116		rawMessage: rawMsg,
117		Headers:    decodedHeaders(msg.Headers),
118	}
119
120	fmt.Fprintf(w, "Decoded message:\n")
121	encoder := json.NewEncoder(w)
122	if err := encoder.Encode(decodedMsg); err != nil {
123		fmt.Fprintf(w, "failed to generate decoded message, %v\n", err)
124	}
125}
126
127func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) {
128	var p messagePrelude
129
130	var err error
131	p.Length, err = decodeUint32(r)
132	if err != nil {
133		return messagePrelude{}, err
134	}
135
136	p.HeadersLen, err = decodeUint32(r)
137	if err != nil {
138		return messagePrelude{}, err
139	}
140
141	if err := p.ValidateLens(); err != nil {
142		return messagePrelude{}, err
143	}
144
145	preludeCRC := crc.Sum32()
146	if err := validateCRC(r, preludeCRC); err != nil {
147		return messagePrelude{}, err
148	}
149
150	p.PreludeCRC = preludeCRC
151
152	return p, nil
153}
154
// decodePayload copies everything r yields until EOF into a buffer that
// starts out empty but is backed by buf's storage, and returns the bytes
// collected along with any error reported by the copy. Bytes gathered before
// an error are still returned.
func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
	dst := bytes.NewBuffer(buf[:0])
	if _, err := io.Copy(dst, r); err != nil {
		return dst.Bytes(), err
	}
	return dst.Bytes(), nil
}
161
// decodeUint8 reads one byte from r. When r has a ReadByte method that method
// is used directly; otherwise a single byte is read with io.ReadFull. The
// byte obtained and any read error are returned together.
func decodeUint8(r io.Reader) (uint8, error) {
	switch src := r.(type) {
	case io.ByteReader:
		return src.ReadByte()
	default:
		var one [1]byte
		_, err := io.ReadFull(src, one[:])
		return one[0], err
	}
}
176
// decodeUint16 reads exactly two bytes from r and interprets them as a
// big-endian unsigned 16-bit integer. If the bytes cannot all be read, zero
// and the io.ReadFull error are returned.
func decodeUint16(r io.Reader) (uint16, error) {
	var raw [2]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint16(raw[:]), nil
}
186
// decodeUint32 reads exactly four bytes from r and interprets them as a
// big-endian unsigned 32-bit integer. If the bytes cannot all be read, zero
// and the io.ReadFull error are returned.
func decodeUint32(r io.Reader) (uint32, error) {
	var raw [4]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint32(raw[:]), nil
}
196
// decodeUint64 reads exactly eight bytes from r and interprets them as a
// big-endian unsigned 64-bit integer. If the bytes cannot all be read, zero
// and the io.ReadFull error are returned.
func decodeUint64(r io.Reader) (uint64, error) {
	var raw [8]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint64(raw[:]), nil
}
206
207func validateCRC(r io.Reader, expect uint32) error {
208	msgCRC, err := decodeUint32(r)
209	if err != nil {
210		return err
211	}
212
213	if msgCRC != expect {
214		return ChecksumError{}
215	}
216
217	return nil
218}