diff --git a/consul/server_manager/server_manager.go b/consul/server_manager/server_manager.go index 819acd73ff..c3c9051d8f 100644 --- a/consul/server_manager/server_manager.go +++ b/consul/server_manager/server_manager.go @@ -9,7 +9,6 @@ import ( "github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/lib" - "github.com/hashicorp/serf/serf" ) type consulServerEventTypes int @@ -61,6 +60,10 @@ const ( defaultClusterSize = 1024 ) +type ConsulClusterInfo interface { + NumNodes() int +} + // serverCfg is the thread-safe configuration structure that is used to // maintain the list of consul servers in Client. // @@ -94,7 +97,7 @@ type ServerManager struct { // serf is used to estimate the approximate number of nodes in a // cluster and limit the rate at which it rebalances server // connections - serf *serf.Serf + clusterInfo ConsulClusterInfo } // AddServer takes out an internal write lock and adds a new server. If the @@ -179,11 +182,11 @@ func (sm *ServerManager) getServerConfig() serverConfig { // NewServerManager is the only way to safely create a new ServerManager // struct. -func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) { +func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, cci ConsulClusterInfo) (sm *ServerManager) { // NOTE(sean@): Can't pass *consul.Client due to an import cycle sm = new(ServerManager) sm.logger = logger - sm.serf = serf + sm.clusterInfo = cci sm.consulServersCh = make(chan consulServerEventTypes, maxConsulServerManagerEvents) sm.shutdownCh = shutdownCh @@ -289,12 +292,7 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) { clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer) connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) - // Assume a moderate sized cluster unless we have an actual serf - // instance we can query. - numLANMembers := defaultClusterSize - if sm.serf != nil { - numLANMembers = sm.serf.NumNodes() - } + numLANMembers := sm.clusterInfo.NumNodes() connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout) diff --git a/consul/server_manager/server_manager_internal_test.go b/consul/server_manager/server_manager_internal_test.go index 614b56f7b0..75be1efd33 100644 --- a/consul/server_manager/server_manager_internal_test.go +++ b/consul/server_manager/server_manager_internal_test.go @@ -22,10 +22,17 @@ func GetBufferedLogger() *log.Logger { return localLogger } +type fauxSerf struct { +} + +func (s *fauxSerf) NumNodes() int { + return 16384 +} + func testServerManager() (sm *ServerManager) { logger := GetBufferedLogger() shutdownCh := make(chan struct{}) - sm = NewServerManager(logger, shutdownCh, nil) + sm = NewServerManager(logger, shutdownCh, &fauxSerf{}) return sm }