diff --git a/consul/client.go b/consul/client.go
index a5a4e5e11d..bf0b67ec63 100644
--- a/consul/client.go
+++ b/consul/client.go
@@ -118,7 +118,7 @@ func NewClient(config *Config) (*Client, error) {
 		shutdownCh: make(chan struct{}),
 	}
 
-	c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh)
+	c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh, c.serf)
 
 	// Start consulServers maintenance
 	go c.serverMgr.StartServerManager()
diff --git a/consul/client_test.go b/consul/client_test.go
index 59403dab84..2dec18d4c4 100644
--- a/consul/client_test.go
+++ b/consul/client_test.go
@@ -83,6 +83,11 @@ func TestClient_JoinLAN(t *testing.T) {
 	if _, err := c1.JoinLAN([]string{addr}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
+	testutil.WaitForResult(func() (bool, error) {
+		return c1.serverMgr.GetNumServers() == 1, nil
+	}, func(err error) {
+		t.Fatalf("expected one consul server, got %d", c1.serverMgr.GetNumServers())
+	})
 
 	// Check the members
 	testutil.WaitForResult(func() (bool, error) {
@@ -93,9 +98,9 @@ func TestClient_JoinLAN(t *testing.T) {
 		t.Fatalf("bad len")
 	})
 
 	// Check we have a new consul
 	testutil.WaitForResult(func() (bool, error) {
 		return c1.serverMgr.GetNumServers() == 1, nil
 	}, func(err error) {
-		t.Fatalf("expected consul server")
+		t.Fatalf("expected consul server, got %d", c1.serverMgr.GetNumServers())
 	})
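
Note that the assertions above only work because testutil.WaitForResult re-invokes the condition closure until it succeeds or gives up, so any state the closure depends on must be read inside the closure rather than captured once beforehand. A rough sketch of that retry loop (the retry count and sleep are illustrative, not Consul's exact implementation):

	package main

	import (
		"fmt"
		"time"
	)

	// waitForResult approximates testutil.WaitForResult: re-run the
	// condition until it reports success or the attempts run out.
	func waitForResult(test func() (bool, error), onFail func(error)) {
		var err error
		for i := 0; i < 100; i++ {
			var ok bool
			if ok, err = test(); ok {
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
		onFail(err)
	}

	func main() {
		start := time.Now()
		waitForResult(func() (bool, error) {
			// Re-evaluated on every attempt, so it must read live
			// state rather than a value captured once outside.
			return time.Since(start) > 50*time.Millisecond, nil
		}, func(err error) {
			fmt.Println("condition never held:", err)
		})
		fmt.Println("condition held")
	}
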
diff --git a/consul/server_manager/server_manager.go b/consul/server_manager/server_manager.go
index cbdec6b5b4..b287f8189e 100644
--- a/consul/server_manager/server_manager.go
+++ b/consul/server_manager/server_manager.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/hashicorp/consul/consul/server_details"
 	"github.com/hashicorp/consul/lib"
+	"github.com/hashicorp/serf/serf"
 )
 
 type consulServerEventTypes int
@@ -23,6 +24,11 @@ const (
 	// connection load across servers
 	consulServersRebalance
 
+	// consulServersRefreshRebalanceDuration is used to signal when we
+	// should reset the rebalance duration because the server list has
+	// changed and we don't need to proactively change our connection
+	consulServersRefreshRebalanceDuration
+
 	// consulServersRPCError is used to signal when a server has either
 	// timed out or returned an error and we would like to have the
 	// server manager find a new preferredServer.
@@ -47,6 +53,11 @@ const (
 	// queries are sent over an established connection to a single server
 	clientRPCMinReuseDuration = 120 * time.Second
 
+	// initialRebalanceTimeoutHours is the initial value for the
+	// rebalanceTimer. This value is discarded immediately after the
+	// client becomes aware of the first server.
+	initialRebalanceTimeoutHours = 24
+
 	// Limit the number of new connections a server receives per second
 	// for connection rebalancing. This limit caps the load caused by
 	// continual rebalancing efforts when a cluster is in equilibrium. A
@@ -61,6 +72,14 @@ const (
 	// will take ~26min for all servers to rebalance. A 10K cluster in
 	// the same scenario will take ~2.6min to rebalance.
 	newRebalanceConnsPerSecPerServer = 64
+
+	// maxConsulServerManagerEvents is the size of the consulServersCh
+	// buffer.
+	maxConsulServerManagerEvents = 16
+
+	// defaultClusterSize is the assumed cluster size if no serf cluster
+	// is available.
+	defaultClusterSize = 1024
 )
 
 // serverCfg is the thread-safe configuration structure that is used to
@@ -71,9 +90,6 @@ const (
 type serverConfig struct {
 	// servers tracks the locally known servers
 	servers []*server_details.ServerDetails
-
-	// Timer used to control rebalancing of servers
-	rebalanceTimer *time.Timer
 }
 
 type ServerManager struct {
@@ -86,11 +102,20 @@ type ServerManager struct {
 	// maintenance of the list of consulServers
 	consulServersCh chan consulServerEventTypes
 
+	// refreshRebalanceDurationCh is used to signal that a refresh should
+	// occur
+	refreshRebalanceDurationCh chan bool
+
 	// shutdownCh is a copy of the channel in consul.Client
 	shutdownCh chan struct{}
 
 	// logger uses the provided LogOutput
 	logger *log.Logger
+
+	// serf is used to estimate the approximate number of nodes in a
+	// cluster and limit the rate at which it rebalances server
+	// connections
+	serf *serf.Serf
 }
 
 // AddServer takes out an internal write lock and adds a new server.  If the
@@ -150,8 +175,8 @@ func (sm *ServerManager) CycleFailedServers() {
 		}
 	}
 
-	serverCfg.resetRebalanceTimer(sm)
 	sm.saveServerConfig(serverCfg)
+	sm.requestRefreshRebalanceDuration()
 }
 
 // cycleServers returns a new list of servers that has dequeued the first
@@ -211,16 +236,18 @@ func (sm *ServerManager) getServerConfig() serverConfig {
 
 // NewServerManager is the only way to safely create a new ServerManager
 // struct.
-func NewServerManager(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) {
+func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) {
 	// NOTE(sean@): Can't pass *consul.Client due to an import cycle
 	sm = new(ServerManager)
 	sm.logger = logger
+	sm.serf = serf
 	sm.consulServersCh = make(chan consulServerEventTypes, maxConsulServerManagerEvents)
 	sm.shutdownCh = shutdownCh
+	sm.refreshRebalanceDurationCh = make(chan bool, maxConsulServerManagerEvents)
 
+	sc := serverConfig{}
 	sc.servers = make([]*server_details.ServerDetails, 0)
-	sc.rebalanceTimer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
 	sm.serverConfigValue.Store(sc)
 	return sm
 }
@@ -259,8 +286,8 @@ func (sm *ServerManager) RebalanceServers() {
 	}
 	serverCfg.servers = newServers
 
-	serverCfg.resetRebalanceTimer(sm)
 	sm.saveServerConfig(serverCfg)
+	sm.requestRefreshRebalanceDuration()
 }
 
 // RemoveServer takes out an internal write lock and removes a server from
@@ -291,27 +318,45 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
 	}
 }
 
-// resetRebalanceTimer assumes:
-//
-// 1) the serverConfigLock is already held by the caller.
-// 2) the caller will call serverConfigValue.Store()
-func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
-	numConsulServers := len(sc.servers)
+// requestRefreshRebalanceDuration sends a message that causes a background
+// task to recalculate the rebalance duration
+func (sm *ServerManager) requestRefreshRebalanceDuration() {
+	sm.refreshRebalanceDurationCh <- true
+}
+
+// requestServerRebalance sends a message that causes a background task
+// to reshuffle the list of servers
+func (sm *ServerManager) requestServerRebalance() {
+	sm.consulServersCh <- consulServersRebalance
+}
+
+// refreshServerRebalanceTimer recomputes the rebalance timeout from the
+// number of servers and the cluster size, then resets the timer
+func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
+	serverCfg := sm.getServerConfig()
+	numConsulServers := len(serverCfg.servers)
 	// Limit this connection's life based on the size (and health) of the
 	// cluster.  Never rebalance a connection more frequently than
 	// connReuseLowWatermarkDuration, and make sure we never exceed
 	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
 	clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
 	connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
-	numLANMembers := 16384 // Assume sufficiently large for now. FIXME: numLanMembers := len(c.LANMembers())
+
+	// Assume a moderately sized cluster unless we have an actual serf
+	// instance we can query.
+	numLANMembers := defaultClusterSize
+	if sm.serf != nil {
+		numLANMembers = sm.serf.NumNodes()
+	}
 	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
 	sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
 
-	if sc.rebalanceTimer == nil {
-		sc.rebalanceTimer = time.NewTimer(connRebalanceTimeout)
-	} else {
-		sc.rebalanceTimer.Reset(connRebalanceTimeout)
-	}
+	timer.Reset(connRebalanceTimeout)
+}
+
+// saveServerConfig is a convenience method which hides the locking semantics
+// of atomic.Value from the caller.
+func (sm *ServerManager) saveServerConfig(sc serverConfig) {
+	sm.serverConfigValue.Store(sc)
 }
 
 // StartServerManager is used to start and manage the task of automatically
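
To make the math in refreshServerRebalanceTimer concrete: with three known servers, clusterWideRebalanceConnsPerSec is 3 × 64 = 192 new connections/s, so a 100,000-node cluster spaces rebalances roughly 100000/192 ≈ 521s (~8.7 min) apart, while smaller clusters (including the assumed 1024-node default) bottom out at the jittered two-minute watermark. A self-contained sketch, assuming lib.RateScaledInterval returns the larger of the floor and n/rate seconds (the helper below is a stand-in, not the library code):

	package main

	import (
		"fmt"
		"time"
	)

	// rateScaledInterval is a stand-in for lib.RateScaledInterval: grow
	// the interval linearly with cluster size so that, cluster-wide,
	// reconnects stay under `rate` per second, but never below the floor.
	func rateScaledInterval(rate float64, floor time.Duration, n int) time.Duration {
		interval := time.Duration(float64(n) / rate * float64(time.Second))
		if interval < floor {
			return floor
		}
		return interval
	}

	func main() {
		const numServers = 3               // assumed for the example
		rate := float64(numServers * 64)   // newRebalanceConnsPerSecPerServer
		floor := 120 * time.Second         // clientRPCMinReuseDuration, jitter omitted

		for _, nodes := range []int{1024, 16384, 100000} {
			fmt.Printf("%6d nodes -> next rebalance in %v\n",
				nodes, rateScaledInterval(rate, floor, nodes))
		}
		// 1024 and 16384 nodes hit the 2m0s floor; 100000 yields ~8m41s.
	}
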
@@ -319,7 +364,9 @@ func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
 // happens either when a new server is added or when a duration has been
 // exceeded.
 func (sm *ServerManager) StartServerManager() {
-	var rebalanceTimer *time.Timer
+	rebalanceTimer := time.NewTimer(initialRebalanceTimeoutHours * time.Hour)
+	var rebalanceTaskDispatched int32
+
 	func() {
 		sm.serverConfigLock.Lock()
 		defer sm.serverConfigLock.Unlock()
@@ -330,8 +377,6 @@ func (sm *ServerManager) StartServerManager() {
 		}
 		var serverCfg serverConfig
 		serverCfg = serverCfgPtr.(serverConfig)
-		serverCfg.resetRebalanceTimer(sm)
-		rebalanceTimer = serverCfg.rebalanceTimer
 		sm.saveServerConfig(serverCfg)
 	}()
 
@@ -340,13 +385,14 @@ func (sm *ServerManager) StartServerManager() {
 		case e := <-sm.consulServersCh:
 			switch e {
 			case consulServersNodeJoin:
-				sm.logger.Printf("[INFO] consul: new node joined cluster")
-				sm.RebalanceServers()
+				sm.logger.Printf("[INFO] server manager: new node joined cluster")
+				// rebalance on new server
+				sm.requestServerRebalance()
 			case consulServersRebalance:
-				sm.logger.Printf("[INFO] consul: rebalancing servers by request")
+				sm.logger.Printf("[INFO] server manager: rebalancing servers by request")
 				sm.RebalanceServers()
 			case consulServersRPCError:
-				sm.logger.Printf("[INFO] consul: need to find a new server to talk with")
+				sm.logger.Printf("[INFO] server manager: need to find a new server to talk with")
 				sm.CycleFailedServers()
 				// FIXME(sean@): wtb preemptive Status.Ping
 				// of servers, ideally parallel fan-out of N
@@ -360,7 +406,21 @@ func (sm *ServerManager) StartServerManager() {
 				// their RPC time too low even though the
 				// Ping did return successfully?
 			default:
-				sm.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
+				sm.logger.Printf("[WARN] server manager: unhandled LAN Serf Event: %#v", e)
 			}
+		case <-sm.refreshRebalanceDurationCh:
+			// Drain all queued messages from the refresh channel
+			chanLen := len(sm.refreshRebalanceDurationCh)
+			for i := 0; i < chanLen; i++ {
+				<-sm.refreshRebalanceDurationCh
+			}
+			// Only run one rebalance task at a time, but do
+			// allow for the channel to be drained
+			if atomic.CompareAndSwapInt32(&rebalanceTaskDispatched, 0, 1) {
+				go func() {
+					defer atomic.StoreInt32(&rebalanceTaskDispatched, 0)
+					sm.refreshServerRebalanceTimer(rebalanceTimer)
+				}()
+			}
 		case <-rebalanceTimer.C:
 			sm.logger.Printf("[INFO] consul: server rebalance timeout")
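
The CompareAndSwapInt32 guard above is a single-flight pattern: at most one timer-refresh goroutine runs at a time, and duplicate requests that arrived while it ran have already been drained from the channel. A minimal standalone sketch of the same guard (names here are illustrative):

	package main

	import (
		"fmt"
		"sync/atomic"
		"time"
	)

	// dispatched plays the role of rebalanceTaskDispatched: 0 = idle, 1 = running.
	var dispatched int32

	// maybeDispatch starts task in a goroutine only when no other instance
	// is in flight; concurrent callers lose the CAS and return false.
	func maybeDispatch(task func()) bool {
		if !atomic.CompareAndSwapInt32(&dispatched, 0, 1) {
			return false
		}
		go func() {
			defer atomic.StoreInt32(&dispatched, 0) // mark idle when done
			task()
		}()
		return true
	}

	func main() {
		slow := func() { time.Sleep(20 * time.Millisecond) }
		fmt.Println(maybeDispatch(slow)) // true: first request wins
		fmt.Println(maybeDispatch(slow)) // false: a task is already running
		time.Sleep(50 * time.Millisecond)
		fmt.Println(maybeDispatch(slow)) // true again once the first completed
	}
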
diff --git a/consul/server_manager/server_manager_internal_test.go b/consul/server_manager/server_manager_internal_test.go
new file mode 100644
index 0000000000..614b56f7b0
--- /dev/null
+++ b/consul/server_manager/server_manager_internal_test.go
@@ -0,0 +1,156 @@
+package server_manager
+
+import (
+	"bytes"
+	"log"
+	"testing"
+
+	"github.com/hashicorp/consul/consul/server_details"
+)
+
+var (
+	localLogger    *log.Logger
+	localLogBuffer *bytes.Buffer
+)
+
+func init() {
+	localLogBuffer = new(bytes.Buffer)
+	localLogger = log.New(localLogBuffer, "", 0)
+}
+
+func GetBufferedLogger() *log.Logger {
+	return localLogger
+}
+
+func testServerManager() (sm *ServerManager) {
+	logger := GetBufferedLogger()
+	shutdownCh := make(chan struct{})
+	sm = NewServerManager(logger, shutdownCh, nil)
+	return sm
+}
+
+// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
+func TestServerManagerInternal_cycleServer(t *testing.T) {
+	sm := testServerManager()
+	sc := sm.getServerConfig()
+
+	server0 := &server_details.ServerDetails{Name: "server0"}
+	server1 := &server_details.ServerDetails{Name: "server1"}
+	server2 := &server_details.ServerDetails{Name: "server2"}
+	sc.servers = append(sc.servers, server0, server1, server2)
+	sm.saveServerConfig(sc)
+
+	sc = sm.getServerConfig()
+	if len(sc.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	}
+	if sc.servers[0] != server0 ||
+		sc.servers[1] != server1 ||
+		sc.servers[2] != server2 {
+		t.Fatalf("initial server ordering not correct")
+	}
+
+	sc.servers = sc.cycleServer()
+	if len(sc.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	}
+	if sc.servers[0] != server1 ||
+		sc.servers[1] != server2 ||
+		sc.servers[2] != server0 {
+		t.Fatalf("server ordering after one cycle not correct")
+	}
+
+	sc.servers = sc.cycleServer()
+	if len(sc.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	}
+	if sc.servers[0] != server2 ||
+		sc.servers[1] != server0 ||
+		sc.servers[2] != server1 {
+		t.Fatalf("server ordering after two cycles not correct")
+	}
+
+	sc.servers = sc.cycleServer()
+	if len(sc.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	}
+	if sc.servers[0] != server0 ||
+		sc.servers[1] != server1 ||
+		sc.servers[2] != server2 {
+		t.Fatalf("server ordering after three cycles not correct")
+	}
+}
+
+// func (sm *ServerManager) getServerConfig() serverConfig {
+func TestServerManagerInternal_getServerConfig(t *testing.T) {
+	sm := testServerManager()
+	sc := sm.getServerConfig()
+	if sc.servers == nil {
+		t.Fatalf("serverConfig.servers nil")
+	}
+
+	if len(sc.servers) != 0 {
+		t.Fatalf("serverConfig.servers length not zero")
+	}
+}
+
+// func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) {
+func TestServerManagerInternal_NewServerManager(t *testing.T) {
+	sm := testServerManager()
+	if sm == nil {
+		t.Fatalf("ServerManager nil")
+	}
+
+	if sm.logger == nil {
+		t.Fatalf("ServerManager.logger nil")
+	}
+
+	if sm.consulServersCh == nil {
+		t.Fatalf("ServerManager.consulServersCh nil")
+	}
+
+	if sm.shutdownCh == nil {
+		t.Fatalf("ServerManager.shutdownCh nil")
+	}
+}
+
+// func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
+
+// func (sm *ServerManager) saveServerConfig(sc serverConfig) {
+func TestServerManagerInternal_saveServerConfig(t *testing.T) {
+	sm := testServerManager()
+
+	// Initial condition
+	func() {
+		sc := sm.getServerConfig()
+		if len(sc.servers) != 0 {
+			t.Fatalf("ServerManager.saveServerConfig failed to load init config")
+		}
+
+		newServer := new(server_details.ServerDetails)
+		sc.servers = append(sc.servers, newServer)
+		sm.saveServerConfig(sc)
+	}()
+
+	// Test that save works
+	func() {
+		sc1 := sm.getServerConfig()
+		t1NumServers := len(sc1.servers)
+		if t1NumServers != 1 {
+			t.Fatalf("ServerManager.saveServerConfig failed to save mutated config")
+		}
+	}()
+
+	// Verify that a mutation without a save doesn't alter the original
+	func() {
+		newServer := new(server_details.ServerDetails)
+		sc := sm.getServerConfig()
+		sc.servers = append(sc.servers, newServer)
+
+		scOrig := sm.getServerConfig()
+		origNumServers := len(scOrig.servers)
+		if origNumServers >= len(sc.servers) {
+			t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original")
+		}
+	}()
+}
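
TestServerManagerInternal_saveServerConfig exercises the copy-on-write discipline around atomic.Value: readers load a value copy, writers mutate the copy and only publish it with an explicit store. A distilled sketch of the pattern under test (simplified types, not the package's actual ones):

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	// config stands in for serverConfig: stored by value in an atomic.Value.
	type config struct {
		servers []string
	}

	type manager struct {
		mu  sync.Mutex   // serializes writers, like serverConfigLock
		val atomic.Value // always holds a config value
	}

	func (m *manager) get() config   { return m.val.Load().(config) }
	func (m *manager) save(c config) { m.val.Store(c) }

	func main() {
		m := &manager{}
		m.save(config{servers: []string{"s1"}})

		// Mutating a loaded copy leaves the published value untouched
		// until save() runs -- the property the test above asserts.
		c := m.get()
		c.servers = append(c.servers, "s2")
		fmt.Println(len(m.get().servers), len(c.servers)) // 1 2

		m.mu.Lock()
		m.save(c) // publish the mutated copy
		m.mu.Unlock()
		fmt.Println(len(m.get().servers)) // 2
	}
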
diff --git a/consul/server_manager/server_manager_test.go b/consul/server_manager/server_manager_test.go
new file mode 100644
index 0000000000..46efd0083b
--- /dev/null
+++ b/consul/server_manager/server_manager_test.go
@@ -0,0 +1,75 @@
+package server_manager_test
+
+import (
+	"bytes"
+	"log"
+	"testing"
+
+	"github.com/hashicorp/consul/consul/server_details"
+	"github.com/hashicorp/consul/consul/server_manager"
+)
+
+var (
+	localLogger    *log.Logger
+	localLogBuffer *bytes.Buffer
+)
+
+func init() {
+	localLogBuffer = new(bytes.Buffer)
+	localLogger = log.New(localLogBuffer, "", 0)
+}
+
+func GetBufferedLogger() *log.Logger {
+	return localLogger
+}
+
+func makeMockServerManager() (sm *server_manager.ServerManager) {
+	logger, shutdownCh := mockServerManager()
+	sm = server_manager.NewServerManager(logger, shutdownCh, nil)
+	return sm
+}
+
+func mockServerManager() (logger *log.Logger, shutdownCh chan struct{}) {
+	logger = GetBufferedLogger()
+	shutdownCh = make(chan struct{})
+	return logger, shutdownCh
+}
+
+// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
+
+// func (sm *ServerManager) CycleFailedServers() {
+
+// func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetails) {
+
+// func (sm *ServerManager) GetNumServers() (numServers int) {
+func TestServerManager_GetNumServers(t *testing.T) {
+	sm := makeMockServerManager()
+	var num int
+	num = sm.GetNumServers()
+	if num != 0 {
+		t.Fatalf("Expected zero servers to start")
+	}
+
+	s := &server_details.ServerDetails{}
+	sm.AddServer(s)
+	num = sm.GetNumServers()
+	if num != 1 {
+		t.Fatalf("Expected one server after AddServer")
+	}
+}
+
+// func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) {
+func TestServerManager_NewServerManager(t *testing.T) {
+	sm := makeMockServerManager()
+	if sm == nil {
+		t.Fatalf("ServerManager nil")
+	}
+}
+
+// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
+
+// func (sm *ServerManager) RebalanceServers() {
+
+// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
+
+// func (sm *ServerManager) StartServerManager() {
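
Both test packages pass nil for the new *serf.Serf parameter, relying on the defaultClusterSize fallback in refreshServerRebalanceTimer. A small sketch of that fallback shape (the interface is an illustrative stand-in; the real code checks the concrete *serf.Serf pointer):

	package main

	import "fmt"

	// defaultClusterSize mirrors the constant above: the assumed cluster
	// size when no serf instance is available (e.g., in unit tests).
	const defaultClusterSize = 1024

	// nodeCounter is a hypothetical stand-in for the serf dependency.
	type nodeCounter interface {
		NumNodes() int
	}

	func estimateClusterSize(s nodeCounter) int {
		if s == nil {
			return defaultClusterSize
		}
		return s.NumNodes()
	}

	func main() {
		// With no serf handle wired in, fall back to 1024 nodes.
		fmt.Println(estimateClusterSize(nil)) // 1024
	}
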