diff --git a/consul/server.go b/consul/server.go index 0a29504c3d..b1a744f4c7 100644 --- a/consul/server.go +++ b/consul/server.go @@ -2,6 +2,7 @@ package consul import ( "crypto/tls" + "errors" "fmt" "github.com/hashicorp/raft" "github.com/hashicorp/raft-mdb" @@ -11,6 +12,7 @@ import ( "net/rpc" "os" "path/filepath" + "reflect" "runtime" "strconv" "sync" @@ -489,11 +491,52 @@ func (s *Server) IsLeader() bool { return s.raft.State() == raft.Leader } +// inmemCodec is used to do an RPC call without going over a network +type inmemCodec struct { + method string + args interface{} + reply interface{} + err error +} + +func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error { + req.ServiceMethod = i.method + return nil +} + +func (i *inmemCodec) ReadRequestBody(args interface{}) error { + sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args))) + dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args))) + dst.Set(sourceValue) + return nil +} + +func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error { + if resp.Error != "" { + i.err = errors.New(resp.Error) + return nil + } + sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply))) + dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply))) + dst.Set(sourceValue) + return nil +} + +func (i *inmemCodec) Close() error { + return nil +} + // RPC is used to make a local RPC call func (s *Server) RPC(method string, args interface{}, reply interface{}) error { - addr := s.rpcListener.Addr() - version := int(s.config.ProtocolVersion) - return s.connPool.RPC(addr, version, method, args, reply) + codec := &inmemCodec{ + method: method, + args: args, + reply: reply, + } + if err := s.rpcServer.ServeRequest(codec); err != nil { + return err + } + return codec.err } // Stats is used to return statistics for debugging and insight