diff --git a/consul/client.go b/consul/client.go index 0b536a8d52..82a4bd7fac 100644 --- a/consul/client.go +++ b/consul/client.go @@ -119,12 +119,7 @@ func NewClient(config *Config) (*Client, error) { shutdownCh: make(chan struct{}), } - c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf) - - // Start maintenance task for serverMgr - go c.serverMgr.Start() - - // Start the Serf listeners to prevent a deadlock + // Start lan event handlers before lan Serf setup to prevent deadlock go c.lanEventHandler() // Initialize the lan Serf @@ -134,6 +129,11 @@ func NewClient(config *Config) (*Client, error) { c.Shutdown() return nil, fmt.Errorf("Failed to start lan serf: %v", err) } + + // Start maintenance task for server_manager + c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf, c.connPool) + go c.serverMgr.Start() + return c, nil } diff --git a/consul/client_test.go b/consul/client_test.go index 5fe6194e35..124a7fb811 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -236,6 +236,73 @@ func TestClient_RPC_Pool(t *testing.T) { wg.Wait() } +func TestClient_RPC_ConsulServerPing(t *testing.T) { + var servers []*Server + var serverDirs []string + const numServers = 5 + + for n := numServers; n > 0; n-- { + var bootstrap bool + if n == numServers { + bootstrap = true + } + dir, s := testServerDCBootstrap(t, "dc1", bootstrap) + defer os.RemoveAll(dir) + defer s.Shutdown() + + servers = append(servers, s) + serverDirs = append(serverDirs, dir) + } + + const numClients = 1 + clientDir, c := testClient(t) + defer os.RemoveAll(clientDir) + defer c.Shutdown() + + // Join all servers. 
+ for _, s := range servers { + addr := fmt.Sprintf("127.0.0.1:%d", + s.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Sleep to allow Serf to sync, shuffle, and let the shuffle complete + time.Sleep(1 * time.Second) + c.serverMgr.ResetRebalanceTimer() + time.Sleep(1 * time.Second) + + if len(c.LANMembers()) != numServers+numClients { + t.Errorf("bad len: %d", len(c.LANMembers())) + } + for _, s := range servers { + if len(s.LANMembers()) != numServers+numClients { + t.Errorf("bad len: %d", len(s.LANMembers())) + } + } + + // Ping each server in the list + var pingCount int + for range servers { + time.Sleep(1 * time.Second) + s := c.serverMgr.FindServer() + ok, err := c.connPool.PingConsulServer(s) + if !ok { + t.Errorf("Unable to ping server %v: %s", s.String(), err) + } + pingCount += 1 + + // Artificially fail the server in order to rotate the server + // list + c.serverMgr.NotifyFailedServer(s) + } + + if pingCount != numServers { + t.Errorf("bad len: %d/%d", pingCount, numServers) + } +} + func TestClient_RPC_TLS(t *testing.T) { dir1, conf1 := testServerConfig(t, "a.testco.internal") conf1.VerifyIncoming = true diff --git a/consul/pool.go b/consul/pool.go index 4abecbf4bf..cdfd2129df 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" @@ -405,6 +406,30 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg return nil } +// PingConsulServer sends a Status.Ping message to the specified server and +// returns true if healthy, false if an error occurred +func (p *ConnPool) PingConsulServer(s *server_details.ServerDetails) (bool, error) { + // Get a usable client + conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version) + if err 
!= nil { + return false, err + } + + // Make the RPC call + var out struct{} + err = msgpackrpc.CallWithCodec(sc.codec, "Status.Ping", struct{}{}, &out) + if err != nil { + sc.Close() + p.releaseConn(conn) + return false, err + } + + // Done with the connection + conn.returnClient(sc) + p.releaseConn(conn) + return true, nil +} + // Reap is used to close conns open over maxTime func (p *ConnPool) reap() { for { diff --git a/consul/server_details/server_details.go b/consul/server_details/server_details.go index e9386e8ca3..4f3dc794a6 100644 --- a/consul/server_details/server_details.go +++ b/consul/server_details/server_details.go @@ -8,6 +8,16 @@ import ( "github.com/hashicorp/serf/serf" ) +// Key is used in maps and for equality tests. A key is based on endpoints. +type Key struct { + name string +} + +// Equal compares two Key objects +func (k *Key) Equal(x *Key) bool { + return k.name == x.name +} + // ServerDetails is used to return details of a consul server type ServerDetails struct { Name string @@ -19,8 +29,22 @@ type ServerDetails struct { Addr net.Addr } +// Key returns the corresponding Key +func (s *ServerDetails) Key() *Key { + return &Key{ + name: s.Name, + } +} + +// String returns a string representation of ServerDetails func (s *ServerDetails) String() string { - return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter) + var addrStr, networkStr string + if s.Addr != nil { + addrStr = s.Addr.String() + networkStr = s.Addr.Network() + } + + return fmt.Sprintf("%s (Addr: %s/%s) (DC: %s)", s.Name, networkStr, addrStr, s.Datacenter) } // IsConsulServer returns true if a serf member is a consul server. 
Returns a diff --git a/consul/server_details/server_details_internal_test.go b/consul/server_details/server_details_internal_test.go new file mode 100644 index 0000000000..ee09d8f57f --- /dev/null +++ b/consul/server_details/server_details_internal_test.go @@ -0,0 +1,84 @@ +package server_details + +import ( + "testing" +) + +func TestServerDetails_Key_Equal(t *testing.T) { + tests := []struct { + name string + k1 *Key + k2 *Key + equal bool + }{ + { + name: "Key equality", + k1: &Key{ + name: "s1", + }, + k2: &Key{ + name: "s1", + }, + equal: true, + }, + { + name: "Key Inequality", + k1: &Key{ + name: "s1", + }, + k2: &Key{ + name: "s2", + }, + equal: false, + }, + } + + for _, test := range tests { + if test.k1.Equal(test.k2) != test.equal { + t.Errorf("Expected a %v result from test %s", test.equal, test.name) + } + + // Test Key to make sure it actually works as a key + m := make(map[Key]bool) + m[*test.k1] = true + if _, found := m[*test.k2]; found != test.equal { + t.Errorf("Expected a %v result from map test %s", test.equal, test.name) + } + } +} + +func TestServerDetails_Key(t *testing.T) { + tests := []struct { + name string + sd *ServerDetails + k *Key + equal bool + }{ + { + name: "Key equality", + sd: &ServerDetails{ + Name: "s1", + }, + k: &Key{ + name: "s1", + }, + equal: true, + }, + { + name: "Key inequality", + sd: &ServerDetails{ + Name: "s1", + }, + k: &Key{ + name: "s2", + }, + equal: false, + }, + } + + for _, test := range tests { + if test.k.Equal(test.sd.Key()) != test.equal { + t.Errorf("Expected a %v result from test %s", test.equal, test.name) + } + } +} diff --git a/consul/server_details/server_details_test.go b/consul/server_details/server_details_test.go index 09b87ec165..b16cbf5a0a 100644 --- a/consul/server_details/server_details_test.go +++ b/consul/server_details/server_details_test.go @@ -8,6 +8,48 @@ import ( "github.com/hashicorp/serf/serf" ) +func TestServerDetails_Key_params(t *testing.T) { + ipv4a := net.ParseIP("127.0.0.1") 
+ ipv4b := net.ParseIP("1.2.3.4") + + tests := []struct { + name string + sd1 *server_details.ServerDetails + sd2 *server_details.ServerDetails + equal bool + }{ + { + name: "Addr inequality", + sd1: &server_details.ServerDetails{ + Name: "s1", + Datacenter: "dc1", + Port: 8300, + Addr: &net.IPAddr{IP: ipv4a}, + }, + sd2: &server_details.ServerDetails{ + Name: "s1", + Datacenter: "dc1", + Port: 8300, + Addr: &net.IPAddr{IP: ipv4b}, + }, + equal: true, + }, + } + + for _, test := range tests { + if test.sd1.Key().Equal(test.sd2.Key()) != test.equal { + t.Errorf("Expected a %v result from test %s", test.equal, test.name) + } + + // Test Key to make sure it actually works as a key + m := make(map[server_details.Key]bool) + m[*test.sd1.Key()] = true + if _, found := m[*test.sd2.Key()]; found != test.equal { + t.Errorf("Expected a %v result from map test %s", test.equal, test.name) + } + } +} + func TestIsConsulServer(t *testing.T) { m := serf.Member{ Name: "foo", diff --git a/consul/server_manager/server_manager.go b/consul/server_manager/server_manager.go index 504c5ce2bf..24844b0f91 100644 --- a/consul/server_manager/server_manager.go +++ b/consul/server_manager/server_manager.go @@ -47,11 +47,19 @@ const ( newRebalanceConnsPerSecPerServer = 64 ) +// ConsulClusterInfo is an interface wrapper around serf and prevents a +// cyclic import dependency type ConsulClusterInfo interface { NumNodes() int } -// serverCfg is the thread-safe configuration struct used to maintain the +// Pinger is an interface wrapping client.ConnPool to prevent a +// cyclic import dependency +type Pinger interface { + PingConsulServer(server *server_details.ServerDetails) (bool, error) +} + +// serverConfig is the thread-safe configuration struct used to maintain the // list of Consul servers in ServerManager. 
// // NOTE(sean@): We are explicitly relying on the fact that serverConfig will @@ -68,6 +76,9 @@ type ServerManager struct { serverConfigValue atomic.Value serverConfigLock sync.Mutex + // rebalanceTimer controls the duration of the rebalance interval + rebalanceTimer *time.Timer + // shutdownCh is a copy of the channel in consul.Client shutdownCh chan struct{} @@ -78,8 +89,13 @@ type ServerManager struct { // connections. ConsulClusterInfo is an interface that wraps serf. clusterInfo ConsulClusterInfo - // notifyFailedServersBarrier is acts as a barrier to prevent - // queueing behind serverConfigLog and acts as a TryLock(). + // connPoolPinger is used to test the health of a server in the + // connection pool. Pinger is an interface that wraps + // client.ConnPool. + connPoolPinger Pinger + + // notifyFailedBarrier acts as a barrier to prevent queuing behind + // serverConfigLock and acts as a TryLock(). notifyFailedBarrier int32 } @@ -91,20 +107,20 @@ type ServerManager struct { func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { sm.serverConfigLock.Lock() defer sm.serverConfigLock.Unlock() - serverCfg := sm.getServerConfig() + sc := sm.getServerConfig() // Check if this server is known found := false - for idx, existing := range serverCfg.servers { + for idx, existing := range sc.servers { if existing.Name == server.Name { - newServers := make([]*server_details.ServerDetails, len(serverCfg.servers)) - copy(newServers, serverCfg.servers) + newServers := make([]*server_details.ServerDetails, len(sc.servers)) + copy(newServers, sc.servers) // Overwrite the existing server details in order to // possibly update metadata (e.g. 
server version) newServers[idx] = server - serverCfg.servers = newServers + sc.servers = newServers found = true break } @@ -112,18 +128,23 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { // Add to the list if not known if !found { - newServers := make([]*server_details.ServerDetails, len(serverCfg.servers), len(serverCfg.servers)+1) - copy(newServers, serverCfg.servers) + newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1) + copy(newServers, sc.servers) newServers = append(newServers, server) - serverCfg.servers = newServers + sc.servers = newServers } - sm.saveServerConfig(serverCfg) + sm.saveServerConfig(sc) } // cycleServers returns a new list of servers that has dequeued the first // server and enqueued it at the end of the list. cycleServers assumes the -// caller is holding the serverConfigLock. +// caller is holding the serverConfigLock. cycleServer does not test or ping +// the next server inline. cycleServer may be called when the environment +// has just entered an unhealthy situation and blocking on a server test is +// less desirable than just returning the next server in the firing line. If +// the next server fails, it will fail fast enough and cycleServer will be +// called again. func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { numServers := len(sc.servers) if numServers < 2 { @@ -133,9 +154,30 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) newServers := make([]*server_details.ServerDetails, 0, numServers) newServers = append(newServers, sc.servers[1:]...) 
newServers = append(newServers, sc.servers[0]) + return newServers } +// removeServerByKey performs an inline removal of the first matching server +func (sc *serverConfig) removeServerByKey(targetKey *server_details.Key) { + for i, s := range sc.servers { + if targetKey.Equal(s.Key()) { + copy(sc.servers[i:], sc.servers[i+1:]) + sc.servers[len(sc.servers)-1] = nil + sc.servers = sc.servers[:len(sc.servers)-1] + return + } + } +} + +// shuffleServers shuffles the server list in place +func (sc *serverConfig) shuffleServers() { + for i := len(sc.servers) - 1; i > 0; i-- { + j := rand.Int31n(int32(i + 1)) + sc.servers[i], sc.servers[j] = sc.servers[j], sc.servers[i] + } +} + // FindServer takes out an internal "read lock" and searches through the list // of servers to find a "healthy" server. If the server is actually // unhealthy, we rely on Serf to detect this and remove the node from the @@ -143,17 +185,17 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) // during an RPC call, it is rotated to the end of the list. If there are no // servers available, return nil. func (sm *ServerManager) FindServer() *server_details.ServerDetails { - serverCfg := sm.getServerConfig() - numServers := len(serverCfg.servers) + sc := sm.getServerConfig() + numServers := len(sc.servers) if numServers == 0 { - sm.logger.Printf("[WARN] consul: No servers available") + sm.logger.Printf("[WARN] server manager: No servers available") return nil } else { // Return whatever is at the front of the list because it is // assumed to be the oldest in the server list (unless - // hypothetically - the server list was rotated right after a // server was added). - return serverCfg.servers[0] + return sc.servers[0] } } @@ -170,11 +212,12 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) { } // New is the only way to safely create a new ServerManager struct. 
-func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) { - // NOTE(sean@): Can't pass *consul.Client due to an import cycle +func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (sm *ServerManager) { sm = new(ServerManager) sm.logger = logger - sm.clusterInfo = clusterInfo + sm.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle + sm.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle + sm.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) sm.shutdownCh = shutdownCh sc := serverConfig{} @@ -186,7 +229,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulCluster // NotifyFailedServer marks the passed in server as "failed" by rotating it // to the end of the server list. func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { - serverCfg := sm.getServerConfig() + sc := sm.getServerConfig() // If the server being failed is not the first server on the list, // this is a noop. If, however, the server is failed and first on @@ -194,7 +237,7 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails // the server to the end of the list. // Only rotate the server list when there is more than one server - if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server && + if len(sc.servers) > 1 && sc.servers[0] == server && // Use atomic.CAS to emulate a TryLock(). atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) { defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0) @@ -203,11 +246,11 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails // server to the end. 
sm.serverConfigLock.Lock() defer sm.serverConfigLock.Unlock() - serverCfg = sm.getServerConfig() + sc = sm.getServerConfig() - if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server { - serverCfg.servers = serverCfg.cycleServer() - sm.saveServerConfig(serverCfg) + if len(sc.servers) > 1 && sc.servers[0] == server { + sc.servers = sc.cycleServer() + sm.saveServerConfig(sc) } } } @@ -215,36 +258,144 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails // NumServers takes out an internal "read lock" and returns the number of // servers. numServers includes both healthy and unhealthy servers. func (sm *ServerManager) NumServers() (numServers int) { - serverCfg := sm.getServerConfig() - numServers = len(serverCfg.servers) + sc := sm.getServerConfig() + numServers = len(sc.servers) return numServers } -// RebalanceServers takes out an internal write lock and shuffles the list of -// servers on this agent. This allows for a redistribution of work across -// consul servers and provides a guarantee that the order of the server list -// isn't related to the age at which the node was added to the cluster. -// Elsewhere we rely on the position in the server list as a hint regarding -// the stability of a server relative to its position in the server list. -// Servers at or near the front of the list are more stable than servers near -// the end of the list. Unhealthy servers are removed when serf notices the -// server has been deregistered. +// RebalanceServers shuffles the list of servers on this agent. The server +// at the front of the list is selected for the next RPC. RPC calls that +// fail for a particular server are rotated to the end of the list. This +// method reshuffles the list periodically in order to redistribute work +// across all known consul servers (i.e. guarantee that the order of servers +// in the server list isn't positively correlated with the age of a server in +// the consul cluster). 
Periodically shuffling the server list prevents +// long-lived clients from fixating on long-lived servers. +// +// Unhealthy servers are removed when serf notices the server has been +// deregistered. Before the newly shuffled server list is saved, the new +// remote endpoint is tested to ensure it is responsive. func (sm *ServerManager) RebalanceServers() { + // Obtain a copy of the current serverConfig + sc := sm.getServerConfig() + + // Early abort if there is no value to shuffling + if len(sc.servers) < 2 { + return + } + + sc.shuffleServers() + + // Iterate through the shuffled server list to find a healthy server. + // Don't iterate on the list directly, this loop mutates the server + // list. + var foundHealthyServer bool + for i := 0; i < len(sc.servers); i++ { + // Always test the first server. Failed servers are cycled + // while Serf detects the node has failed. + selectedServer := sc.servers[0] + + ok, err := sm.connPoolPinger.PingConsulServer(selectedServer) + if ok { + foundHealthyServer = true + break + } + sm.logger.Printf(`[DEBUG] server manager: pinging server "%s" failed: %s`, selectedServer.String(), err) + + sc.cycleServer() + } + + // If no healthy servers were found, sleep and wait for Serf to make + // the world a happy place again. + if !foundHealthyServer { + sm.logger.Printf("[DEBUG] server manager: No healthy servers during rebalance, aborting") + return + } + + // Verify that all servers are present + if sm.reconcileServerList(&sc) { + sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String()) + } else { + // reconcileServerList failed because Serf removed the server + // that was at the front of the list that had successfully + // been Ping'ed. Between the Ping and reconcile, a Serf + // event had shown up removing the node. Prevent an RPC + // timeout by retrying RebalanceServers(). 
+ // + // Instead of doing any heroics, "freeze in place" and + // continue to use the existing connection until the next + // rebalance occurs. + } + + return +} + +// reconcileServerList returns true when the first server in serverConfig +// exists in the receiver's serverConfig. If true, the merged serverConfig +// is stored as the receiver's serverConfig. Returns false if the first +// server does not exist in the list (i.e. was removed by Serf during a +// PingConsulServer() call). Newly added servers are appended to the list and +// other missing servers are removed from the list. +func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { sm.serverConfigLock.Lock() defer sm.serverConfigLock.Unlock() - serverCfg := sm.getServerConfig() - newServers := make([]*server_details.ServerDetails, len(serverCfg.servers)) - copy(newServers, serverCfg.servers) + // newServerCfg is a serverConfig that has been kept up to date with + // Serf node join and node leave events. + newServerCfg := sm.getServerConfig() - // Shuffle the server list - for i := len(serverCfg.servers) - 1; i > 0; i-- { - j := rand.Int31n(int32(i + 1)) - newServers[i], newServers[j] = newServers[j], newServers[i] + // If Serf has removed all nodes, or there is no selected server + // (zero nodes in sc), abort early. 
+ if len(newServerCfg.servers) == 0 || len(sc.servers) == 0 { + return false } - serverCfg.servers = newServers - sm.saveServerConfig(serverCfg) + type targetServer struct { + server *server_details.ServerDetails + + // 'b' == both + // 'o' == original + // 'n' == new + state byte + } + mergedList := make(map[server_details.Key]*targetServer, len(sc.servers)) + for _, s := range sc.servers { + mergedList[*s.Key()] = &targetServer{server: s, state: 'o'} + } + for _, s := range newServerCfg.servers { + k := s.Key() + _, found := mergedList[*k] + if found { + mergedList[*k].state = 'b' + } else { + mergedList[*k] = &targetServer{server: s, state: 'n'} + } + } + + // Ensure the selected server has not been removed by Serf + selectedServerKey := sc.servers[0].Key() + if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' { + return false + } + + // Append any new servers and remove any old servers + for k, v := range mergedList { + switch v.state { + case 'b': + // Do nothing, server exists in both + case 'o': + // Server has been removed + sc.removeServerByKey(&k) + case 'n': + // Server added + sc.servers = append(sc.servers, v.server) + default: + panic("unknown merge list state") + } + } + + sm.saveServerConfig(*sc) + return true } // RemoveServer takes out an internal write lock and removes a server from @@ -252,28 +403,26 @@ func (sm *ServerManager) RebalanceServers() { func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { sm.serverConfigLock.Lock() defer sm.serverConfigLock.Unlock() - serverCfg := sm.getServerConfig() + sc := sm.getServerConfig() // Remove the server if known - for i, _ := range serverCfg.servers { - if serverCfg.servers[i].Name == server.Name { - newServers := make([]*server_details.ServerDetails, 0, len(serverCfg.servers)-1) - newServers = append(newServers, serverCfg.servers[:i]...) - newServers = append(newServers, serverCfg.servers[i+1:]...) 
- serverCfg.servers = newServers + for i, _ := range sc.servers { + if sc.servers[i].Name == server.Name { + newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1) + newServers = append(newServers, sc.servers[:i]...) + newServers = append(newServers, sc.servers[i+1:]...) + sc.servers = newServers - sm.saveServerConfig(serverCfg) + sm.saveServerConfig(sc) return } } } -// refreshServerRebalanceTimer is only called once the rebalanceTimer -// expires. Historically this was an expensive routine and is intended to be -// run in isolation in a dedicated, non-concurrent task. -func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration { - serverCfg := sm.getServerConfig() - numConsulServers := len(serverCfg.servers) +// refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires. +func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration { + sc := sm.getServerConfig() + numConsulServers := len(sc.servers) // Limit this connection's life based on the size (and health) of the // cluster. Never rebalance a connection more frequently than // connReuseLowWatermarkDuration, and make sure we never exceed @@ -283,10 +432,18 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Dur numLANMembers := sm.clusterInfo.NumNodes() connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) - timer.Reset(connRebalanceTimeout) + sm.rebalanceTimer.Reset(connRebalanceTimeout) return connRebalanceTimeout } +// ResetRebalanceTimer resets the rebalance timer. This method primarily +// exists for testing and should not be used directly. +func (sm *ServerManager) ResetRebalanceTimer() { + sm.serverConfigLock.Lock() + defer sm.serverConfigLock.Unlock() + sm.rebalanceTimer.Reset(clientRPCMinReuseDuration) +} + // Start is used to start and manage the task of automatically shuffling and // rebalancing the list of consul servers. 
This maintenance only happens // periodically based on the expiration of the timer. Failed servers are @@ -294,14 +451,11 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Dur // the list. The order of the server list must be shuffled periodically to // distribute load across all known and available consul servers. func (sm *ServerManager) Start() { - var rebalanceTimer *time.Timer = time.NewTimer(clientRPCMinReuseDuration) - for { select { - case <-rebalanceTimer.C: - sm.logger.Printf("[INFO] server manager: Rebalancing server connections") + case <-sm.rebalanceTimer.C: sm.RebalanceServers() - sm.refreshServerRebalanceTimer(rebalanceTimer) + sm.refreshServerRebalanceTimer() case <-sm.shutdownCh: sm.logger.Printf("[INFO] server manager: shutting down") diff --git a/consul/server_manager/server_manager_internal_test.go b/consul/server_manager/server_manager_internal_test.go index 6ea93c3e9d..4a5a6d43ab 100644 --- a/consul/server_manager/server_manager_internal_test.go +++ b/consul/server_manager/server_manager_internal_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "log" + "math/rand" "os" "testing" "time" @@ -25,6 +26,20 @@ func GetBufferedLogger() *log.Logger { return localLogger } +type fauxConnPool struct { + // failPct between 0.0 and 1.0 == pct of time a Ping should fail + failPct float64 +} + +func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) { + var success bool + successProb := rand.Float64() + if successProb > cp.failPct { + success = true + } + return success, nil +} + type fauxSerf struct { numNodes int } @@ -36,7 +51,15 @@ func (s *fauxSerf) NumNodes() int { func testServerManager() (sm *ServerManager) { logger := GetBufferedLogger() shutdownCh := make(chan struct{}) - sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}) + sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) + return sm +} + +func testServerManagerFailProb(failPct float64) (sm 
*ServerManager) { + logger := GetBufferedLogger() + logger = log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + sm = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) return sm } @@ -125,17 +148,119 @@ func TestServerManagerInternal_New(t *testing.T) { } } -// func (sc *serverConfig) refreshServerRebalanceTimer(timer *time.Timer) { +// func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { +func TestServerManagerInternal_reconcileServerList(t *testing.T) { + tests := []int{0, 1, 2, 3, 4, 5, 10, 100} + for _, n := range tests { + ok, err := test_reconcileServerList(n) + if !ok { + t.Errorf("Expected %d to pass: %v", n, err) + } + } +} + +func test_reconcileServerList(maxServers int) (bool, error) { + // Build a server list, reconcile, verify the missing servers are + // missing, the added have been added, and the original server is + // present. + const failPct = 0.5 + sm := testServerManagerFailProb(failPct) + + var failedServers, healthyServers []*server_details.ServerDetails + for i := 0; i < maxServers; i++ { + nodeName := fmt.Sprintf("s%02d", i) + + node := &server_details.ServerDetails{Name: nodeName} + // Add 66% of servers to ServerManager + if rand.Float64() > 0.33 { + sm.AddServer(node) + + // Of healthy servers, (ab)use connPoolPinger to fail + // failPct of the servers for the reconcile. This + // allows for the selected server to no longer be + // healthy for the reconcile below. 
+ if ok, _ := sm.connPoolPinger.PingConsulServer(node); ok { + // Will still be present + healthyServers = append(healthyServers, node) + } else { + // Will be missing + failedServers = append(failedServers, node) + } + } else { + // Will be added from the call to reconcile + healthyServers = append(healthyServers, node) + } + } + + // Randomize ServerManager's server list + sm.RebalanceServers() + selectedServer := sm.FindServer() + + var selectedServerFailed bool + for _, s := range failedServers { + if selectedServer.Key().Equal(s.Key()) { + selectedServerFailed = true + break + } + } + + // Update ServerManager's server list to be "healthy" based on Serf. + // Reconcile this with origServers, which is shuffled and has a live + // connection, but possibly out of date. + origServers := sm.getServerConfig() + sm.saveServerConfig(serverConfig{servers: healthyServers}) + + // This should always succeed with non-zero server lists + if !selectedServerFailed && !sm.reconcileServerList(&origServers) && + len(sm.getServerConfig().servers) != 0 && + len(origServers.servers) != 0 { + // If the random gods are unfavorable and we end up with zero + // length lists, expect things to fail and retry the test. + return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d", + selectedServerFailed, + len(sm.getServerConfig().servers), + len(origServers.servers)) + } + + // If we have zero-length server lists, test succeeded in degenerate + // case. + if len(sm.getServerConfig().servers) == 0 && + len(origServers.servers) == 0 { + // Failed as expected w/ zero length list + return true, nil + } + + resultingServerMap := make(map[server_details.Key]bool) + for _, s := range sm.getServerConfig().servers { + resultingServerMap[*s.Key()] = true + } + + // Test to make sure no failed servers are in the ServerManager's + // list. 
Error if there are any failedServers in sc.servers + for _, s := range failedServers { + _, ok := resultingServerMap[*s.Key()] + if ok { + return false, fmt.Errorf("Found failed server %v in merged list %v", s, resultingServerMap) + } + } + + // Test to make sure all healthy servers are in the healthy list. + if len(healthyServers) != len(sm.getServerConfig().servers) { + return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers)) + } + + // Test to make sure all healthy servers are in the resultingServerMap list. + for _, s := range healthyServers { + _, ok := resultingServerMap[*s.Key()] + if !ok { + return false, fmt.Errorf("Server %v missing from healthy map after merged lists", s) + } + } + return true, nil +} + +// func (sc *serverConfig) refreshServerRebalanceTimer() { func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { - sm := testServerManager() - - timer := time.NewTimer(time.Duration(1 * time.Nanosecond)) - time.Sleep(1 * time.Millisecond) - sm.refreshServerRebalanceTimer(timer) - - logger := log.New(os.Stderr, "", log.LstdFlags) - shutdownCh := make(chan struct{}) - type clusterSizes struct { numNodes int numServers int @@ -170,17 +295,19 @@ func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { {1000000, 19, 10 * time.Minute}, } - for _, s := range clusters { - sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}) + logger := log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + for _, s := range clusters { + sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) for i := 0; i < s.numServers; i++ { nodeName := fmt.Sprintf("s%02d", i) sm.AddServer(&server_details.ServerDetails{Name: nodeName}) } - d := sm.refreshServerRebalanceTimer(timer) + d := sm.refreshServerRebalanceTimer() if d < s.minRebalance { - t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, 
s.numServers, d, s.minRebalance) + t.Errorf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance) } } } diff --git a/consul/server_manager/server_manager_test.go b/consul/server_manager/server_manager_test.go index 8b292a2ddc..25673140a4 100644 --- a/consul/server_manager/server_manager_test.go +++ b/consul/server_manager/server_manager_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "log" + "math/rand" "os" "strings" "testing" @@ -26,6 +27,20 @@ func GetBufferedLogger() *log.Logger { return localLogger } +type fauxConnPool struct { + // failPct between 0.0 and 1.0 == pct of time a Ping should fail + failPct float64 +} + +func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) { + var success bool + successProb := rand.Float64() + if successProb > cp.failPct { + success = true + } + return success, nil +} + type fauxSerf struct { } @@ -37,7 +52,15 @@ func testServerManager() (sm *server_manager.ServerManager) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm = server_manager.New(logger, shutdownCh, &fauxSerf{}) + sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) + return sm +} + +func testServerManagerFailProb(failPct float64) (sm *server_manager.ServerManager) { + logger := GetBufferedLogger() + logger = log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) return sm } @@ -124,7 +147,7 @@ func TestServerManager_New(t *testing.T) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm := server_manager.New(logger, shutdownCh, &fauxSerf{}) + sm := server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) if sm == nil { t.Fatalf("ServerManager nil") } @@ -202,7 +225,8 @@ func 
TestServerManager_NumServers(t *testing.T) { // func (sm *ServerManager) RebalanceServers() { func TestServerManager_RebalanceServers(t *testing.T) { - sm := testServerManager() + const failPct = 0.5 + sm := testServerManagerFailProb(failPct) const maxServers = 100 const numShuffleTests = 100 const uniquePassRate = 0.5 @@ -265,6 +289,10 @@ func TestServerManager_RemoveServer(t *testing.T) { servers = append(servers, server) sm.AddServer(server) } + if sm.NumServers() != maxServers { + t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) + } + sm.RebalanceServers() if sm.NumServers() != maxServers {