mirror of https://github.com/status-im/consul.git
consul: sharing the RPC layer between Consul/Raft
This commit is contained in:
parent
fba2990e48
commit
e780255dd4
|
@ -0,0 +1,95 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RaftLayer implements the raft.StreamLayer interface,
|
||||
// so that we can use a single RPC layer for Raft and Consul
|
||||
type RaftLayer struct {
|
||||
// Addr is the listener address to return
|
||||
addr net.Addr
|
||||
|
||||
// connCh is used to accept connections
|
||||
connCh chan net.Conn
|
||||
|
||||
// ConnPool is used to make outbound connections
|
||||
connPool *ConnPool
|
||||
|
||||
// Tracks if we are closed
|
||||
closed bool
|
||||
closeCh chan struct{}
|
||||
closeLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewRaftLayer is used to initialize a new RaftLayer which can
|
||||
// be used as a StreamLayer for Raft
|
||||
func NewRaftLayer(addr net.Addr, pool *ConnPool) *RaftLayer {
|
||||
layer := &RaftLayer{
|
||||
addr: addr,
|
||||
connCh: make(chan net.Conn),
|
||||
connPool: pool,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
return layer
|
||||
}
|
||||
|
||||
// Handoff is used to hand off a connection to the
|
||||
// RaftLayer. This allows it to be Accept()'ed
|
||||
func (l *RaftLayer) Handoff(c net.Conn) error {
|
||||
select {
|
||||
case l.connCh <- c:
|
||||
return nil
|
||||
case <-l.closeCh:
|
||||
return fmt.Errorf("Raft RPC layer closed")
|
||||
}
|
||||
}
|
||||
|
||||
// Accept is used to return connection which are
|
||||
// dialed to be used with the Raft layer
|
||||
func (l *RaftLayer) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case conn := <-l.connCh:
|
||||
return conn, nil
|
||||
case <-l.closeCh:
|
||||
return nil, fmt.Errorf("Raft RPC layer closed")
|
||||
}
|
||||
}
|
||||
|
||||
// Close is used to stop listening for Raft connections
|
||||
func (l *RaftLayer) Close() error {
|
||||
l.closeLock.Lock()
|
||||
defer l.closeLock.Unlock()
|
||||
|
||||
if !l.closed {
|
||||
l.closed = true
|
||||
close(l.closeCh)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addr is used to return the address of the listener
|
||||
func (l *RaftLayer) Addr() net.Addr {
|
||||
return l.addr
|
||||
}
|
||||
|
||||
// Dial is used to create a new outgoing connection
|
||||
func (l *RaftLayer) Dial(address string, timeout time.Duration) (net.Conn, error) {
|
||||
// Get a net.Addr
|
||||
addr, err := net.ResolveTCPAddr("tcp", address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Use the conn pool
|
||||
conn, err := l.connPool.Acquire(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Discard the Conn wrapper
|
||||
return conn.conn, nil
|
||||
}
|
|
@ -5,6 +5,13 @@ import (
|
|||
"net"
|
||||
)
|
||||
|
||||
type RPCType byte
|
||||
|
||||
const (
|
||||
rpcConsul RPCType = iota
|
||||
rpcRaft
|
||||
)
|
||||
|
||||
// listen is used to listen for incoming RPC connections
|
||||
func (s *Server) listen() {
|
||||
for {
|
||||
|
@ -27,8 +34,34 @@ func (s *Server) listen() {
|
|||
}
|
||||
}
|
||||
|
||||
// handleConn is used to service a single RPC connection
|
||||
// handleConn is used to determine if this is a Raft or
|
||||
// Consul type RPC connection and invoke the correct handler
|
||||
func (s *Server) handleConn(conn net.Conn) {
|
||||
// Read a single byte
|
||||
buf := make([]byte, 1)
|
||||
if _, err := conn.Read(buf); err != nil {
|
||||
s.logger.Printf("[ERR] Failed to read byte: %v", err)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Switch on the byte
|
||||
switch RPCType(buf[0]) {
|
||||
case rpcConsul:
|
||||
s.handleConsulConn(conn)
|
||||
|
||||
case rpcRaft:
|
||||
s.raftLayer.Handoff(conn)
|
||||
|
||||
default:
|
||||
s.logger.Printf("[ERR] Unrecognized RPC byte: %v", buf[0])
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// handleConsulConn is used to service a single Consul RPC connection
|
||||
func (s *Server) handleConsulConn(conn net.Conn) {
|
||||
defer func() {
|
||||
conn.Close()
|
||||
s.rpcClientLock.Lock()
|
||||
|
|
|
@ -44,9 +44,9 @@ type Server struct {
|
|||
|
||||
// The raft instance is used among Consul nodes within the
|
||||
// DC to protect operations that require strong consistency
|
||||
raft *raft.Raft
|
||||
raftStore *raft.SQLiteStore
|
||||
raftTransport *raft.NetworkTransport
|
||||
raft *raft.Raft
|
||||
raftLayer *RaftLayer
|
||||
raftStore *raft.SQLiteStore
|
||||
|
||||
// rpcClients is used to track active clients
|
||||
rpcClients map[net.Conn]struct{}
|
||||
|
@ -88,7 +88,7 @@ func NewServer(config *Config) (*Server, error) {
|
|||
// Create server
|
||||
s := &Server{
|
||||
config: config,
|
||||
connPool: NewPool(3),
|
||||
connPool: NewPool(5),
|
||||
eventChLAN: make(chan serf.Event, 256),
|
||||
eventChWAN: make(chan serf.Event, 256),
|
||||
logger: logger,
|
||||
|
@ -97,6 +97,12 @@ func NewServer(config *Config) (*Server, error) {
|
|||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Initialize the RPC layer
|
||||
if err := s.setupRPC(); err != nil {
|
||||
s.Shutdown()
|
||||
return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
|
||||
}
|
||||
|
||||
// Start the Serf listeners to prevent a deadlock
|
||||
go s.lanEventHandler()
|
||||
go s.wanEventHandler()
|
||||
|
@ -162,6 +168,7 @@ func (s *Server) setupRaft() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.raftStore = store
|
||||
|
||||
// Create the snapshot store
|
||||
snapshots, err := raft.NewFileSnapshotStore(path, 3)
|
||||
|
@ -171,11 +178,7 @@ func (s *Server) setupRaft() error {
|
|||
}
|
||||
|
||||
// Create a transport layer
|
||||
trans, err := raft.NewTCPTransport(s.config.RaftBindAddr, 3, 10*time.Second)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
return err
|
||||
}
|
||||
trans := raft.NewNetworkTransport(s.raftLayer, 3, 10*time.Second)
|
||||
|
||||
// Setup the peer store
|
||||
peers := raft.NewJSONPeers(path, trans)
|
||||
|
@ -184,17 +187,13 @@ func (s *Server) setupRaft() error {
|
|||
s.fsm = &consulFSM{server: s}
|
||||
|
||||
// Setup the Raft store
|
||||
raft, err := raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots,
|
||||
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots,
|
||||
peers, trans)
|
||||
if err != nil {
|
||||
store.Close()
|
||||
trans.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
s.raft = raft
|
||||
s.raftStore = store
|
||||
s.raftTransport = trans
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -205,6 +204,7 @@ func (s *Server) setupRPC() error {
|
|||
return err
|
||||
}
|
||||
s.rpcListener = list
|
||||
s.raftLayer = NewRaftLayer(s.rpcListener.Addr(), s.connPool)
|
||||
go s.listen()
|
||||
return nil
|
||||
}
|
||||
|
@ -234,11 +234,11 @@ func (s *Server) Shutdown() error {
|
|||
|
||||
if s.raft != nil {
|
||||
s.raft.Shutdown()
|
||||
s.raftLayer.Close()
|
||||
s.raftStore.Close()
|
||||
s.raftTransport.Close()
|
||||
s.raft = nil
|
||||
s.raftLayer = nil
|
||||
s.raftStore = nil
|
||||
s.raftTransport = nil
|
||||
}
|
||||
|
||||
if s.rpcListener != nil {
|
||||
|
@ -288,7 +288,7 @@ func (s *Server) Leave() error {
|
|||
|
||||
// Request that we are removed
|
||||
// TODO: Properly forward to leader
|
||||
future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
|
||||
future := s.raft.RemovePeer(s.rpcListener.Addr())
|
||||
|
||||
// Wait for the future
|
||||
ch := make(chan error, 1)
|
||||
|
|
Loading…
Reference in New Issue