Proactively ping server before rotation

Before shuffling the server list, proactively ping the next server in the list to establish the connection and verify the remote endpoint is healthy.
This commit is contained in:
Sean Chittenden 2016-03-26 19:28:13 -07:00
parent 18270bbd04
commit ca5950a538
5 changed files with 181 additions and 25 deletions

View File

@ -119,7 +119,7 @@ func NewClient(config *Config) (*Client, error) {
shutdownCh: make(chan struct{}),
}
c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf)
c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf, c.connPool)
// Start maintenance task for serverMgr
go c.serverMgr.Start()

View File

@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/yamux"
@ -405,6 +406,30 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg
return nil
}
// PingConsulServer sends a Status.Ping message to the specified server and
// returns true if healthy, false if an error occurred
func (p *ConnPool) PingConsulServer(s *server_details.ServerDetails) bool {
// Get a usable client
conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version)
if err != nil {
return false
}
// Make the RPC call
var out struct{}
err = msgpackrpc.CallWithCodec(sc.codec, "Status.Ping", struct{}{}, &out)
if err != nil {
sc.Close()
p.releaseConn(conn)
return false
}
// Done with the connection
conn.returnClient(sc)
p.releaseConn(conn)
return true
}
// Reap is used to close conns open over maxTime
func (p *ConnPool) reap() {
for {

View File

@ -53,6 +53,12 @@ type ConsulClusterInfo interface {
NumNodes() int
}
// ConnPoolTester is an interface wrapping client.ConnPool to prevent a
// cyclic import dependency
type ConnPoolPinger interface {
PingConsulServer(server *server_details.ServerDetails) bool
}
// serverCfg is the thread-safe configuration struct used to maintain the
// list of Consul servers in ServerManager.
//
@ -80,6 +86,11 @@ type ServerManager struct {
// connections. ConsulClusterInfo is an interface that wraps serf.
clusterInfo ConsulClusterInfo
// connPoolPinger is used to test the health of a server in the
// connection pool. ConnPoolPinger is an interface that wraps
// client.ConnPool.
connPoolPinger ConnPoolPinger
// notifyFailedServersBarrier is acts as a barrier to prevent
// queueing behind serverConfigLog and acts as a TryLock().
notifyFailedBarrier int32
@ -142,10 +153,23 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
newServers = append(newServers, sc.servers[0])
// FIXME(sean@): Is it worth it to fire off a go routine and
// TestConsulServer?
// PingConsulServer?
return newServers
}
// removeServerByKey performs an inline removal of the first matching server
func (sc *serverConfig) removeServerByKey(targetKey *server_details.Key) {
for i, s := range sc.servers {
if targetKey.Equal(s.Key()) {
// Delete the target server
copy(sc.servers[i:], sc.servers[i+1:])
sc.servers[len(sc.servers)-1] = nil
sc.servers = sc.servers[:len(sc.servers)-1]
return
}
}
}
// shuffleServers shuffles the server list in place
func (sc *serverConfig) shuffleServers() {
newServers := make([]*server_details.ServerDetails, len(sc.servers))
@ -193,11 +217,11 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
}
// New is the only way to safely create a new ServerManager struct.
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
// NOTE(sean@): Can't pass *consul.Client due to an import cycle
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger ConnPoolPinger) (sm *ServerManager) {
sm = new(ServerManager)
sm.logger = logger
sm.clusterInfo = clusterInfo
sm.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle
sm.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
sm.shutdownCh = shutdownCh
sc := serverConfig{}
@ -243,27 +267,120 @@ func (sm *ServerManager) NumServers() (numServers int) {
return numServers
}
// RebalanceServers takes out an internal write lock and shuffles the list of
// servers on this agent. This allows for a redistribution of work across
// consul servers and provides a guarantee that the order of the server list
// isn't related to the age at which the node was added to the cluster.
// Elsewhere we rely on the position in the server list as a hint regarding
// the stability of a server relative to its position in the server list.
// Servers at or near the front of the list are more stable than servers near
// the end of the list. Unhealthy servers are removed when serf notices the
// server has been deregistered.
serverCfg.servers = newServers
serverCfg.servers = newServers
// RebalanceServers shuffles the list of servers on this agent. The server
// at the front of the list is selected for the next RPC. RPC calls that
// fail for a particular server are rotated to the end of the list. This
// method reshuffles the list periodically in order to redistribute work
// across all known consul servers (i.e. guarantee that the order of servers
// in the server list isn't positively correlated with the age of a server in
// the consul cluster). Periodically shuffling the server list prevents
// long-lived clients from fixating on long-lived servers.
//
// Unhealthy servers are removed when serf notices the server has been
// deregistered. Before the newly shuffled server list is saved, the new
// remote endpoint is tested to ensure its responsive.
func (sm *ServerManager) RebalanceServers() {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
FAILED_SERVER_DURING_REBALANCE:
// Obtain a copy of the server config
serverCfg := sm.getServerConfig()
serverCfg.shuffleServers()
// Early abort if there is no value to shuffling
if len(serverCfg.servers) < 2 {
return
}
sm.saveServerConfig(serverCfg)
serverCfg.shuffleServers()
// Iterate through the shuffled server list to find a healthy server.
// Don't iterate on the list directly, this loop mutates server the
// list.
var foundHealthyServer bool
for n := len(serverCfg.servers); n > 0; n-- {
// Always test the first server. Failed servers are cycled
// while Serf detects the node has failed.
selectedServer := serverCfg.servers[0]
sm.logger.Printf("[INFO] server manager: Preemptively testing server %s before rebalance", selectedServer.String())
ok := sm.connPoolPinger.PingConsulServer(selectedServer)
if ok {
foundHealthyServer = true
break
}
serverCfg.cycleServer()
}
// If no healthy servers were found, sleep and wait for Serf to make
// the world a happy place again.
if !foundHealthyServer {
const backoffDuration = 1 * time.Second
sm.logger.Printf("[INFO] server manager: No servers available, sleeping for %v", backoffDuration)
// Sleep with no locks
time.Sleep(backoffDuration)
goto FAILED_SERVER_DURING_REBALANCE
}
// Verify that all servers are present. Use an anonymous func to
// ensure lock is released when exiting the critical section.
reconcileServerLists := func() bool {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
tmpServerCfg := sm.getServerConfig()
type targetServer struct {
server *server_details.ServerDetails
// 'b' == both
// 'o' == original
// 'n' == new
state byte
}
mergedList := make(map[server_details.Key]*targetServer)
for _, s := range serverCfg.servers {
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
}
for _, s := range tmpServerCfg.servers {
k := s.Key()
_, found := mergedList[*k]
if found {
mergedList[*k].state = 'b'
} else {
mergedList[*k] = &targetServer{server: s, state: 'n'}
}
}
// Ensure the selected server has not been removed by Serf
selectedServerKey := serverCfg.servers[0].Key()
if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
return false
}
// Add any new servers and remove any old servers
for k, v := range mergedList {
switch v.state {
case 'b':
// Do nothing, server exists in both
case 'o':
// Server has been removed
serverCfg.removeServerByKey(&k)
case 'n':
// Server added
serverCfg.servers = append(serverCfg.servers, v.server)
default:
panic("not implemented")
}
}
sm.saveServerConfig(serverCfg)
return true
}
if !reconcileServerLists() {
goto FAILED_SERVER_DURING_REBALANCE
}
sm.logger.Printf("[INFO] server manager: Rebalancing server connections complete")
return
}
// RemoveServer takes out an internal write lock and removes a server from

View File

@ -25,6 +25,13 @@ func GetBufferedLogger() *log.Logger {
return localLogger
}
type fauxConnPool struct {
}
func (s *fauxConnPool) TestConsulServer(server *server_details.ServerDetails) bool {
return true
}
type fauxSerf struct {
numNodes int
}
@ -36,7 +43,7 @@ func (s *fauxSerf) NumNodes() int {
func testServerManager() (sm *ServerManager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384})
sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{})
return sm
}
@ -171,7 +178,7 @@ func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
}
for _, s := range clusters {
sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes})
sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
for i := 0; i < s.numServers; i++ {
nodeName := fmt.Sprintf("s%02d", i)

View File

@ -26,6 +26,13 @@ func GetBufferedLogger() *log.Logger {
return localLogger
}
type fauxConnPool struct {
}
func (s *fauxConnPool) TestConsulServer(server *server_details.ServerDetails) bool {
return true
}
type fauxSerf struct {
}
@ -37,7 +44,7 @@ func testServerManager() (sm *server_manager.ServerManager) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
sm = server_manager.New(logger, shutdownCh, &fauxSerf{})
sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
return sm
}
@ -124,7 +131,7 @@ func TestServerManager_New(t *testing.T) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
sm := server_manager.New(logger, shutdownCh, &fauxSerf{})
sm := server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
if sm == nil {
t.Fatalf("ServerManager nil")
}