1package eventstream
2
3import (
4 "bytes"
5 "encoding/binary"
6 "encoding/hex"
7 "encoding/json"
8 "fmt"
9 "github.com/aws/smithy-go/logging"
10 "hash"
11 "hash/crc32"
12 "io"
13)
14
15// DecoderOptions is the Decoder configuration options.
16type DecoderOptions struct {
17 Logger logging.Logger
18 LogMessages bool
19}
20
21// Decoder provides decoding of an Event Stream messages.
22type Decoder struct {
23 options DecoderOptions
24}
25
26// NewDecoder initializes and returns a Decoder for decoding event
27// stream messages from the reader provided.
28func NewDecoder(optFns ...func(*DecoderOptions)) *Decoder {
29 options := DecoderOptions{}
30
31 for _, fn := range optFns {
32 fn(&options)
33 }
34
35 return &Decoder{
36 options: options,
37 }
38}
39
40// Decode attempts to decode a single message from the event stream reader.
41// Will return the event stream message, or error if decodeMessage fails to read
42// the message from the stream.
43//
44// payloadBuf is a byte slice that will be used in the returned Message.Payload. Callers
45// must ensure that the Message.Payload from a previous decode has been consumed before passing in the same underlying
46// payloadBuf byte slice.
47func (d *Decoder) Decode(reader io.Reader, payloadBuf []byte) (m Message, err error) {
48 if d.options.Logger != nil && d.options.LogMessages {
49 debugMsgBuf := bytes.NewBuffer(nil)
50 reader = io.TeeReader(reader, debugMsgBuf)
51 defer func() {
52 logMessageDecode(d.options.Logger, debugMsgBuf, m, err)
53 }()
54 }
55
56 m, err = decodeMessage(reader, payloadBuf)
57
58 return m, err
59}
60
61// decodeMessage attempts to decode a single message from the event stream reader.
62// Will return the event stream message, or error if decodeMessage fails to read
63// the message from the reader.
64func decodeMessage(reader io.Reader, payloadBuf []byte) (m Message, err error) {
65 crc := crc32.New(crc32IEEETable)
66 hashReader := io.TeeReader(reader, crc)
67
68 prelude, err := decodePrelude(hashReader, crc)
69 if err != nil {
70 return Message{}, err
71 }
72
73 if prelude.HeadersLen > 0 {
74 lr := io.LimitReader(hashReader, int64(prelude.HeadersLen))
75 m.Headers, err = decodeHeaders(lr)
76 if err != nil {
77 return Message{}, err
78 }
79 }
80
81 if payloadLen := prelude.PayloadLen(); payloadLen > 0 {
82 buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen)))
83 if err != nil {
84 return Message{}, err
85 }
86 m.Payload = buf
87 }
88
89 msgCRC := crc.Sum32()
90 if err := validateCRC(reader, msgCRC); err != nil {
91 return Message{}, err
92 }
93
94 return m, nil
95}
96
97func logMessageDecode(logger logging.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) {
98 w := bytes.NewBuffer(nil)
99 defer func() { logger.Logf(logging.Debug, w.String()) }()
100
101 fmt.Fprintf(w, "Raw message:\n%s\n",
102 hex.Dump(msgBuf.Bytes()))
103
104 if decodeErr != nil {
105 fmt.Fprintf(w, "decodeMessage error: %v\n", decodeErr)
106 return
107 }
108
109 rawMsg, err := msg.rawMessage()
110 if err != nil {
111 fmt.Fprintf(w, "failed to create raw message, %v\n", err)
112 return
113 }
114
115 decodedMsg := decodedMessage{
116 rawMessage: rawMsg,
117 Headers: decodedHeaders(msg.Headers),
118 }
119
120 fmt.Fprintf(w, "Decoded message:\n")
121 encoder := json.NewEncoder(w)
122 if err := encoder.Encode(decodedMsg); err != nil {
123 fmt.Fprintf(w, "failed to generate decoded message, %v\n", err)
124 }
125}
126
127func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) {
128 var p messagePrelude
129
130 var err error
131 p.Length, err = decodeUint32(r)
132 if err != nil {
133 return messagePrelude{}, err
134 }
135
136 p.HeadersLen, err = decodeUint32(r)
137 if err != nil {
138 return messagePrelude{}, err
139 }
140
141 if err := p.ValidateLens(); err != nil {
142 return messagePrelude{}, err
143 }
144
145 preludeCRC := crc.Sum32()
146 if err := validateCRC(r, preludeCRC); err != nil {
147 return messagePrelude{}, err
148 }
149
150 p.PreludeCRC = preludeCRC
151
152 return p, nil
153}
154
155func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
156 w := bytes.NewBuffer(buf[0:0])
157
158 _, err := io.Copy(w, r)
159 return w.Bytes(), err
160}
161
162func decodeUint8(r io.Reader) (uint8, error) {
163 type byteReader interface {
164 ReadByte() (byte, error)
165 }
166
167 if br, ok := r.(byteReader); ok {
168 v, err := br.ReadByte()
169 return v, err
170 }
171
172 var b [1]byte
173 _, err := io.ReadFull(r, b[:])
174 return b[0], err
175}
176
177func decodeUint16(r io.Reader) (uint16, error) {
178 var b [2]byte
179 bs := b[:]
180 _, err := io.ReadFull(r, bs)
181 if err != nil {
182 return 0, err
183 }
184 return binary.BigEndian.Uint16(bs), nil
185}
186
187func decodeUint32(r io.Reader) (uint32, error) {
188 var b [4]byte
189 bs := b[:]
190 _, err := io.ReadFull(r, bs)
191 if err != nil {
192 return 0, err
193 }
194 return binary.BigEndian.Uint32(bs), nil
195}
196
197func decodeUint64(r io.Reader) (uint64, error) {
198 var b [8]byte
199 bs := b[:]
200 _, err := io.ReadFull(r, bs)
201 if err != nil {
202 return 0, err
203 }
204 return binary.BigEndian.Uint64(bs), nil
205}
206
207func validateCRC(r io.Reader, expect uint32) error {
208 msgCRC, err := decodeUint32(r)
209 if err != nil {
210 return err
211 }
212
213 if msgCRC != expect {
214 return ChecksumError{}
215 }
216
217 return nil
218}