Test ServerManager.refreshServerRebalanceTimer

Change the signature so it returns a value so that this can be tested externally with mock data.  See the sample table in TestServerManagerInternal_refreshServerRebalanceTimer() for the rate at which it will back off.  This function is mostly used to not cripple large clusters in the event of a partition.
This commit is contained in:
Sean Chittenden 2016-02-25 08:05:15 -08:00
parent a63d5ab963
commit da872fee63
2 changed files with 71 additions and 6 deletions

View File

@ -271,7 +271,7 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
// refreshServerRebalanceTimer is only called once the rebalanceTimer // refreshServerRebalanceTimer is only called once the rebalanceTimer
// expires. Historically this was an expensive routine and is intended to be // expires. Historically this was an expensive routine and is intended to be
// run in isolation in a dedicated, non-concurrent task. // run in isolation in a dedicated, non-concurrent task.
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) { func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
serverCfg := sm.getServerConfig() serverCfg := sm.getServerConfig()
numConsulServers := len(serverCfg.servers) numConsulServers := len(serverCfg.servers)
// Limit this connection's life based on the size (and health) of the // Limit this connection's life based on the size (and health) of the
@ -280,12 +280,11 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers. // clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer) clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
numLANMembers := sm.clusterInfo.NumNodes() numLANMembers := sm.clusterInfo.NumNodes()
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
timer.Reset(connRebalanceTimeout) timer.Reset(connRebalanceTimeout)
return connRebalanceTimeout
} }
// saveServerConfig is a convenience method which hides the locking semantics // saveServerConfig is a convenience method which hides the locking semantics

View File

@ -2,8 +2,11 @@ package server_manager
import ( import (
"bytes" "bytes"
"fmt"
"log" "log"
"os"
"testing" "testing"
"time"
"github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/server_details"
) )
@ -23,16 +26,17 @@ func GetBufferedLogger() *log.Logger {
} }
type fauxSerf struct { type fauxSerf struct {
numNodes int
} }
func (s *fauxSerf) NumNodes() int { func (s *fauxSerf) NumNodes() int {
return 16384 return s.numNodes
} }
func testServerManager() (sm *ServerManager) { func testServerManager() (sm *ServerManager) {
logger := GetBufferedLogger() logger := GetBufferedLogger()
shutdownCh := make(chan struct{}) shutdownCh := make(chan struct{})
sm = New(logger, shutdownCh, &fauxSerf{}) sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384})
return sm return sm
} }
@ -108,6 +112,10 @@ func TestServerManagerInternal_New(t *testing.T) {
t.Fatalf("ServerManager nil") t.Fatalf("ServerManager nil")
} }
if sm.clusterInfo == nil {
t.Fatalf("ServerManager.clusterInfo nil")
}
if sm.logger == nil { if sm.logger == nil {
t.Fatalf("ServerManager.logger nil") t.Fatalf("ServerManager.logger nil")
} }
@ -117,7 +125,65 @@ func TestServerManagerInternal_New(t *testing.T) {
} }
} }
// func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) { // func (sc *serverConfig) refreshServerRebalanceTimer(timer *time.Timer) {
func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
sm := testServerManager()
timer := time.NewTimer(time.Duration(1 * time.Nanosecond))
time.Sleep(1 * time.Millisecond)
sm.refreshServerRebalanceTimer(timer)
logger := log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
type clusterSizes struct {
numNodes int
numServers int
minRebalance time.Duration
}
clusters := []clusterSizes{
{0, 3, 2 * time.Minute},
{1, 0, 2 * time.Minute}, // partitioned cluster
{1, 3, 2 * time.Minute},
{2, 3, 2 * time.Minute},
{100, 0, 2 * time.Minute}, // partitioned
{100, 1, 2 * time.Minute}, // partitioned
{100, 3, 2 * time.Minute},
{1024, 1, 2 * time.Minute}, // partitioned
{1024, 3, 2 * time.Minute}, // partitioned
{1024, 5, 2 * time.Minute},
{16384, 1, 4 * time.Minute}, // partitioned
{16384, 2, 2 * time.Minute}, // partitioned
{16384, 3, 2 * time.Minute}, // partitioned
{16384, 5, 2 * time.Minute},
{65535, 0, 2 * time.Minute}, // partitioned
{65535, 1, 8 * time.Minute}, // partitioned
{65535, 2, 3 * time.Minute}, // partitioned
{65535, 3, 5 * time.Minute}, // partitioned
{65535, 5, 3 * time.Minute}, // partitioned
{65535, 7, 2 * time.Minute},
{1000000, 1, 4 * time.Hour}, // partitioned
{1000000, 2, 2 * time.Hour}, // partitioned
{1000000, 3, 80 * time.Minute}, // partitioned
{1000000, 5, 50 * time.Minute}, // partitioned
{1000000, 11, 20 * time.Minute}, // partitioned
{1000000, 19, 10 * time.Minute},
}
for _, s := range clusters {
sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes})
for i := 0; i < s.numServers; i++ {
nodeName := fmt.Sprintf("s%02d", i)
sm.AddServer(&server_details.ServerDetails{Name: nodeName})
}
d := sm.refreshServerRebalanceTimer(timer)
if d < s.minRebalance {
t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance)
}
}
}
// func (sm *ServerManager) saveServerConfig(sc serverConfig) { // func (sm *ServerManager) saveServerConfig(sc serverConfig) {
func TestServerManagerInternal_saveServerConfig(t *testing.T) { func TestServerManagerInternal_saveServerConfig(t *testing.T) {