encode.go

  1package eventstream
  2
  3import (
  4	"bytes"
  5	"encoding/binary"
  6	"encoding/hex"
  7	"encoding/json"
  8	"fmt"
  9	"github.com/aws/smithy-go/logging"
 10	"hash"
 11	"hash/crc32"
 12	"io"
 13)
 14
 15// EncoderOptions is the configuration options for Encoder.
 16type EncoderOptions struct {
 17	Logger      logging.Logger
 18	LogMessages bool
 19}
 20
 21// Encoder provides EventStream message encoding.
 22type Encoder struct {
 23	options EncoderOptions
 24
 25	headersBuf *bytes.Buffer
 26	messageBuf *bytes.Buffer
 27}
 28
 29// NewEncoder initializes and returns an Encoder to encode Event Stream
 30// messages.
 31func NewEncoder(optFns ...func(*EncoderOptions)) *Encoder {
 32	o := EncoderOptions{}
 33
 34	for _, fn := range optFns {
 35		fn(&o)
 36	}
 37
 38	return &Encoder{
 39		options:    o,
 40		headersBuf: bytes.NewBuffer(nil),
 41		messageBuf: bytes.NewBuffer(nil),
 42	}
 43}
 44
 45// Encode encodes a single EventStream message to the io.Writer the Encoder
 46// was created with. An error is returned if writing the message fails.
 47func (e *Encoder) Encode(w io.Writer, msg Message) (err error) {
 48	e.headersBuf.Reset()
 49	e.messageBuf.Reset()
 50
 51	var writer io.Writer = e.messageBuf
 52	if e.options.Logger != nil && e.options.LogMessages {
 53		encodeMsgBuf := bytes.NewBuffer(nil)
 54		writer = io.MultiWriter(writer, encodeMsgBuf)
 55		defer func() {
 56			logMessageEncode(e.options.Logger, encodeMsgBuf, msg, err)
 57		}()
 58	}
 59
 60	if err = EncodeHeaders(e.headersBuf, msg.Headers); err != nil {
 61		return err
 62	}
 63
 64	crc := crc32.New(crc32IEEETable)
 65	hashWriter := io.MultiWriter(writer, crc)
 66
 67	headersLen := uint32(e.headersBuf.Len())
 68	payloadLen := uint32(len(msg.Payload))
 69
 70	if err = encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil {
 71		return err
 72	}
 73
 74	if headersLen > 0 {
 75		if _, err = io.Copy(hashWriter, e.headersBuf); err != nil {
 76			return err
 77		}
 78	}
 79
 80	if payloadLen > 0 {
 81		if _, err = hashWriter.Write(msg.Payload); err != nil {
 82			return err
 83		}
 84	}
 85
 86	msgCRC := crc.Sum32()
 87	if err := binary.Write(writer, binary.BigEndian, msgCRC); err != nil {
 88		return err
 89	}
 90
 91	_, err = io.Copy(w, e.messageBuf)
 92
 93	return err
 94}
 95
 96func logMessageEncode(logger logging.Logger, msgBuf *bytes.Buffer, msg Message, encodeErr error) {
 97	w := bytes.NewBuffer(nil)
 98	defer func() { logger.Logf(logging.Debug, w.String()) }()
 99
100	fmt.Fprintf(w, "Message to encode:\n")
101	encoder := json.NewEncoder(w)
102	if err := encoder.Encode(msg); err != nil {
103		fmt.Fprintf(w, "Failed to get encoded message, %v\n", err)
104	}
105
106	if encodeErr != nil {
107		fmt.Fprintf(w, "Encode error: %v\n", encodeErr)
108		return
109	}
110
111	fmt.Fprintf(w, "Raw message:\n%s\n", hex.Dump(msgBuf.Bytes()))
112}
113
114func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error {
115	p := messagePrelude{
116		Length:     minMsgLen + headersLen + payloadLen,
117		HeadersLen: headersLen,
118	}
119	if err := p.ValidateLens(); err != nil {
120		return err
121	}
122
123	err := binaryWriteFields(w, binary.BigEndian,
124		p.Length,
125		p.HeadersLen,
126	)
127	if err != nil {
128		return err
129	}
130
131	p.PreludeCRC = crc.Sum32()
132	err = binary.Write(w, binary.BigEndian, p.PreludeCRC)
133	if err != nil {
134		return err
135	}
136
137	return nil
138}
139
140// EncodeHeaders writes the header values to the writer encoded in the event
141// stream format. Returns an error if a header fails to encode.
142func EncodeHeaders(w io.Writer, headers Headers) error {
143	for _, h := range headers {
144		hn := headerName{
145			Len: uint8(len(h.Name)),
146		}
147		copy(hn.Name[:hn.Len], h.Name)
148		if err := hn.encode(w); err != nil {
149			return err
150		}
151
152		if err := h.Value.encode(w); err != nil {
153			return err
154		}
155	}
156
157	return nil
158}
159
// binaryWriteFields writes each value in vs to w, one after another, using
// binary.Write with the given byte order. It stops at the first value that
// fails to encode or write and returns that error; values before it have
// already been written. With no values it writes nothing and returns nil.
func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error {
	for i := 0; i < len(vs); i++ {
		err := binary.Write(w, order, vs[i])
		if err != nil {
			return err
		}
	}
	return nil
}