From 474e286aca219663c3b025a519d22a44731f128a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 8 Jun 2014 14:02:42 -0700 Subject: [PATCH] Rename shared msgpack handle --- command/agent/rpc.go | 11 ++++++++--- command/agent/rpc_client.go | 4 ++-- consul/fsm.go | 4 ++-- consul/mdb_table_test.go | 4 ++-- consul/pool.go | 5 +++-- consul/rpc.go | 2 +- consul/status_endpoint_test.go | 2 +- consul/structs/structs.go | 9 +++++---- 8 files changed, 24 insertions(+), 17 deletions(-) diff --git a/command/agent/rpc.go b/command/agent/rpc.go index e4d17ff38a..63b7a6bce4 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -60,7 +60,12 @@ const ( monitorExists = "Monitor already exists" ) -var mh = codec.MsgpackHandle{RawToString: true, WriteExt: true} +// msgpackHandle is a shared handle for encoding/decoding of +// messages +var msgpackHandle = &codec.MsgpackHandle{ + RawToString: true, + WriteExt: true, +} // Request header is sent before each request type requestHeader struct { @@ -251,8 +256,8 @@ func (i *AgentRPC) listen() { reader: bufio.NewReader(conn), writer: bufio.NewWriter(conn), } - client.dec = codec.NewDecoder(client.reader, &mh) - client.enc = codec.NewEncoder(client.writer, &mh) + client.dec = codec.NewDecoder(client.reader, msgpackHandle) + client.enc = codec.NewEncoder(client.writer, msgpackHandle) if err != nil { i.logger.Printf("[ERR] agent.rpc: Failed to create decoder: %v", err) conn.Close() diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 2b6d98ca86..e173ea99a6 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -94,8 +94,8 @@ func NewRPCClient(addr string) (*RPCClient, error) { dispatch: make(map[uint64]seqHandler), shutdownCh: make(chan struct{}), } - client.dec = codec.NewDecoder(client.reader, &mh) - client.enc = codec.NewEncoder(client.writer, &mh) + client.dec = codec.NewDecoder(client.reader, msgpackHandle) + client.enc = codec.NewEncoder(client.writer, msgpackHandle) go client.listen() // Do the initial handshake diff --git a/consul/fsm.go b/consul/fsm.go index e26dffa56f..8b4fd3d65a 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -221,7 +221,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { c.state = state // Create a decoder - dec := codec.NewDecoder(old, &mh) + dec := codec.NewDecoder(old, msgpackHandle) // Read in the header var header snapshotHeader @@ -277,7 +277,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { // Register the nodes - encoder := codec.NewEncoder(sink, &mh) + encoder := codec.NewEncoder(sink, msgpackHandle) // Write the header header := snapshotHeader{ diff --git a/consul/mdb_table_test.go b/consul/mdb_table_test.go index 11bb10e79e..f67d0cc418 100644 --- a/consul/mdb_table_test.go +++ b/consul/mdb_table_test.go @@ -19,7 +19,7 @@ type MockData struct { func MockEncoder(obj interface{}) []byte { buf := bytes.NewBuffer(nil) - encoder := codec.NewEncoder(buf, &mh) + encoder := codec.NewEncoder(buf, msgpackHandle) err := encoder.Encode(obj) if err != nil { panic(err) @@ -29,7 +29,7 @@ func MockEncoder(obj interface{}) []byte { func MockDecoder(buf []byte) interface{} { out := new(MockData) - err := codec.NewDecoder(bytes.NewReader(buf), &mh).Decode(out) + err := codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) if err != nil { panic(err) } diff --git a/consul/pool.go b/consul/pool.go index d6cfb90350..4e95d30145 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -15,7 +15,8 @@ import ( "time" ) -var mh = codec.MsgpackHandle{} +// msgpackHandle is a shared handle for encoding/decoding of RPC messages +var msgpackHandle = &codec.MsgpackHandle{} // muxSession is used to provide an interface for either muxado or yamux type muxSession interface { @@ -81,7 +82,7 @@ func (c *Conn) getClient() (*StreamClient, error) { } // Create the RPC client - cc := codec.GoRpc.ClientCodec(stream, &mh) + cc := codec.GoRpc.ClientCodec(stream, msgpackHandle) client := rpc.NewClientWithCodec(cc) // Return a new stream client diff --git a/consul/rpc.go b/consul/rpc.go index 788f09f839..d2d6459e09 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -148,7 +148,7 @@ func (s *Server) handleMultiplexV2(conn net.Conn) { // handleConsulConn is used to service a single Consul RPC connection func (s *Server) handleConsulConn(conn net.Conn) { defer conn.Close() - rpcCodec := codec.GoRpc.ServerCodec(conn, &mh) + rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle) for !s.shutdown { if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { if err != io.EOF && !strings.Contains(err.Error(), "closed") { diff --git a/consul/status_endpoint_test.go b/consul/status_endpoint_test.go index b5759befe4..15cb2dcfcd 100644 --- a/consul/status_endpoint_test.go +++ b/consul/status_endpoint_test.go @@ -19,7 +19,7 @@ func rpcClient(t *testing.T, s *Server) *rpc.Client { // Write the Consul RPC byte to set the mode conn.Write([]byte{byte(rpcConsul)}) - cc := codec.GoRpc.ClientCodec(conn, &mh) + cc := codec.GoRpc.ClientCodec(conn, msgpackHandle) return rpc.NewClientWithCodec(cc) } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 9a305eae55..c42250214d 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -395,17 +395,18 @@ type IndexedSessions struct { QueryMeta } -var mh = codec.MsgpackHandle{} +// msgpackHandle is a shared handle for encoding/decoding of structs +var msgpackHandle = &codec.MsgpackHandle{} // Decode is used to decode a MsgPack encoded object func Decode(buf []byte, out interface{}) error { - return codec.NewDecoder(bytes.NewReader(buf), &mh).Decode(out) + return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) } // Encode is used to encode a MsgPack object with type prefix func Encode(t MessageType, msg interface{}) ([]byte, error) { - buf := bytes.NewBuffer(nil) + var buf bytes.Buffer buf.WriteByte(uint8(t)) - err := codec.NewEncoder(buf, &mh).Encode(msg) + err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) return buf.Bytes(), err }