consul: Basic RPC framework

This commit is contained in:
Armon Dadgar 2013-12-06 16:35:13 -08:00
parent b8bc9d2027
commit 310eb2f506
2 changed files with 86 additions and 3 deletions

View File

@ -9,9 +9,10 @@ import (
)
const (
DefaultRaftAddr = "0.0.0.0:8300"
DefaultLANSerfPort = 8301
DefaultWANSerfPort = 8302
DefaultRPCAddr = "0.0.0.0:8300"
DefaultRaftAddr = "0.0.0.0:8301"
DefaultLANSerfPort = 8302
DefaultWANSerfPort = 8303
)
// Config is used to configure the server
@ -31,6 +32,10 @@ type Config struct {
// RaftConfig is the configuration used for Raft in the local DC
RaftConfig *raft.Config
// RPCAddr is the RPC address used by Consul. This should be reachable
// by the WAN and LAN
RPCAddr string
// SerfLocalConfig is the configuration for the local serf
SerfLocalConfig *serf.Config
@ -53,6 +58,7 @@ func DefaultConfig() *Config {
Datacenter: "dc1",
NodeName: hostname,
RaftBindAddr: DefaultRaftAddr,
RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(),
SerfLocalConfig: serf.DefaultConfig(),
SerfRemoteConfig: serf.DefaultConfig(),

View File

@ -4,7 +4,10 @@ import (
"fmt"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/ugorji/go/codec"
"log"
"net"
"net/rpc"
"os"
"path/filepath"
"sync"
@ -43,6 +46,14 @@ type Server struct {
raftStore *raft.SQLiteStore
raftTransport *raft.NetworkTransport
// rpcClients is used to track active clients
rpcClients map[net.Conn]struct{}
rpcClientLock sync.Mutex
// rpcListener is used to listen for incoming connections
rpcListener net.Listener
rpcServer *rpc.Server
// serfLAN is the Serf cluster maintained inside the DC
// which contains all the DC nodes
serfLAN *serf.Serf
@ -78,6 +89,8 @@ func NewServer(config *Config) (*Server, error) {
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
logger: logger,
rpcClients: make(map[net.Conn]struct{}),
rpcServer: rpc.NewServer(),
shutdownCh: make(chan struct{}),
}
@ -182,6 +195,17 @@ func (s *Server) setupRaft() error {
return nil
}
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC() error {
list, err := net.Listen("tcp", s.config.RPCAddr)
if err != nil {
return err
}
s.rpcListener = list
go s.listen()
return nil
}
// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
s.logger.Printf("[INFO] Shutting down Consul server")
@ -213,6 +237,19 @@ func (s *Server) Shutdown() error {
s.raftStore = nil
s.raftTransport = nil
}
if s.rpcListener != nil {
s.rpcListener.Close()
s.rpcListener = nil
}
// Close all the RPC connections
s.rpcClientLock.Lock()
for conn := range s.rpcClients {
conn.Close()
}
s.rpcClientLock.Unlock()
return nil
}
@ -239,3 +276,43 @@ func (s *Server) wanEventHandler() {
}
}
}
// listen is used to listen for incoming RPC connections
func (s *Server) listen() {
for {
// Accept a connection
conn, err := s.rpcListener.Accept()
if err != nil {
if s.shutdown {
return
}
s.logger.Printf("[ERR] Failed to accept RPC conn: %v", err)
continue
}
// Track this client
s.rpcClientLock.Lock()
s.rpcClients[conn] = struct{}{}
s.rpcClientLock.Unlock()
go s.handleConn(conn)
}
}
// handleConn is used to service a single RPC connection
func (s *Server) handleConn(conn net.Conn) {
defer func() {
conn.Close()
s.rpcClientLock.Lock()
delete(s.rpcClients, conn)
s.rpcClientLock.Unlock()
}()
rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{})
for !s.shutdown {
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
s.logger.Printf("[ERR] RPC error: %v (%v)", err, conn)
return
}
}
}