Introduce asynchronous management of consul server lists

Instead of blocking the RPC call path and performing a potentially expensive calculation (including a call to `c.LANMembers()`), introduce a channel to request a rebalance. Some events don't force a reshuffle; instead, they extend the duration of the current rebalance window because the environment thrashed enough to redistribute a client's load.
This commit is contained in:
Sean Chittenden 2016-02-24 10:55:04 -08:00
parent bad6cb8897
commit 0c87463b7e
5 changed files with 327 additions and 29 deletions

View File

@ -118,7 +118,7 @@ func NewClient(config *Config) (*Client, error) {
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }
c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh) c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh, c.serf)
// Start consulServers maintenance // Start consulServers maintenance
go c.serverMgr.StartServerManager() go c.serverMgr.StartServerManager()

View File

@ -83,6 +83,12 @@ func TestClient_JoinLAN(t *testing.T) {
if _, err := c1.JoinLAN([]string{addr}); err != nil { if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
numServers := c1.serverMgr.GetNumServers()
testutil.WaitForResult(func() (bool, error) {
return numServers == 1, nil
}, func(err error) {
t.Fatalf("expected consul server: %d", numServers)
})
// Check the members // Check the members
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
@ -93,9 +99,10 @@ func TestClient_JoinLAN(t *testing.T) {
t.Fatalf("bad len") t.Fatalf("bad len")
}) })
numServers = c1.serverMgr.GetNumServers()
// Check we have a new consul // Check we have a new consul
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
return c1.serverMgr.GetNumServers() == 1, nil return numServers == 1, nil
}, func(err error) { }, func(err error) {
t.Fatalf("expected consul server") t.Fatalf("expected consul server")
}) })

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/serf"
) )
type consulServerEventTypes int type consulServerEventTypes int
@ -23,6 +24,11 @@ const (
// connection load across servers // connection load across servers
consulServersRebalance consulServersRebalance
// consulServersRefreshRebalanceDuration is used to signal when we
// should reset the rebalance duration because the server list has
// changed and we don't need to proactively change our connection
consulServersRefreshRebalanceDuration
// consulServersRPCError is used to signal when a server has either // consulServersRPCError is used to signal when a server has either
// timed out or returned an error and we would like to have the // timed out or returned an error and we would like to have the
// server manager find a new preferredServer. // server manager find a new preferredServer.
@ -47,6 +53,11 @@ const (
// queries are sent over an established connection to a single server // queries are sent over an established connection to a single server
clientRPCMinReuseDuration = 120 * time.Second clientRPCMinReuseDuration = 120 * time.Second
// initialRebalanceTimeoutHours is the initial value for the
// rebalanceTimer. This value is discarded immediately after the
// client becomes aware of the first server.
initialRebalanceTimeoutHours = 24
// Limit the number of new connections a server receives per second // Limit the number of new connections a server receives per second
// for connection rebalancing. This limit caps the load caused by // for connection rebalancing. This limit caps the load caused by
// continual rebalancing efforts when a cluster is in equilibrium. A // continual rebalancing efforts when a cluster is in equilibrium. A
@ -61,6 +72,14 @@ const (
// will take ~26min for all servers to rebalance. A 10K cluster in // will take ~26min for all servers to rebalance. A 10K cluster in
// the same scenario will take ~2.6min to rebalance. // the same scenario will take ~2.6min to rebalance.
newRebalanceConnsPerSecPerServer = 64 newRebalanceConnsPerSecPerServer = 64
// maxConsulServerManagerEvents is the size of the consulServersCh
// buffer.
maxConsulServerManagerEvents = 16
// defaultClusterSize is the assumed cluster size if no serf cluster
// is available.
defaultClusterSize = 1024
) )
// serverCfg is the thread-safe configuration structure that is used to // serverCfg is the thread-safe configuration structure that is used to
@ -71,9 +90,6 @@ const (
type serverConfig struct { type serverConfig struct {
// servers tracks the locally known servers // servers tracks the locally known servers
servers []*server_details.ServerDetails servers []*server_details.ServerDetails
// Timer used to control rebalancing of servers
rebalanceTimer *time.Timer
} }
type ServerManager struct { type ServerManager struct {
@ -86,11 +102,20 @@ type ServerManager struct {
// maintenance of the list of consulServers // maintenance of the list of consulServers
consulServersCh chan consulServerEventTypes consulServersCh chan consulServerEventTypes
// refreshRebalanceDurationCh is used to signal that a refresh should
// occur
refreshRebalanceDurationCh chan bool
// shutdownCh is a copy of the channel in consul.Client // shutdownCh is a copy of the channel in consul.Client
shutdownCh chan struct{} shutdownCh chan struct{}
// logger uses the provided LogOutput // logger uses the provided LogOutput
logger *log.Logger logger *log.Logger
// serf is used to estimate the approximate number of nodes in a
// cluster and limit the rate at which it rebalances server
// connections
serf *serf.Serf
} }
// AddServer takes out an internal write lock and adds a new server. If the // AddServer takes out an internal write lock and adds a new server. If the
@ -150,8 +175,8 @@ func (sm *ServerManager) CycleFailedServers() {
} }
} }
serverCfg.resetRebalanceTimer(sm)
sm.saveServerConfig(serverCfg) sm.saveServerConfig(serverCfg)
sm.requestRefreshRebalanceDuration()
} }
// cycleServers returns a new list of servers that has dequeued the first // cycleServers returns a new list of servers that has dequeued the first
@ -211,16 +236,18 @@ func (sm *ServerManager) getServerConfig() serverConfig {
// NewServerManager is the only way to safely create a new ServerManager // NewServerManager is the only way to safely create a new ServerManager
// struct. // struct.
func NewServerManager(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) { func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) {
// NOTE(sean@): Can't pass *consul.Client due to an import cycle // NOTE(sean@): Can't pass *consul.Client due to an import cycle
sm = new(ServerManager) sm = new(ServerManager)
sm.logger = logger sm.logger = logger
sm.serf = serf
sm.consulServersCh = make(chan consulServerEventTypes, maxConsulServerManagerEvents) sm.consulServersCh = make(chan consulServerEventTypes, maxConsulServerManagerEvents)
sm.shutdownCh = shutdownCh sm.shutdownCh = shutdownCh
sm.refreshRebalanceDurationCh = make(chan bool, maxConsulServerManagerEvents)
sc := serverConfig{} sc := serverConfig{}
sc.servers = make([]*server_details.ServerDetails, 0) sc.servers = make([]*server_details.ServerDetails, 0)
sc.rebalanceTimer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
sm.serverConfigValue.Store(sc) sm.serverConfigValue.Store(sc)
return sm return sm
} }
@ -259,8 +286,8 @@ func (sm *ServerManager) RebalanceServers() {
} }
serverCfg.servers = newServers serverCfg.servers = newServers
serverCfg.resetRebalanceTimer(sm)
sm.saveServerConfig(serverCfg) sm.saveServerConfig(serverCfg)
sm.requestRefreshRebalanceDuration()
} }
// RemoveServer takes out an internal write lock and removes a server from // RemoveServer takes out an internal write lock and removes a server from
@ -291,27 +318,45 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
} }
} }
// resetRebalanceTimer assumes: // requestRefreshRebalanceDuration sends a message to which causes a background
// // thread to recalc the duration
// 1) the serverConfigLock is already held by the caller. func (sm *ServerManager) requestRefreshRebalanceDuration() {
// 2) the caller will call serverConfigValue.Store() sm.refreshRebalanceDurationCh <- true
func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) { }
numConsulServers := len(sc.servers)
// requestServerRebalance sends a message to which causes a background thread
// to reshuffle the list of servers
func (sm *ServerManager) requestServerRebalance() {
sm.consulServersCh <- consulServersRebalance
}
// refreshServerRebalanceTimer is called
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
serverCfg := sm.getServerConfig()
numConsulServers := len(serverCfg.servers)
// Limit this connection's life based on the size (and health) of the // Limit this connection's life based on the size (and health) of the
// cluster. Never rebalance a connection more frequently than // cluster. Never rebalance a connection more frequently than
// connReuseLowWatermarkDuration, and make sure we never exceed // connReuseLowWatermarkDuration, and make sure we never exceed
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers. // clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer) clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
numLANMembers := 16384 // Assume sufficiently large for now. FIXME: numLanMembers := len(c.LANMembers())
// Assume a moderate sized cluster unless we have an actual serf
// instance we can query.
numLANMembers := defaultClusterSize
if sm.serf != nil {
numLANMembers = sm.serf.NumNodes()
}
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout) sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
if sc.rebalanceTimer == nil { timer.Reset(connRebalanceTimeout)
sc.rebalanceTimer = time.NewTimer(connRebalanceTimeout) }
} else {
sc.rebalanceTimer.Reset(connRebalanceTimeout) // saveServerConfig is a convenience method which hides the locking semantics
} // of atomic.Value from the caller.
func (sm *ServerManager) saveServerConfig(sc serverConfig) {
sm.serverConfigValue.Store(sc)
} }
// StartServerManager is used to start and manage the task of automatically // StartServerManager is used to start and manage the task of automatically
@ -319,7 +364,9 @@ func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
// happens either when a new server is added or when a duration has been // happens either when a new server is added or when a duration has been
// exceed. // exceed.
func (sm *ServerManager) StartServerManager() { func (sm *ServerManager) StartServerManager() {
var rebalanceTimer *time.Timer var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
var rebalanceTaskDispatched int32
func() { func() {
sm.serverConfigLock.Lock() sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock() defer sm.serverConfigLock.Unlock()
@ -330,8 +377,6 @@ func (sm *ServerManager) StartServerManager() {
} }
var serverCfg serverConfig var serverCfg serverConfig
serverCfg = serverCfgPtr.(serverConfig) serverCfg = serverCfgPtr.(serverConfig)
serverCfg.resetRebalanceTimer(sm)
rebalanceTimer = serverCfg.rebalanceTimer
sm.saveServerConfig(serverCfg) sm.saveServerConfig(serverCfg)
}() }()
@ -340,13 +385,14 @@ func (sm *ServerManager) StartServerManager() {
case e := <-sm.consulServersCh: case e := <-sm.consulServersCh:
switch e { switch e {
case consulServersNodeJoin: case consulServersNodeJoin:
sm.logger.Printf("[INFO] consul: new node joined cluster") sm.logger.Printf("[INFO] server manager: new node joined cluster")
sm.RebalanceServers() // rebalance on new server
sm.requestServerRebalance()
case consulServersRebalance: case consulServersRebalance:
sm.logger.Printf("[INFO] consul: rebalancing servers by request") sm.logger.Printf("[INFO] server manager: rebalancing servers by request")
sm.RebalanceServers() sm.RebalanceServers()
case consulServersRPCError: case consulServersRPCError:
sm.logger.Printf("[INFO] consul: need to find a new server to talk with") sm.logger.Printf("[INFO] server manager: need to find a new server to talk with")
sm.CycleFailedServers() sm.CycleFailedServers()
// FIXME(sean@): wtb preemptive Status.Ping // FIXME(sean@): wtb preemptive Status.Ping
// of servers, ideally parallel fan-out of N // of servers, ideally parallel fan-out of N
@ -360,7 +406,21 @@ func (sm *ServerManager) StartServerManager() {
// their RPC time too low even though the // their RPC time too low even though the
// Ping did return successfully? // Ping did return successfully?
default: default:
sm.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e) sm.logger.Printf("[WARN] server manager: unhandled LAN Serf Event: %#v", e)
}
case <-sm.refreshRebalanceDurationCh:
chanLen := len(sm.refreshRebalanceDurationCh)
// Drain all messages from the rebalance channel
for i := 0; i < chanLen; i++ {
<-sm.refreshRebalanceDurationCh
}
// Only run one rebalance task at a time, but do
// allow for the channel to be drained
if atomic.CompareAndSwapInt32(&rebalanceTaskDispatched, 0, 1) {
go func() {
defer atomic.StoreInt32(&rebalanceTaskDispatched, 0)
sm.refreshServerRebalanceTimer(rebalanceTimer)
}()
} }
case <-rebalanceTimer.C: case <-rebalanceTimer.C:
sm.logger.Printf("[INFO] consul: server rebalance timeout") sm.logger.Printf("[INFO] consul: server rebalance timeout")

View File

@ -0,0 +1,156 @@
package server_manager
import (
"bytes"
"log"
"testing"
"github.com/hashicorp/consul/consul/server_details"
)
var (
localLogger *log.Logger
localLogBuffer *bytes.Buffer
)
// init wires up the package-level buffered logger shared by these tests so
// log output can be inspected rather than written to stderr.
func init() {
	localLogBuffer = &bytes.Buffer{}
	localLogger = log.New(localLogBuffer, "", 0)
}
// GetBufferedLogger returns the shared test logger whose output is
// captured in localLogBuffer.
func GetBufferedLogger() *log.Logger {
	return localLogger
}
// testServerManager constructs a ServerManager suitable for unit testing:
// buffered logger, fresh shutdown channel, and no serf instance.
func testServerManager() (sm *ServerManager) {
	return NewServerManager(GetBufferedLogger(), make(chan struct{}), nil)
}
// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
// TestServerManagerInternal_cycleServer exercises serverConfig.cycleServer,
// verifying that each call rotates the server list left by one position and
// that three rotations restore the original ordering.
func TestServerManagerInternal_cycleServer(t *testing.T) {
	sm := testServerManager()
	sc := sm.getServerConfig()

	server0 := &server_details.ServerDetails{Name: "server1"}
	server1 := &server_details.ServerDetails{Name: "server2"}
	server2 := &server_details.ServerDetails{Name: "server3"}
	sc.servers = append(sc.servers, server0, server1, server2)
	sm.saveServerConfig(sc)
	sc = sm.getServerConfig()

	// assertOrder fails unless the servers appear in exactly the expected
	// order.  NOTE: the original checks joined the position comparisons
	// with &&, so they could only fail when EVERY position mismatched;
	// each position must be verified independently.
	assertOrder := func(when string, expected ...*server_details.ServerDetails) {
		if len(sc.servers) != len(expected) {
			t.Fatalf("server length incorrect: %d/%d", len(sc.servers), len(expected))
		}
		for i, s := range expected {
			if sc.servers[i] != s {
				t.Fatalf("server ordering %s not correct", when)
			}
		}
	}

	assertOrder("initially", server0, server1, server2)

	sc.servers = sc.cycleServer()
	assertOrder("after one cycle", server1, server2, server0)

	sc.servers = sc.cycleServer()
	assertOrder("after two cycles", server2, server0, server1)

	sc.servers = sc.cycleServer()
	assertOrder("after three cycles", server0, server1, server2)
}
// func (sm *ServerManager) getServerConfig() serverConfig {
func TestServerManagerInternal_getServerConfig(t *testing.T) {
sm := testServerManager()
sc := sm.getServerConfig()
if sc.servers == nil {
t.Fatalf("serverConfig.servers nil")
}
if len(sc.servers) != 0 {
t.Fatalf("serverConfig.servers length not zero")
}
}
// func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) {
// TestServerManagerInternal_NewServerManager verifies the constructor
// initializes every internal field of the ServerManager, including the
// refreshRebalanceDurationCh channel added alongside consulServersCh.
func TestServerManagerInternal_NewServerManager(t *testing.T) {
	sm := testServerManager()
	if sm == nil {
		t.Fatalf("ServerManager nil")
	}
	if sm.logger == nil {
		t.Fatalf("ServerManager.logger nil")
	}
	if sm.consulServersCh == nil {
		t.Fatalf("ServerManager.consulServersCh nil")
	}
	if sm.refreshRebalanceDurationCh == nil {
		t.Fatalf("ServerManager.refreshRebalanceDurationCh nil")
	}
	if sm.shutdownCh == nil {
		t.Fatalf("ServerManager.shutdownCh nil")
	}
}
// func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
// func (sm *ServerManager) saveServerConfig(sc serverConfig) {
// TestServerManagerInternal_saveServerConfig verifies copy-on-write
// semantics: a saved config becomes visible to subsequent readers, while
// mutating an unsaved copy leaves the stored config untouched.
func TestServerManagerInternal_saveServerConfig(t *testing.T) {
	sm := testServerManager()

	// Initial condition: start empty, then persist a single server.
	func() {
		sc := sm.getServerConfig()
		if len(sc.servers) != 0 {
			t.Fatalf("ServerManager.saveServerConfig failed to load init config")
		}
		sc.servers = append(sc.servers, new(server_details.ServerDetails))
		sm.saveServerConfig(sc)
	}()

	// The save must be observable through a fresh read.
	func() {
		if numServers := len(sm.getServerConfig().servers); numServers != 1 {
			t.Fatalf("ServerManager.saveServerConfig failed to save mutated config")
		}
	}()

	// Appending to a copy without saving must not leak into the stored
	// configuration.
	func() {
		sc := sm.getServerConfig()
		sc.servers = append(sc.servers, new(server_details.ServerDetails))
		if len(sm.getServerConfig().servers) >= len(sc.servers) {
			t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original")
		}
	}()
}

View File

@ -0,0 +1,75 @@
package server_manager_test
import (
"bytes"
"log"
"testing"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/consul/server_manager"
)
var (
localLogger *log.Logger
localLogBuffer *bytes.Buffer
)
// init sets up the buffered logger shared by the external tests so log
// output is collected instead of printed.
func init() {
	localLogBuffer = &bytes.Buffer{}
	localLogger = log.New(localLogBuffer, "", 0)
}
// GetBufferedLogger returns the package-level logger whose output is
// collected in localLogBuffer.
func GetBufferedLogger() *log.Logger {
	return localLogger
}
// makeMockServerManager builds a ServerManager backed by the buffered test
// logger and a fresh shutdown channel, with no serf instance.
func makeMockServerManager() (sm *server_manager.ServerManager) {
	logger, shutdownCh := mockServerManager()
	return server_manager.NewServerManager(logger, shutdownCh, nil)
}
// mockServerManager returns the shared test logger and a new shutdown
// channel for constructing ServerManager instances.
func mockServerManager() (logger *log.Logger, shutdownCh chan struct{}) {
	return GetBufferedLogger(), make(chan struct{})
}
// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
// func (sm *ServerManager) CycleFailedServers() {
// func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetails) {
// func (sm *ServerManager) GetNumServers() (numServers int) {
// TestServerManager_GetNumServers checks the reported server count before
// and after a server is added.
func TestServerManager_GetNumServers(t *testing.T) {
	sm := makeMockServerManager()

	if n := sm.GetNumServers(); n != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	sm.AddServer(&server_details.ServerDetails{})
	if n := sm.GetNumServers(); n != 1 {
		t.Fatalf("Expected one server after AddServer")
	}
}
// func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) {
// TestServerManager_NewServerManager verifies the constructor returns a
// non-nil ServerManager.
func TestServerManager_NewServerManager(t *testing.T) {
	if sm := makeMockServerManager(); sm == nil {
		t.Fatalf("ServerManager nil")
	}
}
// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
// func (sm *ServerManager) RebalanceServers() {
// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
// func (sm *ServerManager) StartServerManager() {