Expose ServerManager.ResetRebalanceTimer

Move the rebalance timer from ServerManager.Start's stack to struct ServerManager.  This makes it possible to shuffle during tests without actually waiting >120s.
This commit is contained in:
Sean Chittenden 2016-03-26 23:41:01 -07:00
parent d275cac1a3
commit efc1113406
1 changed files with 17 additions and 8 deletions

View File

@ -76,6 +76,9 @@ type ServerManager struct {
serverConfigValue atomic.Value serverConfigValue atomic.Value
serverConfigLock sync.Mutex serverConfigLock sync.Mutex
// rebalanceTimer controls the duration of the rebalance interval
rebalanceTimer *time.Timer
// shutdownCh is a copy of the channel in consul.Client // shutdownCh is a copy of the channel in consul.Client
shutdownCh chan struct{} shutdownCh chan struct{}
@ -405,10 +408,8 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
} }
} }
// refreshServerRebalanceTimer is only called once the rebalanceTimer // refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires.
// expires. Historically this was an expensive routine and is intended to be func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration {
// run in isolation in a dedicated, non-concurrent task.
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
serverCfg := sm.getServerConfig() serverCfg := sm.getServerConfig()
numConsulServers := len(serverCfg.servers) numConsulServers := len(serverCfg.servers)
// Limit this connection's life based on the size (and health) of the // Limit this connection's life based on the size (and health) of the
@ -420,10 +421,18 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Dur
numLANMembers := sm.clusterInfo.NumNodes() numLANMembers := sm.clusterInfo.NumNodes()
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
timer.Reset(connRebalanceTimeout) sm.rebalanceTimer.Reset(connRebalanceTimeout)
return connRebalanceTimeout return connRebalanceTimeout
} }
// ResetRebalanceTimer resets the rebalance timer. This method primarily
// exists for testing and should not be used directly.
func (sm *ServerManager) ResetRebalanceTimer() {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
sm.rebalanceTimer.Reset(clientRPCMinReuseDuration)
}
// Start is used to start and manage the task of automatically shuffling and // Start is used to start and manage the task of automatically shuffling and
// rebalancing the list of consul servers. This maintenance only happens // rebalancing the list of consul servers. This maintenance only happens
// periodically based on the expiration of the timer. Failed servers are // periodically based on the expiration of the timer. Failed servers are
@ -431,14 +440,14 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Dur
// the list. The order of the server list must be shuffled periodically to // the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available consul servers. // distribute load across all known and available consul servers.
func (sm *ServerManager) Start() { func (sm *ServerManager) Start() {
var rebalanceTimer *time.Timer = time.NewTimer(clientRPCMinReuseDuration) sm.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
for { for {
select { select {
case <-rebalanceTimer.C: case <-sm.rebalanceTimer.C:
sm.logger.Printf("[INFO] server manager: Rebalancing server connections") sm.logger.Printf("[INFO] server manager: Rebalancing server connections")
sm.RebalanceServers() sm.RebalanceServers()
sm.refreshServerRebalanceTimer(rebalanceTimer) sm.refreshServerRebalanceTimer()
case <-sm.shutdownCh: case <-sm.shutdownCh:
sm.logger.Printf("[INFO] server manager: shutting down") sm.logger.Printf("[INFO] server manager: shutting down")