From 310eb2f50695b5c3eec5756461be7eebd3d24bde Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 6 Dec 2013 16:35:13 -0800 Subject: [PATCH] consul: Basic RPC framework --- consul/config.go | 12 ++++++-- consul/server.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/consul/config.go b/consul/config.go index 902c5b171b..acda4840ec 100644 --- a/consul/config.go +++ b/consul/config.go @@ -9,9 +9,10 @@ import ( ) const ( - DefaultRaftAddr = "0.0.0.0:8300" - DefaultLANSerfPort = 8301 - DefaultWANSerfPort = 8302 + DefaultRPCAddr = "0.0.0.0:8300" + DefaultRaftAddr = "0.0.0.0:8301" + DefaultLANSerfPort = 8302 + DefaultWANSerfPort = 8303 ) // Config is used to configure the server @@ -31,6 +32,10 @@ type Config struct { // RaftConfig is the configuration used for Raft in the local DC RaftConfig *raft.Config + // RPCAddr is the RPC address used by Consul. This should be reachable + // by the WAN and LAN + RPCAddr string + // SerfLocalConfig is the configuration for the local serf SerfLocalConfig *serf.Config @@ -53,6 +58,7 @@ func DefaultConfig() *Config { Datacenter: "dc1", NodeName: hostname, RaftBindAddr: DefaultRaftAddr, + RPCAddr: DefaultRPCAddr, RaftConfig: raft.DefaultConfig(), SerfLocalConfig: serf.DefaultConfig(), SerfRemoteConfig: serf.DefaultConfig(), diff --git a/consul/server.go b/consul/server.go index a4b6463cdd..a0943d7e77 100644 --- a/consul/server.go +++ b/consul/server.go @@ -4,7 +4,10 @@ import ( "fmt" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "github.com/ugorji/go/codec" "log" + "net" + "net/rpc" "os" "path/filepath" "sync" @@ -43,6 +46,14 @@ type Server struct { raftStore *raft.SQLiteStore raftTransport *raft.NetworkTransport + // rpcClients is used to track active clients + rpcClients map[net.Conn]struct{} + rpcClientLock sync.Mutex + + // rpcListener is used to listen for incoming connections + rpcListener net.Listener + rpcServer *rpc.Server + // serfLAN is the Serf cluster maintained inside the DC // which contains all the DC nodes serfLAN *serf.Serf @@ -78,6 +89,8 @@ func NewServer(config *Config) (*Server, error) { eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), logger: logger, + rpcClients: make(map[net.Conn]struct{}), + rpcServer: rpc.NewServer(), shutdownCh: make(chan struct{}), } @@ -182,6 +195,17 @@ func (s *Server) setupRaft() error { return nil } +// setupRPC is used to setup the RPC listener +func (s *Server) setupRPC() error { + list, err := net.Listen("tcp", s.config.RPCAddr) + if err != nil { + return err + } + s.rpcListener = list + go s.listen() + return nil +} + // Shutdown is used to shutdown the server func (s *Server) Shutdown() error { s.logger.Printf("[INFO] Shutting down Consul server") @@ -213,6 +237,19 @@ func (s *Server) Shutdown() error { s.raftStore = nil s.raftTransport = nil } + + if s.rpcListener != nil { + s.rpcListener.Close() + s.rpcListener = nil + } + + // Close all the RPC connections + s.rpcClientLock.Lock() + for conn := range s.rpcClients { + conn.Close() + } + s.rpcClientLock.Unlock() + return nil } @@ -239,3 +276,43 @@ func (s *Server) wanEventHandler() { } } } + +// listen is used to listen for incoming RPC connections +func (s *Server) listen() { + for { + // Accept a connection + conn, err := s.rpcListener.Accept() + if err != nil { + if s.shutdown { + return + } + s.logger.Printf("[ERR] Failed to accept RPC conn: %v", err) + continue + } + + // Track this client + s.rpcClientLock.Lock() + s.rpcClients[conn] = struct{}{} + s.rpcClientLock.Unlock() + + go s.handleConn(conn) + } +} + +// handleConn is used to service a single RPC connection +func (s *Server) handleConn(conn net.Conn) { + defer func() { + conn.Close() + s.rpcClientLock.Lock() + delete(s.rpcClients, conn) + s.rpcClientLock.Unlock() + }() + + rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{}) + for !s.shutdown { + if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { + s.logger.Printf("[ERR] RPC error: %v (%v)", err, conn) + return + } + } +}