From da872fee6367fec990e2fe4eafc3a6318bfbb1fa Mon Sep 17 00:00:00 2001
From: Sean Chittenden
Date: Thu, 25 Feb 2016 08:05:15 -0800
Subject: [PATCH] Test ServerManager.refreshServerRebalanceTimer

Change the signature so that it returns the computed duration, allowing
the back-off to be tested externally with mock data. See the sample
table in TestServerManagerInternal_refreshServerRebalanceTimer() for
the rate at which it backs off as the cluster grows; a standalone
sketch of the same scaling math follows the diff below. This function
exists primarily to avoid crippling large clusters in the event of a
partition.
---
 consul/server_manager/server_manager.go      |  5 +-
 .../server_manager_internal_test.go          | 72 ++++++++++++++++++-
 2 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/consul/server_manager/server_manager.go b/consul/server_manager/server_manager.go
index 892f1f560f..260148324d 100644
--- a/consul/server_manager/server_manager.go
+++ b/consul/server_manager/server_manager.go
@@ -271,7 +271,7 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
 // refreshServerRebalanceTimer is only called once the rebalanceTimer
 // expires. Historically this was an expensive routine and is intended to be
 // run in isolation in a dedicated, non-concurrent task.
-func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
+func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
 	serverCfg := sm.getServerConfig()
 	numConsulServers := len(serverCfg.servers)
 	// Limit this connection's life based on the size (and health) of the
@@ -280,12 +280,11 @@
 	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
 	clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
 	connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
-
 	numLANMembers := sm.clusterInfo.NumNodes()
 	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
-	sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
 
 	timer.Reset(connRebalanceTimeout)
+	return connRebalanceTimeout
 }
 
 // saveServerConfig is a convenience method which hides the locking semantics
diff --git a/consul/server_manager/server_manager_internal_test.go b/consul/server_manager/server_manager_internal_test.go
index 74277c9228..6ea93c3e9d 100644
--- a/consul/server_manager/server_manager_internal_test.go
+++ b/consul/server_manager/server_manager_internal_test.go
@@ -2,8 +2,11 @@ package server_manager
 
 import (
 	"bytes"
+	"fmt"
 	"log"
+	"os"
 	"testing"
+	"time"
 
 	"github.com/hashicorp/consul/consul/server_details"
 )
@@ -23,16 +26,17 @@ func GetBufferedLogger() *log.Logger {
 }
 
 type fauxSerf struct {
+	numNodes int
 }
 
 func (s *fauxSerf) NumNodes() int {
-	return 16384
+	return s.numNodes
 }
 
 func testServerManager() (sm *ServerManager) {
 	logger := GetBufferedLogger()
 	shutdownCh := make(chan struct{})
-	sm = New(logger, shutdownCh, &fauxSerf{})
+	sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384})
 	return sm
 }
 
@@ -108,6 +112,10 @@ func TestServerManagerInternal_New(t *testing.T) {
 		t.Fatalf("ServerManager nil")
 	}
 
+	if sm.clusterInfo == nil {
+		t.Fatalf("ServerManager.clusterInfo nil")
+	}
+
 	if sm.logger == nil {
 		t.Fatalf("ServerManager.logger nil")
 	}
@@ -117,7 +125,65 @@
 	}
 }
 
-// func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
+// func (sc *serverConfig) refreshServerRebalanceTimer(timer *time.Timer) {
+func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
+	sm := testServerManager()
+
+	timer := time.NewTimer(time.Duration(1 * time.Nanosecond))
+	time.Sleep(1 * time.Millisecond)
+	sm.refreshServerRebalanceTimer(timer)
+
+	logger := log.New(os.Stderr, "", log.LstdFlags)
+	shutdownCh := make(chan struct{})
+
+	type clusterSizes struct {
+		numNodes     int
+		numServers   int
+		minRebalance time.Duration
+	}
+	clusters := []clusterSizes{
+		{0, 3, 2 * time.Minute},
+		{1, 0, 2 * time.Minute}, // partitioned cluster
+		{1, 3, 2 * time.Minute},
+		{2, 3, 2 * time.Minute},
+		{100, 0, 2 * time.Minute}, // partitioned
+		{100, 1, 2 * time.Minute}, // partitioned
+		{100, 3, 2 * time.Minute},
+		{1024, 1, 2 * time.Minute}, // partitioned
+		{1024, 3, 2 * time.Minute}, // partitioned
+		{1024, 5, 2 * time.Minute},
+		{16384, 1, 4 * time.Minute}, // partitioned
+		{16384, 2, 2 * time.Minute}, // partitioned
+		{16384, 3, 2 * time.Minute}, // partitioned
+		{16384, 5, 2 * time.Minute},
+		{65535, 0, 2 * time.Minute}, // partitioned
+		{65535, 1, 8 * time.Minute}, // partitioned
+		{65535, 2, 3 * time.Minute}, // partitioned
+		{65535, 3, 5 * time.Minute}, // partitioned
+		{65535, 5, 3 * time.Minute}, // partitioned
+		{65535, 7, 2 * time.Minute},
+		{1000000, 1, 4 * time.Hour}, // partitioned
+		{1000000, 2, 2 * time.Hour}, // partitioned
+		{1000000, 3, 80 * time.Minute}, // partitioned
+		{1000000, 5, 50 * time.Minute}, // partitioned
+		{1000000, 11, 20 * time.Minute}, // partitioned
+		{1000000, 19, 10 * time.Minute},
+	}
+
+	for _, s := range clusters {
+		sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes})
+
+		for i := 0; i < s.numServers; i++ {
+			nodeName := fmt.Sprintf("s%02d", i)
+			sm.AddServer(&server_details.ServerDetails{Name: nodeName})
+		}
+
+		d := sm.refreshServerRebalanceTimer(timer)
+		if d < s.minRebalance {
+			t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance)
+		}
+	}
+}
 
 // func (sm *ServerManager) saveServerConfig(sc serverConfig) {
 func TestServerManagerInternal_saveServerConfig(t *testing.T) {
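
The expected durations in the table above fall out of rate-scaling the rebalance interval by cluster size: the more LAN members there are relative to the cluster-wide connection budget (servers times connections/s per server), the longer each client waits before rebalancing. The standalone sketch below is not part of the patch; it reproduces that math under two assumed constants (roughly 64 rebalance connections per second per server and a two-minute minimum reuse duration, with the random stagger omitted). The real values live in the server_manager package and in lib.RateScaledInterval and may differ.

package main

import (
	"fmt"
	"time"
)

// Assumed stand-ins for newRebalanceConnsPerSecPerServer and
// clientRPCMinReuseDuration; illustrative only, not the library's values.
const (
	assumedConnsPerSecPerServer = 64
	assumedMinReuseDuration     = 2 * time.Minute
)

// rateScaledInterval mirrors the idea behind lib.RateScaledInterval: spread one
// rebalance per LAN member across the cluster-wide connection budget, but never
// rebalance more often than the minimum connection reuse duration.
func rateScaledInterval(clusterWideConnsPerSec float64, min time.Duration, numLANMembers int) time.Duration {
	interval := time.Duration(float64(time.Second) * float64(numLANMembers) / clusterWideConnsPerSec)
	if interval < min {
		return min
	}
	return interval
}

func main() {
	// A few rows from the test table: {numNodes, numServers}.
	cases := []struct{ numNodes, numServers int }{
		{16384, 1},   // table expects at least 4m
		{65535, 3},   // table expects at least 5m
		{1000000, 1}, // table expects at least 4h
	}
	for _, c := range cases {
		rate := float64(c.numServers * assumedConnsPerSecPerServer)
		d := rateScaledInterval(rate, assumedMinReuseDuration, c.numNodes)
		fmt.Printf("%7d nodes / %d servers -> next rebalance in %v\n", c.numNodes, c.numServers, d)
	}
}

With those assumed constants the printed intervals land just above the table's minRebalance lower bounds (about 4m16s, 5m41s, and 4h20m), which is why the test only asserts a minimum rather than an exact duration.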