1package eventstream
2
3import (
4 "bytes"
5 "encoding/binary"
6 "encoding/hex"
7 "encoding/json"
8 "fmt"
9 "github.com/aws/smithy-go/logging"
10 "hash"
11 "hash/crc32"
12 "io"
13)
14
15// EncoderOptions is the configuration options for Encoder.
16type EncoderOptions struct {
17 Logger logging.Logger
18 LogMessages bool
19}
20
21// Encoder provides EventStream message encoding.
22type Encoder struct {
23 options EncoderOptions
24
25 headersBuf *bytes.Buffer
26 messageBuf *bytes.Buffer
27}
28
29// NewEncoder initializes and returns an Encoder to encode Event Stream
30// messages.
31func NewEncoder(optFns ...func(*EncoderOptions)) *Encoder {
32 o := EncoderOptions{}
33
34 for _, fn := range optFns {
35 fn(&o)
36 }
37
38 return &Encoder{
39 options: o,
40 headersBuf: bytes.NewBuffer(nil),
41 messageBuf: bytes.NewBuffer(nil),
42 }
43}
44
45// Encode encodes a single EventStream message to the io.Writer the Encoder
46// was created with. An error is returned if writing the message fails.
47func (e *Encoder) Encode(w io.Writer, msg Message) (err error) {
48 e.headersBuf.Reset()
49 e.messageBuf.Reset()
50
51 var writer io.Writer = e.messageBuf
52 if e.options.Logger != nil && e.options.LogMessages {
53 encodeMsgBuf := bytes.NewBuffer(nil)
54 writer = io.MultiWriter(writer, encodeMsgBuf)
55 defer func() {
56 logMessageEncode(e.options.Logger, encodeMsgBuf, msg, err)
57 }()
58 }
59
60 if err = EncodeHeaders(e.headersBuf, msg.Headers); err != nil {
61 return err
62 }
63
64 crc := crc32.New(crc32IEEETable)
65 hashWriter := io.MultiWriter(writer, crc)
66
67 headersLen := uint32(e.headersBuf.Len())
68 payloadLen := uint32(len(msg.Payload))
69
70 if err = encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil {
71 return err
72 }
73
74 if headersLen > 0 {
75 if _, err = io.Copy(hashWriter, e.headersBuf); err != nil {
76 return err
77 }
78 }
79
80 if payloadLen > 0 {
81 if _, err = hashWriter.Write(msg.Payload); err != nil {
82 return err
83 }
84 }
85
86 msgCRC := crc.Sum32()
87 if err := binary.Write(writer, binary.BigEndian, msgCRC); err != nil {
88 return err
89 }
90
91 _, err = io.Copy(w, e.messageBuf)
92
93 return err
94}
95
96func logMessageEncode(logger logging.Logger, msgBuf *bytes.Buffer, msg Message, encodeErr error) {
97 w := bytes.NewBuffer(nil)
98 defer func() { logger.Logf(logging.Debug, w.String()) }()
99
100 fmt.Fprintf(w, "Message to encode:\n")
101 encoder := json.NewEncoder(w)
102 if err := encoder.Encode(msg); err != nil {
103 fmt.Fprintf(w, "Failed to get encoded message, %v\n", err)
104 }
105
106 if encodeErr != nil {
107 fmt.Fprintf(w, "Encode error: %v\n", encodeErr)
108 return
109 }
110
111 fmt.Fprintf(w, "Raw message:\n%s\n", hex.Dump(msgBuf.Bytes()))
112}
113
114func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error {
115 p := messagePrelude{
116 Length: minMsgLen + headersLen + payloadLen,
117 HeadersLen: headersLen,
118 }
119 if err := p.ValidateLens(); err != nil {
120 return err
121 }
122
123 err := binaryWriteFields(w, binary.BigEndian,
124 p.Length,
125 p.HeadersLen,
126 )
127 if err != nil {
128 return err
129 }
130
131 p.PreludeCRC = crc.Sum32()
132 err = binary.Write(w, binary.BigEndian, p.PreludeCRC)
133 if err != nil {
134 return err
135 }
136
137 return nil
138}
139
140// EncodeHeaders writes the header values to the writer encoded in the event
141// stream format. Returns an error if a header fails to encode.
142func EncodeHeaders(w io.Writer, headers Headers) error {
143 for _, h := range headers {
144 hn := headerName{
145 Len: uint8(len(h.Name)),
146 }
147 copy(hn.Name[:hn.Len], h.Name)
148 if err := hn.encode(w); err != nil {
149 return err
150 }
151
152 if err := h.Value.encode(w); err != nil {
153 return err
154 }
155 }
156
157 return nil
158}
159
160func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error {
161 for _, v := range vs {
162 if err := binary.Write(w, order, v); err != nil {
163 return err
164 }
165 }
166 return nil
167}