rpc.go

  1// Copyright (c) 2012-2018 Ugorji Nwoke. All rights reserved.
  2// Use of this source code is governed by a MIT license found in the LICENSE file.
  3
  4package codec
  5
  6import (
  7	"bufio"
  8	"errors"
  9	"io"
 10	"net/rpc"
 11	"sync"
 12)
 13
 14// Rpc provides a rpc Server or Client Codec for rpc communication.
 15type Rpc interface {
 16	ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
 17	ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
 18}
 19
 20// RPCOptions holds options specific to rpc functionality
 21type RPCOptions struct {
 22	// RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
 23	//
 24	// Set RPCNoBuffer=true to turn buffering off.
 25	// Buffering can still be done if buffered connections are passed in, or
 26	// buffering is configured on the handle.
 27	RPCNoBuffer bool
 28}
 29
 30// rpcCodec defines the struct members and common methods.
 31type rpcCodec struct {
 32	c io.Closer
 33	r io.Reader
 34	w io.Writer
 35	f ioFlusher
 36
 37	dec *Decoder
 38	enc *Encoder
 39	// bw  *bufio.Writer
 40	// br  *bufio.Reader
 41	mu sync.Mutex
 42	h  Handle
 43
 44	cls    bool
 45	clsmu  sync.RWMutex
 46	clsErr error
 47}
 48
 49func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
 50	// return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
 51	return newRPCCodec2(conn, conn, conn, h)
 52}
 53
 54func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
 55	// defensive: ensure that jsonH has TermWhitespace turned on.
 56	if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
 57		panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
 58	}
 59	// always ensure that we use a flusher, and always flush what was written to the connection.
 60	// we lose nothing by using a buffered writer internally.
 61	f, ok := w.(ioFlusher)
 62	bh := h.getBasicHandle()
 63	if !bh.RPCNoBuffer {
 64		if bh.WriterBufferSize <= 0 {
 65			if !ok {
 66				bw := bufio.NewWriter(w)
 67				f, w = bw, bw
 68			}
 69		}
 70		if bh.ReaderBufferSize <= 0 {
 71			if _, ok = w.(ioPeeker); !ok {
 72				if _, ok = w.(ioBuffered); !ok {
 73					br := bufio.NewReader(r)
 74					r = br
 75				}
 76			}
 77		}
 78	}
 79	return rpcCodec{
 80		c:   c,
 81		w:   w,
 82		r:   r,
 83		f:   f,
 84		h:   h,
 85		enc: NewEncoder(w, h),
 86		dec: NewDecoder(r, h),
 87	}
 88}
 89
 90func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
 91	if c.isClosed() {
 92		return c.clsErr
 93	}
 94	err = c.enc.Encode(obj1)
 95	if err == nil {
 96		if writeObj2 {
 97			err = c.enc.Encode(obj2)
 98		}
 99		// if err == nil && c.f != nil {
100		// 	err = c.f.Flush()
101		// }
102	}
103	if c.f != nil {
104		if err == nil {
105			err = c.f.Flush()
106		} else {
107			_ = c.f.Flush() // swallow flush error, so we maintain prior error on write
108		}
109	}
110	return
111}
112
113func (c *rpcCodec) swallow(err *error) {
114	defer panicToErr(c.dec, err)
115	c.dec.swallow()
116}
117
118func (c *rpcCodec) read(obj interface{}) (err error) {
119	if c.isClosed() {
120		return c.clsErr
121	}
122	//If nil is passed in, we should read and discard
123	if obj == nil {
124		// var obj2 interface{}
125		// return c.dec.Decode(&obj2)
126		c.swallow(&err)
127		return
128	}
129	return c.dec.Decode(obj)
130}
131
132func (c *rpcCodec) isClosed() (b bool) {
133	if c.c != nil {
134		c.clsmu.RLock()
135		b = c.cls
136		c.clsmu.RUnlock()
137	}
138	return
139}
140
141func (c *rpcCodec) Close() error {
142	if c.c == nil || c.isClosed() {
143		return c.clsErr
144	}
145	c.clsmu.Lock()
146	c.cls = true
147	c.clsErr = c.c.Close()
148	c.clsmu.Unlock()
149	return c.clsErr
150}
151
152func (c *rpcCodec) ReadResponseBody(body interface{}) error {
153	return c.read(body)
154}
155
156// -------------------------------------
157
158type goRpcCodec struct {
159	rpcCodec
160}
161
162func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
163	// Must protect for concurrent access as per API
164	c.mu.Lock()
165	defer c.mu.Unlock()
166	return c.write(r, body, true)
167}
168
169func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
170	c.mu.Lock()
171	defer c.mu.Unlock()
172	return c.write(r, body, true)
173}
174
175func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
176	return c.read(r)
177}
178
179func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
180	return c.read(r)
181}
182
183func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
184	return c.read(body)
185}
186
187// -------------------------------------
188
189// goRpc is the implementation of Rpc that uses the communication protocol
190// as defined in net/rpc package.
191type goRpc struct{}
192
193// GoRpc implements Rpc using the communication protocol defined in net/rpc package.
194//
195// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
196//
197// For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
198// This ensures we use an adequate buffer during reading and writing.
199// If not configured, we will internally initialize and use a buffer during reads and writes.
200// This can be turned off via the RPCNoBuffer option on the Handle.
201//   var handle codec.JsonHandle
202//   handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
203//
204// Example 1: one way of configuring buffering explicitly:
205//   var handle codec.JsonHandle // codec handle
206//   handle.ReaderBufferSize = 1024
207//   handle.WriterBufferSize = 1024
208//   var conn io.ReadWriteCloser // connection got from a socket
209//   var serverCodec = GoRpc.ServerCodec(conn, handle)
210//   var clientCodec = GoRpc.ClientCodec(conn, handle)
211//
212// Example 2: you can also explicitly create a buffered connection yourself,
213// and not worry about configuring the buffer sizes in the Handle.
214//   var handle codec.Handle     // codec handle
215//   var conn io.ReadWriteCloser // connection got from a socket
216//   var bufconn = struct {      // bufconn here is a buffered io.ReadWriteCloser
217//       io.Closer
218//       *bufio.Reader
219//       *bufio.Writer
220//   }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
221//   var serverCodec = GoRpc.ServerCodec(bufconn, handle)
222//   var clientCodec = GoRpc.ClientCodec(bufconn, handle)
223//
224var GoRpc goRpc
225
226func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
227	return &goRpcCodec{newRPCCodec(conn, h)}
228}
229
230func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
231	return &goRpcCodec{newRPCCodec(conn, h)}
232}