mirror of https://github.com/status-im/consul.git
Merge pull request #1880 from hashicorp/f-pretest-server
Pre-connect new rebalanced server
This commit is contained in:
commit
5cffcd56c3
|
@ -119,12 +119,7 @@ func NewClient(config *Config) (*Client, error) {
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf)
|
// Start lan event handlers before lan Serf setup to prevent deadlock
|
||||||
|
|
||||||
// Start maintenance task for serverMgr
|
|
||||||
go c.serverMgr.Start()
|
|
||||||
|
|
||||||
// Start the Serf listeners to prevent a deadlock
|
|
||||||
go c.lanEventHandler()
|
go c.lanEventHandler()
|
||||||
|
|
||||||
// Initialize the lan Serf
|
// Initialize the lan Serf
|
||||||
|
@ -134,6 +129,11 @@ func NewClient(config *Config) (*Client, error) {
|
||||||
c.Shutdown()
|
c.Shutdown()
|
||||||
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
|
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start maintenance task for server_manager
|
||||||
|
c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf, c.connPool)
|
||||||
|
go c.serverMgr.Start()
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -236,6 +236,73 @@ func TestClient_RPC_Pool(t *testing.T) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClient_RPC_ConsulServerPing(t *testing.T) {
|
||||||
|
var servers []*Server
|
||||||
|
var serverDirs []string
|
||||||
|
const numServers = 5
|
||||||
|
|
||||||
|
for n := numServers; n > 0; n-- {
|
||||||
|
var bootstrap bool
|
||||||
|
if n == numServers {
|
||||||
|
bootstrap = true
|
||||||
|
}
|
||||||
|
dir, s := testServerDCBootstrap(t, "dc1", bootstrap)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
defer s.Shutdown()
|
||||||
|
|
||||||
|
servers = append(servers, s)
|
||||||
|
serverDirs = append(serverDirs, dir)
|
||||||
|
}
|
||||||
|
|
||||||
|
const numClients = 1
|
||||||
|
clientDir, c := testClient(t)
|
||||||
|
defer os.RemoveAll(clientDir)
|
||||||
|
defer c.Shutdown()
|
||||||
|
|
||||||
|
// Join all servers.
|
||||||
|
for _, s := range servers {
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := c.JoinLAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sleep to allow Serf to sync, shuffle, and let the shuffle complete
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
c.serverMgr.ResetRebalanceTimer()
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
if len(c.LANMembers()) != numServers+numClients {
|
||||||
|
t.Errorf("bad len: %d", len(c.LANMembers()))
|
||||||
|
}
|
||||||
|
for _, s := range servers {
|
||||||
|
if len(s.LANMembers()) != numServers+numClients {
|
||||||
|
t.Errorf("bad len: %d", len(s.LANMembers()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping each server in the list
|
||||||
|
var pingCount int
|
||||||
|
for range servers {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
s := c.serverMgr.FindServer()
|
||||||
|
ok, err := c.connPool.PingConsulServer(s)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Unable to ping server %v: %s", s.String(), err)
|
||||||
|
}
|
||||||
|
pingCount += 1
|
||||||
|
|
||||||
|
// Artificially fail the server in order to rotate the server
|
||||||
|
// list
|
||||||
|
c.serverMgr.NotifyFailedServer(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pingCount != numServers {
|
||||||
|
t.Errorf("bad len: %d/%d", pingCount, numServers)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestClient_RPC_TLS(t *testing.T) {
|
func TestClient_RPC_TLS(t *testing.T) {
|
||||||
dir1, conf1 := testServerConfig(t, "a.testco.internal")
|
dir1, conf1 := testServerConfig(t, "a.testco.internal")
|
||||||
conf1.VerifyIncoming = true
|
conf1.VerifyIncoming = true
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/server_details"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/hashicorp/yamux"
|
"github.com/hashicorp/yamux"
|
||||||
|
@ -405,6 +406,30 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PingConsulServer sends a Status.Ping message to the specified server and
|
||||||
|
// returns true if healthy, false if an error occurred
|
||||||
|
func (p *ConnPool) PingConsulServer(s *server_details.ServerDetails) (bool, error) {
|
||||||
|
// Get a usable client
|
||||||
|
conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the RPC call
|
||||||
|
var out struct{}
|
||||||
|
err = msgpackrpc.CallWithCodec(sc.codec, "Status.Ping", struct{}{}, &out)
|
||||||
|
if err != nil {
|
||||||
|
sc.Close()
|
||||||
|
p.releaseConn(conn)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done with the connection
|
||||||
|
conn.returnClient(sc)
|
||||||
|
p.releaseConn(conn)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Reap is used to close conns open over maxTime
|
// Reap is used to close conns open over maxTime
|
||||||
func (p *ConnPool) reap() {
|
func (p *ConnPool) reap() {
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -8,6 +8,16 @@ import (
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Key is used in maps and for equality tests. A key is based on endpoints.
|
||||||
|
type Key struct {
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Equal compares two Key objects
|
||||||
|
func (k *Key) Equal(x *Key) bool {
|
||||||
|
return k.name == x.name
|
||||||
|
}
|
||||||
|
|
||||||
// ServerDetails is used to return details of a consul server
|
// ServerDetails is used to return details of a consul server
|
||||||
type ServerDetails struct {
|
type ServerDetails struct {
|
||||||
Name string
|
Name string
|
||||||
|
@ -19,8 +29,22 @@ type ServerDetails struct {
|
||||||
Addr net.Addr
|
Addr net.Addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Key returns the corresponding Key
|
||||||
|
func (s *ServerDetails) Key() *Key {
|
||||||
|
return &Key{
|
||||||
|
name: s.Name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns a string representation of ServerDetails
|
||||||
func (s *ServerDetails) String() string {
|
func (s *ServerDetails) String() string {
|
||||||
return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter)
|
var addrStr, networkStr string
|
||||||
|
if s.Addr != nil {
|
||||||
|
addrStr = s.Addr.String()
|
||||||
|
networkStr = s.Addr.Network()
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("%s (Addr: %s/%s) (DC: %s)", s.Name, networkStr, addrStr, s.Datacenter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsConsulServer returns true if a serf member is a consul server. Returns a
|
// IsConsulServer returns true if a serf member is a consul server. Returns a
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
package server_details
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServerDetails_Key_Equal(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
k1 *Key
|
||||||
|
k2 *Key
|
||||||
|
equal bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Key equality",
|
||||||
|
k1: &Key{
|
||||||
|
name: "s1",
|
||||||
|
},
|
||||||
|
k2: &Key{
|
||||||
|
name: "s1",
|
||||||
|
},
|
||||||
|
equal: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Key Inequality",
|
||||||
|
k1: &Key{
|
||||||
|
name: "s1",
|
||||||
|
},
|
||||||
|
k2: &Key{
|
||||||
|
name: "s2",
|
||||||
|
},
|
||||||
|
equal: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
if test.k1.Equal(test.k2) != test.equal {
|
||||||
|
t.Errorf("Expected a %v result from test %s", test.equal, test.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test Key to make sure it actually works as a key
|
||||||
|
m := make(map[Key]bool)
|
||||||
|
m[*test.k1] = true
|
||||||
|
if _, found := m[*test.k2]; found != test.equal {
|
||||||
|
t.Errorf("Expected a %v result from map test %s", test.equal, test.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerDetails_Key(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
sd *ServerDetails
|
||||||
|
k *Key
|
||||||
|
equal bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Key equality",
|
||||||
|
sd: &ServerDetails{
|
||||||
|
Name: "s1",
|
||||||
|
},
|
||||||
|
k: &Key{
|
||||||
|
name: "s1",
|
||||||
|
},
|
||||||
|
equal: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Key inequality",
|
||||||
|
sd: &ServerDetails{
|
||||||
|
Name: "s1",
|
||||||
|
},
|
||||||
|
k: &Key{
|
||||||
|
name: "s2",
|
||||||
|
},
|
||||||
|
equal: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
if test.k.Equal(test.sd.Key()) != test.equal {
|
||||||
|
t.Errorf("Expected a %v result from test %s", test.equal, test.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,6 +8,48 @@ import (
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestServerDetails_Key_params(t *testing.T) {
|
||||||
|
ipv4a := net.ParseIP("127.0.0.1")
|
||||||
|
ipv4b := net.ParseIP("1.2.3.4")
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
sd1 *server_details.ServerDetails
|
||||||
|
sd2 *server_details.ServerDetails
|
||||||
|
equal bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Addr inequality",
|
||||||
|
sd1: &server_details.ServerDetails{
|
||||||
|
Name: "s1",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Port: 8300,
|
||||||
|
Addr: &net.IPAddr{IP: ipv4a},
|
||||||
|
},
|
||||||
|
sd2: &server_details.ServerDetails{
|
||||||
|
Name: "s1",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Port: 8300,
|
||||||
|
Addr: &net.IPAddr{IP: ipv4b},
|
||||||
|
},
|
||||||
|
equal: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
if test.sd1.Key().Equal(test.sd2.Key()) != test.equal {
|
||||||
|
t.Errorf("Expected a %v result from test %s", test.equal, test.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test Key to make sure it actually works as a key
|
||||||
|
m := make(map[server_details.Key]bool)
|
||||||
|
m[*test.sd1.Key()] = true
|
||||||
|
if _, found := m[*test.sd2.Key()]; found != test.equal {
|
||||||
|
t.Errorf("Expected a %v result from map test %s", test.equal, test.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestIsConsulServer(t *testing.T) {
|
func TestIsConsulServer(t *testing.T) {
|
||||||
m := serf.Member{
|
m := serf.Member{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
|
|
|
@ -47,11 +47,19 @@ const (
|
||||||
newRebalanceConnsPerSecPerServer = 64
|
newRebalanceConnsPerSecPerServer = 64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ConsulClusterInfo is an interface wrapper around serf and prevents a
|
||||||
|
// cyclic import dependency
|
||||||
type ConsulClusterInfo interface {
|
type ConsulClusterInfo interface {
|
||||||
NumNodes() int
|
NumNodes() int
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverCfg is the thread-safe configuration struct used to maintain the
|
// Pinger is an interface wrapping client.ConnPool to prevent a
|
||||||
|
// cyclic import dependency
|
||||||
|
type Pinger interface {
|
||||||
|
PingConsulServer(server *server_details.ServerDetails) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// serverConfig is the thread-safe configuration struct used to maintain the
|
||||||
// list of Consul servers in ServerManager.
|
// list of Consul servers in ServerManager.
|
||||||
//
|
//
|
||||||
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
|
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
|
||||||
|
@ -68,6 +76,9 @@ type ServerManager struct {
|
||||||
serverConfigValue atomic.Value
|
serverConfigValue atomic.Value
|
||||||
serverConfigLock sync.Mutex
|
serverConfigLock sync.Mutex
|
||||||
|
|
||||||
|
// rebalanceTimer controls the duration of the rebalance interval
|
||||||
|
rebalanceTimer *time.Timer
|
||||||
|
|
||||||
// shutdownCh is a copy of the channel in consul.Client
|
// shutdownCh is a copy of the channel in consul.Client
|
||||||
shutdownCh chan struct{}
|
shutdownCh chan struct{}
|
||||||
|
|
||||||
|
@ -78,8 +89,13 @@ type ServerManager struct {
|
||||||
// connections. ConsulClusterInfo is an interface that wraps serf.
|
// connections. ConsulClusterInfo is an interface that wraps serf.
|
||||||
clusterInfo ConsulClusterInfo
|
clusterInfo ConsulClusterInfo
|
||||||
|
|
||||||
// notifyFailedServersBarrier is acts as a barrier to prevent
|
// connPoolPinger is used to test the health of a server in the
|
||||||
// queueing behind serverConfigLog and acts as a TryLock().
|
// connection pool. Pinger is an interface that wraps
|
||||||
|
// client.ConnPool.
|
||||||
|
connPoolPinger Pinger
|
||||||
|
|
||||||
|
// notifyFailedBarrier is acts as a barrier to prevent queuing behind
|
||||||
|
// serverConfigLog and acts as a TryLock().
|
||||||
notifyFailedBarrier int32
|
notifyFailedBarrier int32
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,20 +107,20 @@ type ServerManager struct {
|
||||||
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
|
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
|
||||||
sm.serverConfigLock.Lock()
|
sm.serverConfigLock.Lock()
|
||||||
defer sm.serverConfigLock.Unlock()
|
defer sm.serverConfigLock.Unlock()
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
// Check if this server is known
|
// Check if this server is known
|
||||||
found := false
|
found := false
|
||||||
for idx, existing := range serverCfg.servers {
|
for idx, existing := range sc.servers {
|
||||||
if existing.Name == server.Name {
|
if existing.Name == server.Name {
|
||||||
newServers := make([]*server_details.ServerDetails, len(serverCfg.servers))
|
newServers := make([]*server_details.ServerDetails, len(sc.servers))
|
||||||
copy(newServers, serverCfg.servers)
|
copy(newServers, sc.servers)
|
||||||
|
|
||||||
// Overwrite the existing server details in order to
|
// Overwrite the existing server details in order to
|
||||||
// possibly update metadata (e.g. server version)
|
// possibly update metadata (e.g. server version)
|
||||||
newServers[idx] = server
|
newServers[idx] = server
|
||||||
|
|
||||||
serverCfg.servers = newServers
|
sc.servers = newServers
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -112,18 +128,23 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
|
||||||
|
|
||||||
// Add to the list if not known
|
// Add to the list if not known
|
||||||
if !found {
|
if !found {
|
||||||
newServers := make([]*server_details.ServerDetails, len(serverCfg.servers), len(serverCfg.servers)+1)
|
newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1)
|
||||||
copy(newServers, serverCfg.servers)
|
copy(newServers, sc.servers)
|
||||||
newServers = append(newServers, server)
|
newServers = append(newServers, server)
|
||||||
serverCfg.servers = newServers
|
sc.servers = newServers
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.saveServerConfig(serverCfg)
|
sm.saveServerConfig(sc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cycleServers returns a new list of servers that has dequeued the first
|
// cycleServers returns a new list of servers that has dequeued the first
|
||||||
// server and enqueued it at the end of the list. cycleServers assumes the
|
// server and enqueued it at the end of the list. cycleServers assumes the
|
||||||
// caller is holding the serverConfigLock.
|
// caller is holding the serverConfigLock. cycleServer does not test or ping
|
||||||
|
// the next server inline. cycleServer may be called when the environment
|
||||||
|
// has just entered an unhealthy situation and blocking on a server test is
|
||||||
|
// less desirable than just returning the next server in the firing line. If
|
||||||
|
// the next server fails, it will fail fast enough and cycleServer will be
|
||||||
|
// called again.
|
||||||
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
|
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
|
||||||
numServers := len(sc.servers)
|
numServers := len(sc.servers)
|
||||||
if numServers < 2 {
|
if numServers < 2 {
|
||||||
|
@ -133,9 +154,30 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
|
||||||
newServers := make([]*server_details.ServerDetails, 0, numServers)
|
newServers := make([]*server_details.ServerDetails, 0, numServers)
|
||||||
newServers = append(newServers, sc.servers[1:]...)
|
newServers = append(newServers, sc.servers[1:]...)
|
||||||
newServers = append(newServers, sc.servers[0])
|
newServers = append(newServers, sc.servers[0])
|
||||||
|
|
||||||
return newServers
|
return newServers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// removeServerByKey performs an inline removal of the first matching server
|
||||||
|
func (sc *serverConfig) removeServerByKey(targetKey *server_details.Key) {
|
||||||
|
for i, s := range sc.servers {
|
||||||
|
if targetKey.Equal(s.Key()) {
|
||||||
|
copy(sc.servers[i:], sc.servers[i+1:])
|
||||||
|
sc.servers[len(sc.servers)-1] = nil
|
||||||
|
sc.servers = sc.servers[:len(sc.servers)-1]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// shuffleServers shuffles the server list in place
|
||||||
|
func (sc *serverConfig) shuffleServers() {
|
||||||
|
for i := len(sc.servers) - 1; i > 0; i-- {
|
||||||
|
j := rand.Int31n(int32(i + 1))
|
||||||
|
sc.servers[i], sc.servers[j] = sc.servers[j], sc.servers[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// FindServer takes out an internal "read lock" and searches through the list
|
// FindServer takes out an internal "read lock" and searches through the list
|
||||||
// of servers to find a "healthy" server. If the server is actually
|
// of servers to find a "healthy" server. If the server is actually
|
||||||
// unhealthy, we rely on Serf to detect this and remove the node from the
|
// unhealthy, we rely on Serf to detect this and remove the node from the
|
||||||
|
@ -143,17 +185,17 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
|
||||||
// during an RPC call, it is rotated to the end of the list. If there are no
|
// during an RPC call, it is rotated to the end of the list. If there are no
|
||||||
// servers available, return nil.
|
// servers available, return nil.
|
||||||
func (sm *ServerManager) FindServer() *server_details.ServerDetails {
|
func (sm *ServerManager) FindServer() *server_details.ServerDetails {
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
numServers := len(serverCfg.servers)
|
numServers := len(sc.servers)
|
||||||
if numServers == 0 {
|
if numServers == 0 {
|
||||||
sm.logger.Printf("[WARN] consul: No servers available")
|
sm.logger.Printf("[WARN] server manager: No servers available")
|
||||||
return nil
|
return nil
|
||||||
} else {
|
} else {
|
||||||
// Return whatever is at the front of the list because it is
|
// Return whatever is at the front of the list because it is
|
||||||
// assumed to be the oldest in the server list (unless -
|
// assumed to be the oldest in the server list (unless -
|
||||||
// hypothetically - the server list was rotated right after a
|
// hypothetically - the server list was rotated right after a
|
||||||
// server was added).
|
// server was added).
|
||||||
return serverCfg.servers[0]
|
return sc.servers[0]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,11 +212,12 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New is the only way to safely create a new ServerManager struct.
|
// New is the only way to safely create a new ServerManager struct.
|
||||||
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
|
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (sm *ServerManager) {
|
||||||
// NOTE(sean@): Can't pass *consul.Client due to an import cycle
|
|
||||||
sm = new(ServerManager)
|
sm = new(ServerManager)
|
||||||
sm.logger = logger
|
sm.logger = logger
|
||||||
sm.clusterInfo = clusterInfo
|
sm.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle
|
||||||
|
sm.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
|
||||||
|
sm.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
|
||||||
sm.shutdownCh = shutdownCh
|
sm.shutdownCh = shutdownCh
|
||||||
|
|
||||||
sc := serverConfig{}
|
sc := serverConfig{}
|
||||||
|
@ -186,7 +229,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulCluster
|
||||||
// NotifyFailedServer marks the passed in server as "failed" by rotating it
|
// NotifyFailedServer marks the passed in server as "failed" by rotating it
|
||||||
// to the end of the server list.
|
// to the end of the server list.
|
||||||
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
|
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
// If the server being failed is not the first server on the list,
|
// If the server being failed is not the first server on the list,
|
||||||
// this is a noop. If, however, the server is failed and first on
|
// this is a noop. If, however, the server is failed and first on
|
||||||
|
@ -194,7 +237,7 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
|
||||||
// the server to the end of the list.
|
// the server to the end of the list.
|
||||||
|
|
||||||
// Only rotate the server list when there is more than one server
|
// Only rotate the server list when there is more than one server
|
||||||
if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server &&
|
if len(sc.servers) > 1 && sc.servers[0] == server &&
|
||||||
// Use atomic.CAS to emulate a TryLock().
|
// Use atomic.CAS to emulate a TryLock().
|
||||||
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
|
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
|
||||||
defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)
|
defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)
|
||||||
|
@ -203,11 +246,11 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
|
||||||
// server to the end.
|
// server to the end.
|
||||||
sm.serverConfigLock.Lock()
|
sm.serverConfigLock.Lock()
|
||||||
defer sm.serverConfigLock.Unlock()
|
defer sm.serverConfigLock.Unlock()
|
||||||
serverCfg = sm.getServerConfig()
|
sc = sm.getServerConfig()
|
||||||
|
|
||||||
if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server {
|
if len(sc.servers) > 1 && sc.servers[0] == server {
|
||||||
serverCfg.servers = serverCfg.cycleServer()
|
sc.servers = sc.cycleServer()
|
||||||
sm.saveServerConfig(serverCfg)
|
sm.saveServerConfig(sc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -215,36 +258,144 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
|
||||||
// NumServers takes out an internal "read lock" and returns the number of
|
// NumServers takes out an internal "read lock" and returns the number of
|
||||||
// servers. numServers includes both healthy and unhealthy servers.
|
// servers. numServers includes both healthy and unhealthy servers.
|
||||||
func (sm *ServerManager) NumServers() (numServers int) {
|
func (sm *ServerManager) NumServers() (numServers int) {
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
numServers = len(serverCfg.servers)
|
numServers = len(sc.servers)
|
||||||
return numServers
|
return numServers
|
||||||
}
|
}
|
||||||
|
|
||||||
// RebalanceServers takes out an internal write lock and shuffles the list of
|
// RebalanceServers shuffles the list of servers on this agent. The server
|
||||||
// servers on this agent. This allows for a redistribution of work across
|
// at the front of the list is selected for the next RPC. RPC calls that
|
||||||
// consul servers and provides a guarantee that the order of the server list
|
// fail for a particular server are rotated to the end of the list. This
|
||||||
// isn't related to the age at which the node was added to the cluster.
|
// method reshuffles the list periodically in order to redistribute work
|
||||||
// Elsewhere we rely on the position in the server list as a hint regarding
|
// across all known consul servers (i.e. guarantee that the order of servers
|
||||||
// the stability of a server relative to its position in the server list.
|
// in the server list isn't positively correlated with the age of a server in
|
||||||
// Servers at or near the front of the list are more stable than servers near
|
// the consul cluster). Periodically shuffling the server list prevents
|
||||||
// the end of the list. Unhealthy servers are removed when serf notices the
|
// long-lived clients from fixating on long-lived servers.
|
||||||
// server has been deregistered.
|
//
|
||||||
|
// Unhealthy servers are removed when serf notices the server has been
|
||||||
|
// deregistered. Before the newly shuffled server list is saved, the new
|
||||||
|
// remote endpoint is tested to ensure its responsive.
|
||||||
func (sm *ServerManager) RebalanceServers() {
|
func (sm *ServerManager) RebalanceServers() {
|
||||||
|
// Obtain a copy of the current serverConfig
|
||||||
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
|
// Early abort if there is no value to shuffling
|
||||||
|
if len(sc.servers) < 2 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sc.shuffleServers()
|
||||||
|
|
||||||
|
// Iterate through the shuffled server list to find a healthy server.
|
||||||
|
// Don't iterate on the list directly, this loop mutates the server
|
||||||
|
// list.
|
||||||
|
var foundHealthyServer bool
|
||||||
|
for i := 0; i < len(sc.servers); i++ {
|
||||||
|
// Always test the first server. Failed servers are cycled
|
||||||
|
// while Serf detects the node has failed.
|
||||||
|
selectedServer := sc.servers[0]
|
||||||
|
|
||||||
|
ok, err := sm.connPoolPinger.PingConsulServer(selectedServer)
|
||||||
|
if ok {
|
||||||
|
foundHealthyServer = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
sm.logger.Printf(`[DEBUG] server manager: pinging server "%s" failed: %s`, selectedServer.String(), err)
|
||||||
|
|
||||||
|
sc.cycleServer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no healthy servers were found, sleep and wait for Serf to make
|
||||||
|
// the world a happy place again.
|
||||||
|
if !foundHealthyServer {
|
||||||
|
sm.logger.Printf("[DEBUG] server manager: No healthy servers during rebalance, aborting")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that all servers are present
|
||||||
|
if sm.reconcileServerList(&sc) {
|
||||||
|
sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String())
|
||||||
|
} else {
|
||||||
|
// reconcileServerList failed because Serf removed the server
|
||||||
|
// that was at the front of the list that had successfully
|
||||||
|
// been Ping'ed. Between the Ping and reconcile, a Serf
|
||||||
|
// event had shown up removing the node. Prevent an RPC
|
||||||
|
// timeout by retrying RebalanceServers().
|
||||||
|
//
|
||||||
|
// Instead of doing any heroics, "freeze in place" and
|
||||||
|
// continue to use the existing connection until the next
|
||||||
|
// rebalance occurs.
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// reconcileServerList returns true when the first server in serverConfig
|
||||||
|
// exists in the receiver's serverConfig. If true, the merged serverConfig
|
||||||
|
// is stored as the receiver's serverConfig. Returns false if the first
|
||||||
|
// server does not exist in the list (i.e. was removed by Serf during a
|
||||||
|
// PingConsulServer() call. Newly added servers are appended to the list and
|
||||||
|
// other missing servers are removed from the list.
|
||||||
|
func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool {
|
||||||
sm.serverConfigLock.Lock()
|
sm.serverConfigLock.Lock()
|
||||||
defer sm.serverConfigLock.Unlock()
|
defer sm.serverConfigLock.Unlock()
|
||||||
serverCfg := sm.getServerConfig()
|
|
||||||
|
|
||||||
newServers := make([]*server_details.ServerDetails, len(serverCfg.servers))
|
// newServerCfg is a serverConfig that has been kept up to date with
|
||||||
copy(newServers, serverCfg.servers)
|
// Serf node join and node leave events.
|
||||||
|
newServerCfg := sm.getServerConfig()
|
||||||
|
|
||||||
// Shuffle the server list
|
// If Serf has removed all nodes, or there is no selected server
|
||||||
for i := len(serverCfg.servers) - 1; i > 0; i-- {
|
// (zero nodes in sc), abort early.
|
||||||
j := rand.Int31n(int32(i + 1))
|
if len(newServerCfg.servers) == 0 || len(sc.servers) == 0 {
|
||||||
newServers[i], newServers[j] = newServers[j], newServers[i]
|
return false
|
||||||
}
|
}
|
||||||
serverCfg.servers = newServers
|
|
||||||
|
|
||||||
sm.saveServerConfig(serverCfg)
|
type targetServer struct {
|
||||||
|
server *server_details.ServerDetails
|
||||||
|
|
||||||
|
// 'b' == both
|
||||||
|
// 'o' == original
|
||||||
|
// 'n' == new
|
||||||
|
state byte
|
||||||
|
}
|
||||||
|
mergedList := make(map[server_details.Key]*targetServer, len(sc.servers))
|
||||||
|
for _, s := range sc.servers {
|
||||||
|
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
|
||||||
|
}
|
||||||
|
for _, s := range newServerCfg.servers {
|
||||||
|
k := s.Key()
|
||||||
|
_, found := mergedList[*k]
|
||||||
|
if found {
|
||||||
|
mergedList[*k].state = 'b'
|
||||||
|
} else {
|
||||||
|
mergedList[*k] = &targetServer{server: s, state: 'n'}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the selected server has not been removed by Serf
|
||||||
|
selectedServerKey := sc.servers[0].Key()
|
||||||
|
if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append any new servers and remove any old servers
|
||||||
|
for k, v := range mergedList {
|
||||||
|
switch v.state {
|
||||||
|
case 'b':
|
||||||
|
// Do nothing, server exists in both
|
||||||
|
case 'o':
|
||||||
|
// Server has been removed
|
||||||
|
sc.removeServerByKey(&k)
|
||||||
|
case 'n':
|
||||||
|
// Server added
|
||||||
|
sc.servers = append(sc.servers, v.server)
|
||||||
|
default:
|
||||||
|
panic("unknown merge list state")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sm.saveServerConfig(*sc)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveServer takes out an internal write lock and removes a server from
|
// RemoveServer takes out an internal write lock and removes a server from
|
||||||
|
@ -252,28 +403,26 @@ func (sm *ServerManager) RebalanceServers() {
|
||||||
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
|
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
|
||||||
sm.serverConfigLock.Lock()
|
sm.serverConfigLock.Lock()
|
||||||
defer sm.serverConfigLock.Unlock()
|
defer sm.serverConfigLock.Unlock()
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
// Remove the server if known
|
// Remove the server if known
|
||||||
for i, _ := range serverCfg.servers {
|
for i, _ := range sc.servers {
|
||||||
if serverCfg.servers[i].Name == server.Name {
|
if sc.servers[i].Name == server.Name {
|
||||||
newServers := make([]*server_details.ServerDetails, 0, len(serverCfg.servers)-1)
|
newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1)
|
||||||
newServers = append(newServers, serverCfg.servers[:i]...)
|
newServers = append(newServers, sc.servers[:i]...)
|
||||||
newServers = append(newServers, serverCfg.servers[i+1:]...)
|
newServers = append(newServers, sc.servers[i+1:]...)
|
||||||
serverCfg.servers = newServers
|
sc.servers = newServers
|
||||||
|
|
||||||
sm.saveServerConfig(serverCfg)
|
sm.saveServerConfig(sc)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// refreshServerRebalanceTimer is only called once the rebalanceTimer
|
// refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires.
|
||||||
// expires. Historically this was an expensive routine and is intended to be
|
func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration {
|
||||||
// run in isolation in a dedicated, non-concurrent task.
|
sc := sm.getServerConfig()
|
||||||
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
|
numConsulServers := len(sc.servers)
|
||||||
serverCfg := sm.getServerConfig()
|
|
||||||
numConsulServers := len(serverCfg.servers)
|
|
||||||
// Limit this connection's life based on the size (and health) of the
|
// Limit this connection's life based on the size (and health) of the
|
||||||
// cluster. Never rebalance a connection more frequently than
|
// cluster. Never rebalance a connection more frequently than
|
||||||
// connReuseLowWatermarkDuration, and make sure we never exceed
|
// connReuseLowWatermarkDuration, and make sure we never exceed
|
||||||
|
@ -283,10 +432,18 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Dur
|
||||||
numLANMembers := sm.clusterInfo.NumNodes()
|
numLANMembers := sm.clusterInfo.NumNodes()
|
||||||
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
||||||
|
|
||||||
timer.Reset(connRebalanceTimeout)
|
sm.rebalanceTimer.Reset(connRebalanceTimeout)
|
||||||
return connRebalanceTimeout
|
return connRebalanceTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetRebalanceTimer resets the rebalance timer. This method primarily
|
||||||
|
// exists for testing and should not be used directly.
|
||||||
|
func (sm *ServerManager) ResetRebalanceTimer() {
|
||||||
|
sm.serverConfigLock.Lock()
|
||||||
|
defer sm.serverConfigLock.Unlock()
|
||||||
|
sm.rebalanceTimer.Reset(clientRPCMinReuseDuration)
|
||||||
|
}
|
||||||
|
|
||||||
// Start is used to start and manage the task of automatically shuffling and
|
// Start is used to start and manage the task of automatically shuffling and
|
||||||
// rebalancing the list of consul servers. This maintenance only happens
|
// rebalancing the list of consul servers. This maintenance only happens
|
||||||
// periodically based on the expiration of the timer. Failed servers are
|
// periodically based on the expiration of the timer. Failed servers are
|
||||||
|
@ -294,14 +451,11 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Dur
|
||||||
// the list. The order of the server list must be shuffled periodically to
|
// the list. The order of the server list must be shuffled periodically to
|
||||||
// distribute load across all known and available consul servers.
|
// distribute load across all known and available consul servers.
|
||||||
func (sm *ServerManager) Start() {
|
func (sm *ServerManager) Start() {
|
||||||
var rebalanceTimer *time.Timer = time.NewTimer(clientRPCMinReuseDuration)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-rebalanceTimer.C:
|
case <-sm.rebalanceTimer.C:
|
||||||
sm.logger.Printf("[INFO] server manager: Rebalancing server connections")
|
|
||||||
sm.RebalanceServers()
|
sm.RebalanceServers()
|
||||||
sm.refreshServerRebalanceTimer(rebalanceTimer)
|
sm.refreshServerRebalanceTimer()
|
||||||
|
|
||||||
case <-sm.shutdownCh:
|
case <-sm.shutdownCh:
|
||||||
sm.logger.Printf("[INFO] server manager: shutting down")
|
sm.logger.Printf("[INFO] server manager: shutting down")
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -25,6 +26,20 @@ func GetBufferedLogger() *log.Logger {
|
||||||
return localLogger
|
return localLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type fauxConnPool struct {
|
||||||
|
// failPct between 0.0 and 1.0 == pct of time a Ping should fail
|
||||||
|
failPct float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) {
|
||||||
|
var success bool
|
||||||
|
successProb := rand.Float64()
|
||||||
|
if successProb > cp.failPct {
|
||||||
|
success = true
|
||||||
|
}
|
||||||
|
return success, nil
|
||||||
|
}
|
||||||
|
|
||||||
type fauxSerf struct {
|
type fauxSerf struct {
|
||||||
numNodes int
|
numNodes int
|
||||||
}
|
}
|
||||||
|
@ -36,7 +51,15 @@ func (s *fauxSerf) NumNodes() int {
|
||||||
func testServerManager() (sm *ServerManager) {
|
func testServerManager() (sm *ServerManager) {
|
||||||
logger := GetBufferedLogger()
|
logger := GetBufferedLogger()
|
||||||
shutdownCh := make(chan struct{})
|
shutdownCh := make(chan struct{})
|
||||||
sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384})
|
sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{})
|
||||||
|
return sm
|
||||||
|
}
|
||||||
|
|
||||||
|
func testServerManagerFailProb(failPct float64) (sm *ServerManager) {
|
||||||
|
logger := GetBufferedLogger()
|
||||||
|
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||||
|
shutdownCh := make(chan struct{})
|
||||||
|
sm = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
|
||||||
return sm
|
return sm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,17 +148,119 @@ func TestServerManagerInternal_New(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (sc *serverConfig) refreshServerRebalanceTimer(timer *time.Timer) {
|
// func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool {
|
||||||
|
func TestServerManagerInternal_reconcileServerList(t *testing.T) {
|
||||||
|
tests := []int{0, 1, 2, 3, 4, 5, 10, 100}
|
||||||
|
for _, n := range tests {
|
||||||
|
ok, err := test_reconcileServerList(n)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Expected %d to pass: %v", n, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func test_reconcileServerList(maxServers int) (bool, error) {
|
||||||
|
// Build a server list, reconcile, verify the missing servers are
|
||||||
|
// missing, the added have been added, and the original server is
|
||||||
|
// present.
|
||||||
|
const failPct = 0.5
|
||||||
|
sm := testServerManagerFailProb(failPct)
|
||||||
|
|
||||||
|
var failedServers, healthyServers []*server_details.ServerDetails
|
||||||
|
for i := 0; i < maxServers; i++ {
|
||||||
|
nodeName := fmt.Sprintf("s%02d", i)
|
||||||
|
|
||||||
|
node := &server_details.ServerDetails{Name: nodeName}
|
||||||
|
// Add 66% of servers to ServerManager
|
||||||
|
if rand.Float64() > 0.33 {
|
||||||
|
sm.AddServer(node)
|
||||||
|
|
||||||
|
// Of healthy servers, (ab)use connPoolPinger to
|
||||||
|
// failPct of the servers for the reconcile. This
|
||||||
|
// allows for the selected server to no longer be
|
||||||
|
// healthy for the reconcile below.
|
||||||
|
if ok, _ := sm.connPoolPinger.PingConsulServer(node); ok {
|
||||||
|
// Will still be present
|
||||||
|
healthyServers = append(healthyServers, node)
|
||||||
|
} else {
|
||||||
|
// Will be missing
|
||||||
|
failedServers = append(failedServers, node)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Will be added from the call to reconcile
|
||||||
|
healthyServers = append(healthyServers, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Randomize ServerManager's server list
|
||||||
|
sm.RebalanceServers()
|
||||||
|
selectedServer := sm.FindServer()
|
||||||
|
|
||||||
|
var selectedServerFailed bool
|
||||||
|
for _, s := range failedServers {
|
||||||
|
if selectedServer.Key().Equal(s.Key()) {
|
||||||
|
selectedServerFailed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update ServerManager's server list to be "healthy" based on Serf.
|
||||||
|
// Reconcile this with origServers, which is shuffled and has a live
|
||||||
|
// connection, but possibly out of date.
|
||||||
|
origServers := sm.getServerConfig()
|
||||||
|
sm.saveServerConfig(serverConfig{servers: healthyServers})
|
||||||
|
|
||||||
|
// This should always succeed with non-zero server lists
|
||||||
|
if !selectedServerFailed && !sm.reconcileServerList(&origServers) &&
|
||||||
|
len(sm.getServerConfig().servers) != 0 &&
|
||||||
|
len(origServers.servers) != 0 {
|
||||||
|
// If the random gods are unfavorable and we end up with zero
|
||||||
|
// length lists, expect things to fail and retry the test.
|
||||||
|
return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d",
|
||||||
|
selectedServerFailed,
|
||||||
|
len(sm.getServerConfig().servers),
|
||||||
|
len(origServers.servers))
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we have zero-length server lists, test succeeded in degenerate
|
||||||
|
// case.
|
||||||
|
if len(sm.getServerConfig().servers) == 0 &&
|
||||||
|
len(origServers.servers) == 0 {
|
||||||
|
// Failed as expected w/ zero length list
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
resultingServerMap := make(map[server_details.Key]bool)
|
||||||
|
for _, s := range sm.getServerConfig().servers {
|
||||||
|
resultingServerMap[*s.Key()] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test to make sure no failed servers are in the ServerManager's
|
||||||
|
// list. Error if there are any failedServers in sc.servers
|
||||||
|
for _, s := range failedServers {
|
||||||
|
_, ok := resultingServerMap[*s.Key()]
|
||||||
|
if ok {
|
||||||
|
return false, fmt.Errorf("Found failed server %v in merged list %v", s, resultingServerMap)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test to make sure all healthy servers are in the healthy list.
|
||||||
|
if len(healthyServers) != len(sm.getServerConfig().servers) {
|
||||||
|
return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test to make sure all healthy servers are in the resultingServerMap list.
|
||||||
|
for _, s := range healthyServers {
|
||||||
|
_, ok := resultingServerMap[*s.Key()]
|
||||||
|
if !ok {
|
||||||
|
return false, fmt.Errorf("Server %v missing from healthy map after merged lists", s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// func (sc *serverConfig) refreshServerRebalanceTimer() {
|
||||||
func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
|
func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
|
||||||
sm := testServerManager()
|
|
||||||
|
|
||||||
timer := time.NewTimer(time.Duration(1 * time.Nanosecond))
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
sm.refreshServerRebalanceTimer(timer)
|
|
||||||
|
|
||||||
logger := log.New(os.Stderr, "", log.LstdFlags)
|
|
||||||
shutdownCh := make(chan struct{})
|
|
||||||
|
|
||||||
type clusterSizes struct {
|
type clusterSizes struct {
|
||||||
numNodes int
|
numNodes int
|
||||||
numServers int
|
numServers int
|
||||||
|
@ -170,17 +295,19 @@ func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
|
||||||
{1000000, 19, 10 * time.Minute},
|
{1000000, 19, 10 * time.Minute},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, s := range clusters {
|
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||||
sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes})
|
shutdownCh := make(chan struct{})
|
||||||
|
|
||||||
|
for _, s := range clusters {
|
||||||
|
sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
|
||||||
for i := 0; i < s.numServers; i++ {
|
for i := 0; i < s.numServers; i++ {
|
||||||
nodeName := fmt.Sprintf("s%02d", i)
|
nodeName := fmt.Sprintf("s%02d", i)
|
||||||
sm.AddServer(&server_details.ServerDetails{Name: nodeName})
|
sm.AddServer(&server_details.ServerDetails{Name: nodeName})
|
||||||
}
|
}
|
||||||
|
|
||||||
d := sm.refreshServerRebalanceTimer(timer)
|
d := sm.refreshServerRebalanceTimer()
|
||||||
if d < s.minRebalance {
|
if d < s.minRebalance {
|
||||||
t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance)
|
t.Errorf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -26,6 +27,20 @@ func GetBufferedLogger() *log.Logger {
|
||||||
return localLogger
|
return localLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type fauxConnPool struct {
|
||||||
|
// failPct between 0.0 and 1.0 == pct of time a Ping should fail
|
||||||
|
failPct float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cp *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) (bool, error) {
|
||||||
|
var success bool
|
||||||
|
successProb := rand.Float64()
|
||||||
|
if successProb > cp.failPct {
|
||||||
|
success = true
|
||||||
|
}
|
||||||
|
return success, nil
|
||||||
|
}
|
||||||
|
|
||||||
type fauxSerf struct {
|
type fauxSerf struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,7 +52,15 @@ func testServerManager() (sm *server_manager.ServerManager) {
|
||||||
logger := GetBufferedLogger()
|
logger := GetBufferedLogger()
|
||||||
logger = log.New(os.Stderr, "", log.LstdFlags)
|
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||||
shutdownCh := make(chan struct{})
|
shutdownCh := make(chan struct{})
|
||||||
sm = server_manager.New(logger, shutdownCh, &fauxSerf{})
|
sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
|
||||||
|
return sm
|
||||||
|
}
|
||||||
|
|
||||||
|
func testServerManagerFailProb(failPct float64) (sm *server_manager.ServerManager) {
|
||||||
|
logger := GetBufferedLogger()
|
||||||
|
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||||
|
shutdownCh := make(chan struct{})
|
||||||
|
sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
|
||||||
return sm
|
return sm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,7 +147,7 @@ func TestServerManager_New(t *testing.T) {
|
||||||
logger := GetBufferedLogger()
|
logger := GetBufferedLogger()
|
||||||
logger = log.New(os.Stderr, "", log.LstdFlags)
|
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||||
shutdownCh := make(chan struct{})
|
shutdownCh := make(chan struct{})
|
||||||
sm := server_manager.New(logger, shutdownCh, &fauxSerf{})
|
sm := server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
|
||||||
if sm == nil {
|
if sm == nil {
|
||||||
t.Fatalf("ServerManager nil")
|
t.Fatalf("ServerManager nil")
|
||||||
}
|
}
|
||||||
|
@ -202,7 +225,8 @@ func TestServerManager_NumServers(t *testing.T) {
|
||||||
|
|
||||||
// func (sm *ServerManager) RebalanceServers() {
|
// func (sm *ServerManager) RebalanceServers() {
|
||||||
func TestServerManager_RebalanceServers(t *testing.T) {
|
func TestServerManager_RebalanceServers(t *testing.T) {
|
||||||
sm := testServerManager()
|
const failPct = 0.5
|
||||||
|
sm := testServerManagerFailProb(failPct)
|
||||||
const maxServers = 100
|
const maxServers = 100
|
||||||
const numShuffleTests = 100
|
const numShuffleTests = 100
|
||||||
const uniquePassRate = 0.5
|
const uniquePassRate = 0.5
|
||||||
|
@ -265,6 +289,10 @@ func TestServerManager_RemoveServer(t *testing.T) {
|
||||||
servers = append(servers, server)
|
servers = append(servers, server)
|
||||||
sm.AddServer(server)
|
sm.AddServer(server)
|
||||||
}
|
}
|
||||||
|
if sm.NumServers() != maxServers {
|
||||||
|
t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers())
|
||||||
|
}
|
||||||
|
|
||||||
sm.RebalanceServers()
|
sm.RebalanceServers()
|
||||||
|
|
||||||
if sm.NumServers() != maxServers {
|
if sm.NumServers() != maxServers {
|
||||||
|
|
Loading…
Reference in New Issue