Merge pull request #1873 from hashicorp/f-rebalance-worker-0.7

Periodically rebalance the servers that agents talk to
This commit is contained in:
Sean Chittenden 2016-03-25 15:03:18 -07:00
commit ae32a3ceae
21 changed files with 1144 additions and 223 deletions

View File

@ -1,5 +1,17 @@
## 0.7.0 (UNRELEASED)
IMPROVEMENTS:
* Consul agents will now periodically reconnect to available Consul servers
in order to redistribute their RPC query load. Consul clients will, by
default, attempt to establish a new connection every 120s to 180s unless
the size of the cluster is sufficiently large. The rate at which agents
begin to query new servers is proportional to the size of the Consul
cluster (servers should never receive more than 64 new connections per
second per Consul server as a result of rebalancing). Clusters in stable
environments that use `allow_stale` should see a more even distribution of
query load across all of their Consul servers. [GH-1743]
## 0.6.4 (March 16, 2016)
BACKWARDS INCOMPATIBILITIES:

View File

@ -360,7 +360,7 @@ type Config struct {
// * deny - Deny all requests
// * extend-cache - Ignore the cache expiration, and allow cached
// ACL's to be used to service requests. This
// is the default. If the ACL is not in the cache,
// is the default. If the ACL is not in the cache,
// this acts like deny.
ACLDownPolicy string `mapstructure:"acl_down_policy"`

View File

@ -3,7 +3,6 @@ package consul
import (
"fmt"
"log"
"math/rand"
"os"
"path/filepath"
"strconv"
@ -11,19 +10,35 @@ import (
"sync"
"time"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/consul/server_manager"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
const (
// clientRPCCache controls how long we keep an idle connection
// open to a server
clientRPCCache = 30 * time.Second
// clientRPCConnMaxIdle controls how long we keep an idle connection
// open to a server. 127s was chosen as the first prime above 120s
// (arbitrarily chose to use a prime) with the intent of reusing
// connections that are used by once-a-minute cron(8) jobs *and* that
// use a 60s jitter window (e.g. in vixie cron job execution can
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
clientRPCConnMaxIdle = 127 * time.Second
// clientMaxStreams controls how many idle streams we keep
// open to a server
clientMaxStreams = 32
// serfEventBacklog is the maximum number of unprocessed Serf Events
// that will be held in queue before new serf events block. A
// blocking serf event queue is a bad thing.
serfEventBacklog = 256
// serfEventBacklogWarning is the threshold at which point log
// warnings will be emitted indicating a problem when processing serf
// events.
serfEventBacklogWarning = 200
)
// Interface is used to provide either a Client or Server,
@ -43,19 +58,14 @@ type Client struct {
// Connection pool to consul servers
connPool *ConnPool
// consuls tracks the locally known servers
consuls []*serverParts
consulLock sync.RWMutex
// serverMgr is responsible for the selection and maintenance of
// Consul servers this agent uses for RPC requests
serverMgr *server_manager.ServerManager
// eventCh is used to receive events from the
// serf cluster in the datacenter
eventCh chan serf.Event
// lastServer is the last server we made an RPC call to,
// this is used to re-use the last connection
lastServer *serverParts
lastRPCTime time.Time
// Logger uses the provided LogOutput
logger *log.Logger
@ -103,12 +113,17 @@ func NewClient(config *Config) (*Client, error) {
// Create server
c := &Client{
config: config,
connPool: NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
eventCh: make(chan serf.Event, 256),
connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
shutdownCh: make(chan struct{}),
}
c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf)
// Start maintenance task for serverMgr
go c.serverMgr.Start()
// Start the Serf listeners to prevent a deadlock
go c.lanEventHandler()
@ -215,7 +230,13 @@ func (c *Client) Encrypted() bool {
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
var numQueuedEvents int
for {
numQueuedEvents = len(c.eventCh)
if numQueuedEvents > serfEventBacklogWarning {
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
}
select {
case e := <-c.eventCh:
switch e.EventType() {
@ -240,7 +261,7 @@ func (c *Client) lanEventHandler() {
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := isConsulServer(m)
ok, parts := server_details.IsConsulServer(m)
if !ok {
continue
}
@ -250,23 +271,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
continue
}
c.logger.Printf("[INFO] consul: adding server %s", parts)
// Check if this server is known
found := false
c.consulLock.Lock()
for idx, existing := range c.consuls {
if existing.Name == parts.Name {
c.consuls[idx] = parts
found = true
break
}
}
// Add to the list if not known
if !found {
c.consuls = append(c.consuls, parts)
}
c.consulLock.Unlock()
c.serverMgr.AddServer(parts)
// Trigger the callback
if c.config.ServerUp != nil {
@ -278,23 +283,12 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := isConsulServer(m)
ok, parts := server_details.IsConsulServer(m)
if !ok {
continue
}
c.logger.Printf("[INFO] consul: removing server %s", parts)
// Remove the server if known
c.consulLock.Lock()
n := len(c.consuls)
for i := 0; i < n; i++ {
if c.consuls[i].Name == parts.Name {
c.consuls[i], c.consuls[n-1] = c.consuls[n-1], nil
c.consuls = c.consuls[:n-1]
break
}
}
c.consulLock.Unlock()
c.serverMgr.RemoveServer(parts)
}
}
@ -328,50 +322,33 @@ func (c *Client) localEvent(event serf.UserEvent) {
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Check the last rpc time
var server *serverParts
if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
server = c.lastServer
if server != nil {
goto TRY_RPC
}
}
// Bail if we can't find any servers
c.consulLock.RLock()
if len(c.consuls) == 0 {
c.consulLock.RUnlock()
server := c.serverMgr.FindServer()
if server == nil {
return structs.ErrNoServers
}
// Select a random addr
server = c.consuls[rand.Int31()%int32(len(c.consuls))]
c.consulLock.RUnlock()
// Forward to remote Consul
TRY_RPC:
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
c.lastServer = nil
c.lastRPCTime = time.Time{}
c.serverMgr.NotifyFailedServer(server)
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
return err
}
// Cache the last server
c.lastServer = server
c.lastRPCTime = time.Now()
return nil
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
numServers := c.serverMgr.NumServers()
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
stats := map[string]map[string]string{
"consul": map[string]string{
"server": "false",
"known_servers": toString(uint64(len(c.consuls))),
"known_servers": toString(uint64(numServers)),
},
"serf_lan": c.serf.Stats(),
"runtime": runtimeStats(),

View File

@ -83,6 +83,11 @@ func TestClient_JoinLAN(t *testing.T) {
if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
return c1.serverMgr.NumServers() == 1, nil
}, func(err error) {
t.Fatalf("expected consul server")
})
// Check the members
testutil.WaitForResult(func() (bool, error) {
@ -95,7 +100,7 @@ func TestClient_JoinLAN(t *testing.T) {
// Check we have a new consul
testutil.WaitForResult(func() (bool, error) {
return len(c1.consuls) == 1, nil
return c1.serverMgr.NumServers() == 1, nil
}, func(err error) {
t.Fatalf("expected consul server")
})

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
@ -349,7 +350,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
return true
}
if valid, parts := isConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
if valid, parts := server_details.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter {
return true
}
return false
@ -360,7 +361,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
func (s *Server) handleAliveMember(member serf.Member) error {
// Register consul service if a server
var service *structs.NodeService
if valid, parts := isConsulServer(member); valid {
if valid, parts := server_details.IsConsulServer(member); valid {
service = &structs.NodeService{
ID: ConsulServiceID,
Service: ConsulServiceName,
@ -496,7 +497,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
}
// Remove from Raft peers if this was a server
if valid, parts := isConsulServer(member); valid {
if valid, parts := server_details.IsConsulServer(member); valid {
if err := s.removeConsulServer(member, parts.Port); err != nil {
return err
}
@ -523,7 +524,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
}
// joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, parts *serverParts) error {
func (s *Server) joinConsulServer(m serf.Member, parts *server_details.ServerDetails) error {
// Do not join ourself
if m.Name == s.config.NodeName {
return nil
@ -533,7 +534,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *serverParts) error {
if parts.Bootstrap {
members := s.serfLAN.Members()
for _, member := range members {
valid, p := isConsulServer(member)
valid, p := server_details.IsConsulServer(member)
if valid && member.Name != m.Name && p.Bootstrap {
s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
return nil

View File

@ -3,6 +3,7 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/serf/serf"
)
@ -24,7 +25,7 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
continue
}
ok, parts := isConsulServer(*m)
ok, parts := server_details.IsConsulServer(*m)
if ok && parts.Datacenter != md.dc {
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
m.Name, parts.Datacenter)
@ -41,7 +42,7 @@ type wanMergeDelegate struct {
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
for _, m := range members {
ok, _ := isConsulServer(*m)
ok, _ := server_details.IsConsulServer(*m)
if !ok {
return fmt.Errorf("Member '%s' is not a server", m.Name)
}

View File

@ -4,6 +4,7 @@ import (
"net"
"strings"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/serf/serf"
)
@ -140,7 +141,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
// lanNodeJoin is used to handle join events on the LAN pool.
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := isConsulServer(m)
ok, parts := server_details.IsConsulServer(m)
if !ok {
continue
}
@ -163,7 +164,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {
// wanNodeJoin is used to handle join events on the WAN pool.
func (s *Server) wanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := isConsulServer(m)
ok, parts := server_details.IsConsulServer(m)
if !ok {
s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name)
continue
@ -209,7 +210,7 @@ func (s *Server) maybeBootstrap() {
members := s.serfLAN.Members()
addrs := make([]string, 0)
for _, member := range members {
valid, p := isConsulServer(member)
valid, p := server_details.IsConsulServer(member)
if !valid {
continue
}
@ -247,7 +248,7 @@ func (s *Server) maybeBootstrap() {
// lanNodeFailed is used to handle fail events on the LAN pool.
func (s *Server) lanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := isConsulServer(m)
ok, parts := server_details.IsConsulServer(m)
if !ok {
continue
}
@ -262,7 +263,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {
// wanNodeFailed is used to handle fail events on the WAN pool.
func (s *Server) wanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := isConsulServer(m)
ok, parts := server_details.IsConsulServer(m)
if !ok {
continue
}

View File

@ -15,6 +15,7 @@ import (
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/raft"
@ -97,7 +98,7 @@ type Server struct {
// localConsuls is used to track the known consuls
// in the local datacenter. Used to do leader forwarding.
localConsuls map[string]*serverParts
localConsuls map[string]*server_details.ServerDetails
localLock sync.RWMutex
// Logger uses the provided LogOutput
@ -119,7 +120,7 @@ type Server struct {
// remoteConsuls is used to track the known consuls in
// remote datacenters. Used to do DC forwarding.
remoteConsuls map[string][]*serverParts
remoteConsuls map[string][]*server_details.ServerDetails
remoteLock sync.RWMutex
// rpcListener is used to listen for incoming connections
@ -216,10 +217,10 @@ func NewServer(config *Config) (*Server, error) {
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
eventChLAN: make(chan serf.Event, 256),
eventChWAN: make(chan serf.Event, 256),
localConsuls: make(map[string]*serverParts),
localConsuls: make(map[string]*server_details.ServerDetails),
logger: logger,
reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*serverParts),
remoteConsuls: make(map[string][]*server_details.ServerDetails),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
tombstoneGC: gc,

View File

@ -0,0 +1,70 @@
package server_details
import (
"fmt"
"net"
"strconv"
"github.com/hashicorp/serf/serf"
)
// ServerDetails describes a single Consul server. IsConsulServer populates
// it from the tags and address of a Serf member.
type ServerDetails struct {
	// Name is the Serf member name of the server.
	Name string
	// Datacenter is the value of the member's "dc" tag.
	Datacenter string
	// Port is the parsed value of the member's "port" tag.
	Port int
	// Bootstrap is true when the member carries a "bootstrap" tag.
	Bootstrap bool
	// Expect is the parsed value of the optional "expect" tag (0 if absent).
	Expect int
	// Version is the parsed value of the member's "vsn" tag.
	Version int
	// Addr is the address built from the member's IP and Port.
	Addr net.Addr
}

// String returns a human-readable, single-line summary of the server
// (name, address and datacenter) suitable for log messages.
func (sd *ServerDetails) String() string {
	const layout = "%s (Addr: %s) (DC: %s)"
	return fmt.Sprintf(layout, sd.Name, sd.Addr, sd.Datacenter)
}
// IsConsulServer reports whether a serf member is a consul server. A member
// qualifies when its "role" tag is "consul" and its "port" and "vsn" tags
// (plus the optional "expect" tag, when present) parse as integers. On
// success the second return value holds the parsed ServerDetails; otherwise
// it is nil.
func IsConsulServer(m serf.Member) (bool, *ServerDetails) {
	if role := m.Tags["role"]; role != "consul" {
		return false, nil
	}

	// "expect" is optional: only a present-but-malformed value disqualifies
	// the member.
	expect := 0
	if raw, present := m.Tags["expect"]; present {
		parsed, err := strconv.Atoi(raw)
		if err != nil {
			return false, nil
		}
		expect = parsed
	}

	// "port" and "vsn" are mandatory; a missing tag yields "" which fails
	// to parse.
	port, err := strconv.Atoi(m.Tags["port"])
	if err != nil {
		return false, nil
	}

	vsn, err := strconv.Atoi(m.Tags["vsn"])
	if err != nil {
		return false, nil
	}

	// Only the presence of the "bootstrap" tag matters, not its value.
	_, bootstrap := m.Tags["bootstrap"]

	return true, &ServerDetails{
		Name:       m.Name,
		Datacenter: m.Tags["dc"],
		Port:       port,
		Bootstrap:  bootstrap,
		Expect:     expect,
		Addr:       &net.TCPAddr{IP: m.Addr, Port: port},
		Version:    vsn,
	}
}

View File

@ -0,0 +1,66 @@
package server_details_test
import (
"net"
"testing"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/serf/serf"
)
// TestIsConsulServer walks one serf member through several tag
// combinations and checks what IsConsulServer extracts from each.
func TestIsConsulServer(t *testing.T) {
	tags := map[string]string{
		"role": "consul",
		"dc":   "east-aws",
		"port": "10000",
		"vsn":  "1",
	}
	member := serf.Member{
		Name: "foo",
		Addr: net.IP([]byte{127, 0, 0, 1}),
		Tags: tags,
	}

	// Minimal set of tags: a valid, non-bootstrap server.
	ok, details := server_details.IsConsulServer(member)
	if !ok || details.Datacenter != "east-aws" || details.Port != 10000 {
		t.Fatalf("bad: %v %v", ok, details)
	}
	if details.Name != "foo" {
		t.Fatalf("bad: %v", details)
	}
	if details.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}
	if details.Expect != 0 {
		t.Fatalf("bad: %v", details.Expect)
	}

	// Adding the bootstrap tag (and an unrelated one) keeps it valid.
	member.Tags["bootstrap"] = "1"
	member.Tags["disabled"] = "1"
	ok, details = server_details.IsConsulServer(member)
	if !ok {
		t.Fatalf("expected a valid consul server")
	}
	if !details.Bootstrap {
		t.Fatalf("expected bootstrap")
	}
	if details.Addr.String() != "127.0.0.1:10000" {
		t.Fatalf("bad addr: %v", details.Addr)
	}
	if details.Version != 1 {
		t.Fatalf("bad: %v", details)
	}

	// Swap bootstrap for an expect count.
	member.Tags["expect"] = "3"
	delete(member.Tags, "bootstrap")
	delete(member.Tags, "disabled")
	ok, details = server_details.IsConsulServer(member)
	if !ok || details.Expect != 3 {
		t.Fatalf("bad: %v", details.Expect)
	}
	if details.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}

	// Without the role tag the member is not a server at all.
	delete(member.Tags, "role")
	ok, details = server_details.IsConsulServer(member)
	if ok {
		t.Fatalf("unexpected ok server")
	}
}

View File

@ -0,0 +1,311 @@
package server_manager
import (
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/lib"
)
// consulServerEventTypes is an int-based type.
// NOTE(review): no constants or uses of this type are visible in this part
// of the file — confirm it is still needed.
type consulServerEventTypes int

const (
	// clientRPCJitterFraction determines the amount of jitter added to
	// clientRPCMinReuseDuration before a connection is expired and a new
	// connection is established in order to rebalance load across consul
	// servers: the jitter is drawn from
	// clientRPCMinReuseDuration/clientRPCJitterFraction. The cluster-wide
	// number of connections per second from rebalancing is applied after
	// this jitter to ensure the CPU impact is always finite. See the
	// comment on newRebalanceConnsPerSecPerServer for additional
	// commentary.
	//
	// For example, in a 10K consul cluster with 5x servers, this default
	// averages out to ~13 new connections from rebalancing per server
	// per second (each connection is reused for 120s to 180s).
	clientRPCJitterFraction = 2

	// clientRPCMinReuseDuration controls the minimum amount of time RPC
	// queries are sent over an established connection to a single server.
	// It is also the initial rebalance timer duration used by Start.
	clientRPCMinReuseDuration = 120 * time.Second

	// newRebalanceConnsPerSecPerServer limits the number of new
	// connections a server receives per second for connection
	// rebalancing. This limit caps the load caused by continual
	// rebalancing efforts when a cluster is in equilibrium. A lower value
	// comes at the cost of increased recovery time after a partition.
	// This parameter begins to take effect when there are more than ~48K
	// clients querying 5x servers or at lower server values when there is
	// a partition.
	//
	// For example, in a 100K consul cluster with 5x servers, it will
	// take ~5min for all servers to rebalance their connections. If
	// 99,995 agents are in the minority talking to only one server, it
	// will take ~26min for all servers to rebalance. A 10K cluster in
	// the same scenario will take ~2.6min to rebalance.
	newRebalanceConnsPerSecPerServer = 64
)
// ConsulClusterInfo is the minimal view of the cluster that ServerManager
// depends on. refreshServerRebalanceTimer calls NumNodes to scale the
// rebalance interval with the size of the cluster.
type ConsulClusterInfo interface {
	// NumNodes returns the number of nodes in the cluster.
	NumNodes() int
}
// serverConfig is the configuration struct used to maintain the list of
// Consul servers in ServerManager. Values of this type are stored in and
// loaded from ServerManager.serverConfigValue (an atomic.Value).
//
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
// be copied onto the stack. Please keep this structure light.
type serverConfig struct {
	// servers tracks the locally known servers. List membership is
	// maintained by Serf.
	servers []*server_details.ServerDetails
}
// ServerManager maintains the ordered list of Consul servers an agent uses
// for RPC requests. FindServer hands out the server at the front of the
// list; the list is rotated by NotifyFailedServer and shuffled by
// RebalanceServers.
type ServerManager struct {
	// serverConfigValue provides the necessary load/store semantics for
	// the server list. It always holds a serverConfig value.
	serverConfigValue atomic.Value
	// serverConfigLock is held by the methods that replace the server
	// list (AddServer, RemoveServer, RebalanceServers and
	// NotifyFailedServer).
	serverConfigLock sync.Mutex

	// shutdownCh is a copy of the channel in consul.Client
	shutdownCh chan struct{}

	// logger receives the manager's log output.
	logger *log.Logger

	// clusterInfo is used to estimate the approximate number of nodes in
	// a cluster and limit the rate at which it rebalances server
	// connections. ConsulClusterInfo is an interface that wraps serf.
	clusterInfo ConsulClusterInfo

	// notifyFailedBarrier acts as a barrier to prevent queueing behind
	// serverConfigLock: NotifyFailedServer flips it with an atomic
	// compare-and-swap to emulate a TryLock().
	notifyFailedBarrier int32
}
// AddServer registers a server with the manager while holding the internal
// write lock. An unknown server is appended to the end of the list and will
// begin seeing use after the rebalance timer fires or enough servers fail
// organically. A server whose name is already known replaces the existing
// entry in place so that its metadata (e.g. server version) is refreshed.
// The stored list is never modified in place: a fresh copy is always saved.
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()

	cfg := sm.getServerConfig()

	// Locate the server by name; -1 means it is not yet known.
	knownAt := -1
	for i, s := range cfg.servers {
		if s.Name == server.Name {
			knownAt = i
			break
		}
	}

	var updated []*server_details.ServerDetails
	if knownAt >= 0 {
		// Known server: copy the list and overwrite the old details.
		updated = make([]*server_details.ServerDetails, len(cfg.servers))
		copy(updated, cfg.servers)
		updated[knownAt] = server
	} else {
		// New server: copy the list with room for one more entry.
		updated = make([]*server_details.ServerDetails, len(cfg.servers), len(cfg.servers)+1)
		copy(updated, cfg.servers)
		updated = append(updated, server)
	}

	cfg.servers = updated
	sm.saveServerConfig(cfg)
}
// cycleServer returns a new list of servers in which the first server has
// been dequeued and re-enqueued at the end of the list. The receiver's own
// list is not modified. When there are fewer than two servers there is
// nothing to rotate and the current list is returned as-is, so a caller can
// always assign the result back without losing servers. cycleServer assumes
// the caller is holding the serverConfigLock.
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
	numServers := len(sc.servers)
	if numServers < 2 {
		// No action required: hand back the existing list rather than
		// the (nil) zero value of the named result.
		return sc.servers
	}

	servers = make([]*server_details.ServerDetails, 0, numServers)
	servers = append(servers, sc.servers[1:]...)
	servers = append(servers, sc.servers[0])
	return servers
}
// FindServer loads the current server list (an internal "read lock") and
// returns the server to use for the next RPC, or nil (after logging a
// warning) when no servers are known. The server at the front of the list
// is always chosen: a server that has failed or fails during an RPC call is
// rotated to the end of the list, and Serf is relied upon to detect
// unhealthy servers and remove them from the list.
func (sm *ServerManager) FindServer() *server_details.ServerDetails {
	candidates := sm.getServerConfig().servers
	if len(candidates) == 0 {
		sm.logger.Printf("[WARN] consul: No servers available")
		return nil
	}

	// The front of the list is assumed to be the oldest entry (unless -
	// hypothetically - the server list was rotated right after a server
	// was added).
	return candidates[0]
}
// getServerConfig returns the serverConfig currently held in
// serverConfigValue, hiding the atomic.Value load and type assertion from
// the caller.
func (sm *ServerManager) getServerConfig() serverConfig {
	current := sm.serverConfigValue.Load()
	return current.(serverConfig)
}

// saveServerConfig replaces the serverConfig held in serverConfigValue,
// hiding the atomic.Value store from the caller.
func (sm *ServerManager) saveServerConfig(sc serverConfig) {
	sm.serverConfigValue.Store(sc)
}
// New is the only way to safely create a new ServerManager struct. The
// returned manager starts with an empty (non-nil) server list already
// stored, so getServerConfig can be called immediately.
func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
	// NOTE(sean@): Can't pass *consul.Client due to an import cycle
	sm = &ServerManager{
		logger:      logger,
		clusterInfo: clusterInfo,
		shutdownCh:  shutdownCh,
	}

	sm.saveServerConfig(serverConfig{
		servers: make([]*server_details.ServerDetails, 0),
	})
	return sm
}
// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list. The call is a noop unless the server is
// currently first in a list of at least two servers.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
	cfg := sm.getServerConfig()

	// Nothing to do when there is no other server to rotate to, or when
	// the failed server is not the one at the front of the list.
	if len(cfg.servers) <= 1 || cfg.servers[0] != server {
		return
	}

	// Use atomic.CAS to emulate a TryLock(): if another caller is already
	// rotating the list, do not queue up behind it.
	if !atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
		return
	}
	defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)

	// Grab the lock, retest against a fresh copy of the list, and take
	// the hit of cycling the first server to the end.
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()

	cfg = sm.getServerConfig()
	if len(cfg.servers) <= 1 || cfg.servers[0] != server {
		return
	}
	cfg.servers = cfg.cycleServer()
	sm.saveServerConfig(cfg)
}
// NumServers loads the current server list (an internal "read lock") and
// returns how many servers it contains. The count includes both healthy and
// unhealthy servers.
func (sm *ServerManager) NumServers() (numServers int) {
	return len(sm.getServerConfig().servers)
}
// RebalanceServers takes out the internal write lock and stores a randomly
// shuffled copy of the server list. This redistributes work across consul
// servers and guarantees that the order of the list is unrelated to the
// time at which each server was added. Elsewhere the position in the list
// is used as a hint about a server's stability: servers at or near the
// front are considered more stable than servers near the end. Unhealthy
// servers are removed when serf notices the server has been deregistered.
func (sm *ServerManager) RebalanceServers() {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()

	cfg := sm.getServerConfig()

	// Work on a copy so readers holding the old list are unaffected.
	shuffled := make([]*server_details.ServerDetails, len(cfg.servers))
	copy(shuffled, cfg.servers)

	// Shuffle from the back: swap each slot with a random slot at or
	// before it.
	for last := len(cfg.servers) - 1; last > 0; last-- {
		pick := rand.Int31n(int32(last + 1))
		shuffled[last], shuffled[pick] = shuffled[pick], shuffled[last]
	}

	cfg.servers = shuffled
	sm.saveServerConfig(cfg)
}
// RemoveServer takes out the internal write lock and removes the first
// server whose name matches the given server from the server list. If no
// such server is known the list is left untouched.
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
	sm.serverConfigLock.Lock()
	defer sm.serverConfigLock.Unlock()

	cfg := sm.getServerConfig()
	for i, known := range cfg.servers {
		if known.Name != server.Name {
			continue
		}

		// Build a fresh list that skips index i rather than editing the
		// shared list in place.
		remaining := make([]*server_details.ServerDetails, 0, len(cfg.servers)-1)
		remaining = append(remaining, cfg.servers[:i]...)
		remaining = append(remaining, cfg.servers[i+1:]...)

		cfg.servers = remaining
		sm.saveServerConfig(cfg)
		return
	}
}
// refreshServerRebalanceTimer computes the next rebalance interval, resets
// the given timer to it and returns it. It is only called once the
// rebalanceTimer expires. Historically this was an expensive routine and is
// intended to be run in isolation in a dedicated, non-concurrent task.
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
	numServers := len(sm.getServerConfig().servers)

	// Limit this connection's life based on the size (and health) of the
	// cluster: never rebalance more frequently than the jittered minimum
	// reuse duration, and never exceed the cluster-wide budget of new
	// connections per second across all LAN members.
	maxConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
	jitter := lib.RandomStagger(clientRPCMinReuseDuration / clientRPCJitterFraction)
	minReuse := clientRPCMinReuseDuration + jitter
	numNodes := sm.clusterInfo.NumNodes()

	timeout := lib.RateScaledInterval(maxConnsPerSec, minReuse, numNodes)
	timer.Reset(timeout)
	return timeout
}
// Start is used to start and manage the task of automatically shuffling and
// rebalancing the list of consul servers. It blocks until shutdownCh is
// signalled, so callers run it in its own goroutine. This maintenance only
// happens periodically based on the expiration of the timer. Failed servers
// are automatically cycled to the end of the list. New servers are appended
// to the list. The order of the server list must be shuffled periodically
// to distribute load across all known and available consul servers.
func (sm *ServerManager) Start() {
	rebalanceTimer := time.NewTimer(clientRPCMinReuseDuration)
	// Release the timer on shutdown instead of leaving it pending until
	// it would next have fired.
	defer rebalanceTimer.Stop()

	for {
		select {
		case <-rebalanceTimer.C:
			sm.logger.Printf("[INFO] server manager: Rebalancing server connections")
			sm.RebalanceServers()

			// The timer has fired and its channel was drained above,
			// so it can be reset directly.
			sm.refreshServerRebalanceTimer(rebalanceTimer)

		case <-sm.shutdownCh:
			sm.logger.Printf("[INFO] server manager: shutting down")
			return
		}
	}
}

View File

@ -0,0 +1,225 @@
package server_manager
import (
"bytes"
"fmt"
"log"
"os"
"testing"
"time"
"github.com/hashicorp/consul/consul/server_details"
)
// The tests share a single logger that writes into an in-memory buffer so
// that log output produced by ServerManager does not reach the console.
var (
	// localLogBuffer collects everything written through localLogger.
	localLogBuffer = new(bytes.Buffer)
	// localLogger has no prefix and no flags, so the buffer contains
	// exactly the formatted messages.
	localLogger = log.New(localLogBuffer, "", 0)
)

// GetBufferedLogger returns the shared logger backed by localLogBuffer.
func GetBufferedLogger() *log.Logger {
	return localLogger
}
// fauxSerf is a test stand-in for the cluster information source: it
// reports a fixed node count.
type fauxSerf struct {
	// numNodes is the value NumNodes reports.
	numNodes int
}

// NumNodes returns the configured node count.
func (f *fauxSerf) NumNodes() int {
	count := f.numNodes
	return count
}
// testServerManager builds a ServerManager for tests: it logs to the shared
// buffered logger, owns a fresh shutdown channel and reports a cluster of
// 16384 nodes.
func testServerManager() (sm *ServerManager) {
	return New(GetBufferedLogger(), make(chan struct{}), &fauxSerf{numNodes: 16384})
}
// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
//
// TestServerManagerInternal_cycleServer verifies that each call to
// cycleServer moves the first server to the end of the list and that three
// rotations of a three-server list restore the original order. A list is
// rejected as soon as ANY position holds the wrong server.
func TestServerManagerInternal_cycleServer(t *testing.T) {
	sm := testServerManager()
	sc := sm.getServerConfig()

	server0 := &server_details.ServerDetails{Name: "server1"}
	server1 := &server_details.ServerDetails{Name: "server2"}
	server2 := &server_details.ServerDetails{Name: "server3"}
	sc.servers = append(sc.servers, server0, server1, server2)
	sm.saveServerConfig(sc)

	sc = sm.getServerConfig()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server0 ||
		sc.servers[1] != server1 ||
		sc.servers[2] != server2 {
		t.Fatalf("initial server ordering not correct")
	}

	sc.servers = sc.cycleServer()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server1 ||
		sc.servers[1] != server2 ||
		sc.servers[2] != server0 {
		t.Fatalf("server ordering after one cycle not correct")
	}

	sc.servers = sc.cycleServer()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server2 ||
		sc.servers[1] != server0 ||
		sc.servers[2] != server1 {
		t.Fatalf("server ordering after two cycles not correct")
	}

	sc.servers = sc.cycleServer()
	if len(sc.servers) != 3 {
		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
	}
	if sc.servers[0] != server0 ||
		sc.servers[1] != server1 ||
		sc.servers[2] != server2 {
		t.Fatalf("server ordering after three cycles not correct")
	}
}
// func (sm *ServerManager) getServerConfig() serverConfig {
//
// TestServerManagerInternal_getServerConfig checks that a freshly created
// manager exposes an empty, non-nil server list.
func TestServerManagerInternal_getServerConfig(t *testing.T) {
	sc := testServerManager().getServerConfig()

	if sc.servers == nil {
		t.Fatalf("serverConfig.servers nil")
	}
	if count := len(sc.servers); count != 0 {
		t.Fatalf("serverConfig.servers length not zero")
	}
}
// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
//
// TestServerManagerInternal_New checks that New returns a manager with all
// of its dependencies wired in.
func TestServerManagerInternal_New(t *testing.T) {
	sm := testServerManager()

	// Cases are evaluated in order, so later fields are only inspected
	// once sm itself is known to be non-nil.
	switch {
	case sm == nil:
		t.Fatalf("ServerManager nil")
	case sm.clusterInfo == nil:
		t.Fatalf("ServerManager.clusterInfo nil")
	case sm.logger == nil:
		t.Fatalf("ServerManager.logger nil")
	case sm.shutdownCh == nil:
		t.Fatalf("ServerManager.shutdownCh nil")
	}
}
// func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
//
// TestServerManagerInternal_refreshServerRebalanceTimer checks, for a range
// of cluster sizes and server counts, that the computed rebalance interval
// never drops below the expected minimum.
func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
	sm := testServerManager()

	// Let a very short timer expire, then refresh it once.
	timer := time.NewTimer(time.Nanosecond)
	time.Sleep(time.Millisecond)
	sm.refreshServerRebalanceTimer(timer)

	logger := log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})

	clusters := []struct {
		numNodes     int
		numServers   int
		minRebalance time.Duration
	}{
		{0, 3, 2 * time.Minute},
		{1, 0, 2 * time.Minute}, // partitioned cluster
		{1, 3, 2 * time.Minute},
		{2, 3, 2 * time.Minute},
		{100, 0, 2 * time.Minute}, // partitioned
		{100, 1, 2 * time.Minute}, // partitioned
		{100, 3, 2 * time.Minute},
		{1024, 1, 2 * time.Minute}, // partitioned
		{1024, 3, 2 * time.Minute}, // partitioned
		{1024, 5, 2 * time.Minute},
		{16384, 1, 4 * time.Minute}, // partitioned
		{16384, 2, 2 * time.Minute}, // partitioned
		{16384, 3, 2 * time.Minute}, // partitioned
		{16384, 5, 2 * time.Minute},
		{65535, 0, 2 * time.Minute}, // partitioned
		{65535, 1, 8 * time.Minute}, // partitioned
		{65535, 2, 3 * time.Minute}, // partitioned
		{65535, 3, 5 * time.Minute}, // partitioned
		{65535, 5, 3 * time.Minute}, // partitioned
		{65535, 7, 2 * time.Minute},
		{1000000, 1, 4 * time.Hour},     // partitioned
		{1000000, 2, 2 * time.Hour},     // partitioned
		{1000000, 3, 80 * time.Minute},  // partitioned
		{1000000, 5, 50 * time.Minute},  // partitioned
		{1000000, 11, 20 * time.Minute}, // partitioned
		{1000000, 19, 10 * time.Minute},
	}

	for _, c := range clusters {
		// Each case gets its own manager populated with numServers
		// uniquely named servers.
		mgr := New(logger, shutdownCh, &fauxSerf{numNodes: c.numNodes})
		for i := 0; i < c.numServers; i++ {
			mgr.AddServer(&server_details.ServerDetails{Name: fmt.Sprintf("s%02d", i)})
		}

		if d := mgr.refreshServerRebalanceTimer(timer); d < c.minRebalance {
			t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", c.numNodes, c.numServers, d, c.minRebalance)
		}
	}
}
// TestServerManagerInternal_saveServerConfig checks three properties of the
// getServerConfig/saveServerConfig pair: a new manager starts empty, a
// saved mutation is visible to the next getServerConfig call, and a
// mutation that is never saved does not leak into the stored config.
//
// Method under test: func (sm *ServerManager) saveServerConfig(sc serverConfig)
func TestServerManagerInternal_saveServerConfig(t *testing.T) {
	sm := testServerManager()

	// Initial condition: start empty, append one server, persist it.
	{
		cfg := sm.getServerConfig()
		if len(cfg.servers) != 0 {
			t.Fatalf("ServerManager.saveServerConfig failed to load init config")
		}

		cfg.servers = append(cfg.servers, &server_details.ServerDetails{})
		sm.saveServerConfig(cfg)
	}

	// The saved server must show up in a freshly loaded config.
	{
		saved := sm.getServerConfig()
		if n := len(saved.servers); n != 1 {
			t.Fatalf("ServerManager.saveServerConfig failed to save mutated config")
		}
	}

	// Appending to a loaded copy without saving must leave the stored
	// config shorter than the mutated copy.
	{
		mutated := sm.getServerConfig()
		mutated.servers = append(mutated.servers, &server_details.ServerDetails{})

		stored := sm.getServerConfig()
		if len(stored.servers) >= len(mutated.servers) {
			t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original")
		}
	}
}

View File

@ -0,0 +1,359 @@
package server_manager_test
import (
"bytes"
"fmt"
"log"
"os"
"strings"
"testing"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/consul/server_manager"
)
// Package-level logging fixtures shared by the tests in this file. Both are
// assigned in init.
var (
	// localLogger is the logger handed out by GetBufferedLogger.
	localLogger *log.Logger
	// localLogBuffer is the in-memory buffer that localLogger writes to.
	localLogBuffer *bytes.Buffer
)
// init wires up the shared test logger: it allocates the in-memory buffer
// and builds a logger with no prefix and no flags that writes into it.
func init() {
	buf := &bytes.Buffer{}
	localLogBuffer = buf
	localLogger = log.New(buf, "", 0)
}
// GetBufferedLogger returns the package-level logger created in init, which
// writes to the in-memory localLogBuffer.
func GetBufferedLogger() *log.Logger {
	return localLogger
}
// fauxSerf is a stateless stand-in passed as the third argument to
// server_manager.New in these tests; it reports a fixed cluster size.
type fauxSerf struct{}

// NumNodes always reports a cluster of 16384 nodes.
func (s *fauxSerf) NumNodes() int {
	const fixedClusterSize = 16384
	return fixedClusterSize
}
// testServerManager builds a ServerManager for use in tests. The manager
// logs to stderr with the standard flags, gets a fresh shutdown channel,
// and uses fauxSerf as its cluster-size source.
//
// The previous version first fetched the buffered logger and then
// immediately overwrote it with the stderr logger; that dead assignment has
// been dropped, so the effective logger is unchanged.
func testServerManager() (sm *server_manager.ServerManager) {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})
	sm = server_manager.New(logger, shutdownCh, &fauxSerf{})
	return sm
}
// TestServerManager_AddServer checks that AddServer grows the server count
// by one for each distinct server and that adding the same server twice
// does not create a duplicate entry.
//
// Method under test: func (sm *ServerManager) AddServer(server *server_details.ServerDetails)
func TestServerManager_AddServer(t *testing.T) {
	sm := testServerManager()
	if n := sm.NumServers(); n != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	first := &server_details.ServerDetails{Name: "s1"}
	sm.AddServer(first)
	if n := sm.NumServers(); n != 1 {
		t.Fatalf("Expected one server")
	}

	// Re-adding the very same server must leave the count untouched.
	sm.AddServer(first)
	if n := sm.NumServers(); n != 1 {
		t.Fatalf("Expected one server (still)")
	}

	second := &server_details.ServerDetails{Name: "s2"}
	sm.AddServer(second)
	if n := sm.NumServers(); n != 2 {
		t.Fatalf("Expected two servers")
	}
}
// TestServerManager_FindServer checks that FindServer returns nil on an
// empty manager, keeps returning the same server across repeated calls and
// after another server is added, and only moves on to the next server once
// the current one has been reported via NotifyFailedServer.
//
// Method under test: func (sm *ServerManager) FindServer() (server *server_details.ServerDetails)
func TestServerManager_FindServer(t *testing.T) {
	sm := testServerManager()

	if got := sm.FindServer(); got != nil {
		t.Fatalf("Expected nil return")
	}

	sm.AddServer(&server_details.ServerDetails{Name: "s1"})
	if n := sm.NumServers(); n != 1 {
		t.Fatalf("Expected one server")
	}

	head := sm.FindServer()
	switch {
	case head == nil:
		t.Fatalf("Expected non-nil server")
	case head.Name != "s1":
		t.Fatalf("Expected s1 server")
	}

	// A second lookup must return the same server.
	if head = sm.FindServer(); head == nil || head.Name != "s1" {
		t.Fatalf("Expected s1 server (still)")
	}

	sm.AddServer(&server_details.ServerDetails{Name: "s2"})
	if n := sm.NumServers(); n != 2 {
		t.Fatalf("Expected two servers")
	}

	// Adding s2 must not displace s1 as the server being handed out.
	if head = sm.FindServer(); head == nil || head.Name != "s1" {
		t.Fatalf("Expected s1 server (still)")
	}

	// Reporting s1 as failed makes s2 the next server returned...
	sm.NotifyFailedServer(head)
	next := sm.FindServer()
	if next == nil || next.Name != "s2" {
		t.Fatalf("Expected s2 server")
	}

	// ...and reporting s2 as failed brings s1 back.
	sm.NotifyFailedServer(next)
	if head = sm.FindServer(); head == nil || head.Name != "s1" {
		t.Fatalf("Expected s1 server")
	}
}
// func New(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) {
func TestServerManager_New(t *testing.T) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
sm := server_manager.New(logger, shutdownCh, &fauxSerf{})
if sm == nil {
t.Fatalf("ServerManager nil")
}
}
// TestServerManager_NotifyFailedServer checks that reporting an unknown
// server, or a server that is not the one currently handed out by
// FindServer, changes nothing, while reporting the current server makes
// FindServer move on to the other one.
//
// Method under test: func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails)
func TestServerManager_NotifyFailedServer(t *testing.T) {
	sm := testServerManager()
	if n := sm.NumServers(); n != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	srv1 := &server_details.ServerDetails{Name: "s1"}
	srv2 := &server_details.ServerDetails{Name: "s2"}

	// Try notifying for a server that is not part of the server manager
	sm.NotifyFailedServer(srv1)
	if n := sm.NumServers(); n != 0 {
		t.Fatalf("Expected zero servers to start")
	}
	sm.AddServer(srv1)

	// Test again w/ a server not in the list
	sm.NotifyFailedServer(srv2)
	if n := sm.NumServers(); n != 1 {
		t.Fatalf("Expected one server")
	}

	sm.AddServer(srv2)
	if n := sm.NumServers(); n != 2 {
		t.Fatalf("Expected two servers")
	}

	current := sm.FindServer()
	if current == nil || current.Name != "s1" {
		t.Fatalf("Expected s1 server")
	}

	// s2 is not the server currently handed out, so failing it must not
	// change what FindServer returns.
	sm.NotifyFailedServer(srv2)
	current = sm.FindServer()
	if current == nil || current.Name != "s1" {
		t.Fatalf("Expected s1 server (still)")
	}

	// Failing the current server switches FindServer over to s2.
	sm.NotifyFailedServer(current)
	other := sm.FindServer()
	if other == nil || other.Name != "s2" {
		t.Fatalf("Expected s2 server")
	}

	// Failing s2 in turn switches back to s1.
	sm.NotifyFailedServer(other)
	current = sm.FindServer()
	if current == nil || current.Name != "s1" {
		t.Fatalf("Expected s1 server")
	}
}
// TestServerManager_NumServers checks that NumServers reports zero for a
// new manager and one after a single AddServer call.
//
// Method under test: func (sm *ServerManager) NumServers() (numServers int)
func TestServerManager_NumServers(t *testing.T) {
	sm := testServerManager()
	if n := sm.NumServers(); n != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	sm.AddServer(&server_details.ServerDetails{})
	if n := sm.NumServers(); n != 1 {
		t.Fatalf("Expected one server after AddServer")
	}
}
// TestServerManager_RebalanceServers checks that RebalanceServers actually
// reorders the server list. It registers maxServers servers, rebalances
// numShuffleTests times, records the full ordering after each rebalance,
// and requires that at least uniquePassRate of those orderings are
// distinct.
//
// Method under test: func (sm *ServerManager) RebalanceServers()
func TestServerManager_RebalanceServers(t *testing.T) {
	sm := testServerManager()
	const maxServers = 100
	const numShuffleTests = 100
	const uniquePassRate = 0.5

	// Make a huge list of nodes.
	for i := 0; i < maxServers; i++ {
		nodeName := fmt.Sprintf("s%02d", i)
		sm.AddServer(&server_details.ServerDetails{Name: nodeName})
	}

	// Keep track of how many unique shuffles we get. There can be at most
	// one distinct ordering per rebalance pass, so everything below is
	// sized and judged against numShuffleTests, not maxServers.
	uniques := make(map[string]struct{}, numShuffleTests)
	for i := 0; i < numShuffleTests; i++ {
		sm.RebalanceServers()

		// Walk the whole list once: take the current server, then report
		// it failed so the next FindServer call returns a different one.
		names := make([]string, 0, maxServers)
		for j := 0; j < maxServers; j++ {
			server := sm.FindServer()
			if server == nil {
				// Fail with a message instead of a nil dereference below.
				t.Fatalf("FindServer returned nil")
			}
			sm.NotifyFailedServer(server)
			names = append(names, server.Name)
		}
		key := strings.Join(names, "|")
		uniques[key] = struct{}{}
	}

	// We have to allow for the fact that there won't always be a unique
	// shuffle each pass, so we just look for smell here without the test
	// being flaky.
	const minUniques = int(numShuffleTests * uniquePassRate)
	if len(uniques) < minUniques {
		t.Fatalf("unique shuffle ratio too low: %d/%d", len(uniques), numShuffleTests)
	}
}
// TestServerManager_RemoveServer checks RemoveServer in several situations:
// removing a server that was never added, removing the server currently
// returned by FindServer, removing a server right after it has been
// reported failed, and finally draining the manager completely. After
// every removal it verifies the server count and that the removed server
// can no longer be found.
//
// Method under test: func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails)
func TestServerManager_RemoveServer(t *testing.T) {
	const nodeNameFmt = "s%02d"
	sm := testServerManager()

	if sm.NumServers() != 0 {
		t.Fatalf("Expected zero servers to start")
	}

	// Test removing server before its added
	nodeName := fmt.Sprintf(nodeNameFmt, 1)
	s1 := &server_details.ServerDetails{Name: nodeName}
	sm.RemoveServer(s1)
	sm.AddServer(s1)

	nodeName = fmt.Sprintf(nodeNameFmt, 2)
	s2 := &server_details.ServerDetails{Name: nodeName}
	sm.RemoveServer(s2)
	sm.AddServer(s2)

	const maxServers = 19
	// Already added two servers above
	for i := maxServers; i > 2; i-- {
		nodeName := fmt.Sprintf(nodeNameFmt, i)
		sm.AddServer(&server_details.ServerDetails{Name: nodeName})
	}
	sm.RebalanceServers()

	if sm.NumServers() != maxServers {
		t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers())
	}

	// findServer reports whether server is still known to the manager.
	// FindServer keeps returning the same server until that server is
	// reported failed, so each probe is followed by NotifyFailedServer to
	// step to the next entry; without that only one server would ever be
	// compared. None of the assertions below depend on list order.
	findServer := func(server *server_details.ServerDetails) bool {
		for i := sm.NumServers(); i > 0; i-- {
			s := sm.FindServer()
			if s == server {
				return true
			}
			sm.NotifyFailedServer(s)
		}
		return false
	}

	expectedNumServers := maxServers
	removedServers := make([]*server_details.ServerDetails, 0, maxServers)

	// Remove servers from the front of the list
	for i := 3; i > 0; i-- {
		server := sm.FindServer()
		if server == nil {
			t.Fatalf("FindServer returned nil")
		}
		sm.RemoveServer(server)
		expectedNumServers--
		if sm.NumServers() != expectedNumServers {
			t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers())
		}
		if findServer(server) {
			t.Fatalf("Did not expect to find server %s after removal from the front", server.Name)
		}
		removedServers = append(removedServers, server)
	}

	// Remove server from the end of the list
	for i := 3; i > 0; i-- {
		server := sm.FindServer()
		sm.NotifyFailedServer(server)
		sm.RemoveServer(server)
		expectedNumServers--
		if sm.NumServers() != expectedNumServers {
			t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers())
		}
		if findServer(server) {
			t.Fatalf("Did not expect to find server %s", server.Name)
		}
		removedServers = append(removedServers, server)
	}

	// Remove server from the middle of the list
	for i := 3; i > 0; i-- {
		server := sm.FindServer()
		sm.NotifyFailedServer(server)
		server2 := sm.FindServer()
		sm.NotifyFailedServer(server2) // server2 now at end of the list

		sm.RemoveServer(server)
		expectedNumServers--
		if sm.NumServers() != expectedNumServers {
			t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers())
		}
		if findServer(server) {
			t.Fatalf("Did not expect to find server %s", server.Name)
		}
		removedServers = append(removedServers, server)
	}

	if sm.NumServers()+len(removedServers) != maxServers {
		t.Fatalf("Expected %d+%d=%d servers", sm.NumServers(), len(removedServers), maxServers)
	}

	// Drain the remaining servers from the middle
	for i := sm.NumServers(); i > 0; i-- {
		server := sm.FindServer()
		sm.NotifyFailedServer(server)
		server2 := sm.FindServer()
		sm.NotifyFailedServer(server2) // server2 now at end of the list
		sm.RemoveServer(server)
		removedServers = append(removedServers, server)
	}

	if sm.NumServers() != 0 {
		t.Fatalf("Expected an empty server list")
	}
	if len(removedServers) != maxServers {
		t.Fatalf("Expected all servers to be in removed server list")
	}
}
// func (sm *ServerManager) Start() {

View File

@ -410,7 +410,7 @@ type CheckServiceNodes []CheckServiceNode
// Shuffle does an in-place random shuffle using the Fisher-Yates algorithm.
//
// It walks the slice from the last index down to 1 and swaps each element
// with one at a randomly chosen index in [0, i]. The index is drawn with
// rand.Int31n; the older `rand.Int31() % int32(i+1)` form is superseded and
// must not be declared alongside it.
func (nodes CheckServiceNodes) Shuffle() {
	for i := len(nodes) - 1; i > 0; i-- {
		j := rand.Int31n(int32(i + 1))
		nodes[i], nodes[j] = nodes[j], nodes[i]
	}
}

View File

@ -23,21 +23,6 @@ import (
*/
var privateBlocks []*net.IPNet
// serverParts is used to return the parts of a server role: the member's
// name, datacenter, RPC port, bootstrap flag, expect count, version and
// network address.
type serverParts struct {
	Name       string
	Datacenter string
	Port       int
	Bootstrap  bool
	Expect     int
	Version    int
	Addr       net.Addr
}

// String renders the server as "<name> (Addr: <addr>) (DC: <datacenter>)".
func (s *serverParts) String() string {
	const layout = "%s (Addr: %s) (DC: %s)"
	return fmt.Sprintf(layout, s.Name, s.Addr, s.Datacenter)
}
func init() {
// Add each private block
privateBlocks = make([]*net.IPNet, 6)
@ -116,52 +101,6 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e
return (numServers > 0) && (numWhoGrok == numServers), nil
}
// isConsulServer reports whether the serf member is a Consul server, i.e.
// whether its "role" tag equals "consul". On success it also returns the
// server's details parsed from the member's name, address and tags.
//
// It returns (false, nil) when the role does not match, or when the "port"
// or "vsn" tag (or the "expect" tag, if present) is not a valid integer.
func isConsulServer(m serf.Member) (bool, *serverParts) {
	tags := m.Tags
	if tags["role"] != "consul" {
		return false, nil
	}

	// "expect" is optional and stays 0 when the tag is absent.
	expect := 0
	if raw, present := tags["expect"]; present {
		n, err := strconv.Atoi(raw)
		if err != nil {
			return false, nil
		}
		expect = n
	}

	port, err := strconv.Atoi(tags["port"])
	if err != nil {
		return false, nil
	}

	version, err := strconv.Atoi(tags["vsn"])
	if err != nil {
		return false, nil
	}

	// Only the presence of the "bootstrap" tag matters, not its value.
	_, bootstrap := tags["bootstrap"]

	return true, &serverParts{
		Name:       m.Name,
		Datacenter: tags["dc"],
		Port:       port,
		Bootstrap:  bootstrap,
		Expect:     expect,
		Addr:       &net.TCPAddr{IP: m.Addr, Port: port},
		Version:    version,
	}
}
// Returns if a member is a consul node. Returns a bool,
// and the datacenter.
func isConsulNode(m serf.Member) (bool, string) {

View File

@ -196,49 +196,6 @@ func TestUtil_CanServersUnderstandProtocol(t *testing.T) {
}
}
// TestIsConsulServer feeds isConsulServer a serf member tagged as a Consul
// server and checks the parsed fields, then toggles the "bootstrap" and
// "expect" tags on the same member and checks that each is picked up.
func TestIsConsulServer(t *testing.T) {
	tags := map[string]string{
		"role": "consul",
		"dc":   "east-aws",
		"port": "10000",
		"vsn":  "1",
	}
	member := serf.Member{
		Name: "foo",
		Addr: net.IP([]byte{127, 0, 0, 1}),
		Tags: tags,
	}

	// Baseline: no bootstrap tag, no expect tag.
	ok, sp := isConsulServer(member)
	if !(ok && sp.Datacenter == "east-aws" && sp.Port == 10000) {
		t.Fatalf("bad: %v %v", ok, sp)
	}
	if sp.Name != "foo" {
		t.Fatalf("bad: %v", sp)
	}
	if sp.Bootstrap {
		t.Fatalf("unexpected bootstrap")
	}
	if sp.Expect != 0 {
		t.Fatalf("bad: %v", sp.Expect)
	}

	// The tag map is shared with the member, so this marks it as bootstrap.
	tags["bootstrap"] = "1"
	ok, sp = isConsulServer(member)
	if !(ok && sp.Bootstrap) {
		t.Fatalf("expected bootstrap")
	}
	if addr := sp.Addr.String(); addr != "127.0.0.1:10000" {
		t.Fatalf("bad addr: %v", sp.Addr)
	}
	if sp.Version != 1 {
		t.Fatalf("bad: %v", sp)
	}

	// Swap the bootstrap tag for an expect count.
	tags["expect"] = "3"
	delete(tags, "bootstrap")
	ok, sp = isConsulServer(member)
	if !(ok && sp.Expect == 3) {
		t.Fatalf("bad: %v", sp.Expect)
	}
}
func TestIsConsulNode(t *testing.T) {
m := serf.Member{
Tags: map[string]string{

View File

@ -14,6 +14,10 @@ func RandomStagger(intv time.Duration) time.Duration {
// order to target an aggregate number of actions per second across the whole
// cluster.
func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
// minRate is the smallest rate treated as meaningful: one action per day,
// expressed per second. It has to be a floating-point constant; the untyped
// integer expression 1 / 86400 truncates to 0, which would silently reduce
// the guard below to "rate <= 0".
const minRate = 1.0 / 86400 // 1/(1 * time.Day)
if rate <= minRate {
	return min
}
interval := time.Duration(float64(time.Second) * float64(n) / rate)
if interval < min {
return min

View File

@ -16,7 +16,7 @@ func TestRandomStagger(t *testing.T) {
}
func TestRateScaledInterval(t *testing.T) {
min := 1 * time.Second
const min = 1 * time.Second
rate := 200.0
if v := RateScaledInterval(rate, min, 0); v != min {
t.Fatalf("Bad: %v", v)
@ -36,4 +36,13 @@ func TestRateScaledInterval(t *testing.T) {
if v := RateScaledInterval(rate, min, 10000); v != 50*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := RateScaledInterval(0, min, 10000); v != min {
t.Fatalf("Bad: %v", v)
}
if v := RateScaledInterval(0.0, min, 10000); v != min {
t.Fatalf("Bad: %v", v)
}
if v := RateScaledInterval(-1, min, 10000); v != min {
t.Fatalf("Bad: %v", v)
}
}

View File

@ -12,4 +12,4 @@ go build -o $TEMPDIR/consul || exit 1
# Run the tests
echo "--> Running tests"
go list ./... | grep -v ^github.com/hashicorp/consul/vendor/ | PATH=$TEMPDIR:$PATH xargs -n1 go test
go list ./... | grep -v ^github.com/hashicorp/consul/vendor/ | PATH=$TEMPDIR:$PATH xargs -n1 go test ${GOTEST_FLAGS:-}

View File

@ -1,27 +0,0 @@
package coordinate
import (
"math"
"testing"
)
// verifyEqualFloats fails the test immediately when f1 and f2 differ by
// more than 1e-6 in absolute value; otherwise it returns without effect.
func verifyEqualFloats(t *testing.T, f1 float64, f2 float64) {
	const zeroThreshold = 1.0e-6
	if gap := math.Abs(f2 - f1); gap > zeroThreshold {
		t.Fatalf("equal assertion fail, %9.6f != %9.6f", f1, f2)
	}
}
// verifyEqualVectors fails the test when vec1 and vec2 differ in length,
// and otherwise compares them element by element with verifyEqualFloats.
func verifyEqualVectors(t *testing.T, vec1 []float64, vec2 []float64) {
	if want, got := len(vec1), len(vec2); want != got {
		t.Fatalf("vector length mismatch, %d != %d", len(vec1), len(vec2))
	}

	for idx := range vec1 {
		verifyEqualFloats(t, vec1[idx], vec2[idx])
	}
}

View File

@ -214,8 +214,8 @@ type queries struct {
}
// Size limits. Each constant is declared exactly once; listing the pre-
// and post-reformat lines together would redeclare both names.
const (
	UserEventSizeLimit = 512        // Maximum byte size for event name and payload
	snapshotSizeLimit  = 128 * 1024 // Maximum 128 KB snapshot
)
// Create creates a new Serf instance, starting all the background tasks
@ -1680,3 +1680,13 @@ func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, o
return nil, false
}
// NumNodes returns the number of nodes in the serf cluster, regardless of
// their health or status. The member map is read while holding the member
// read lock.
func (s *Serf) NumNodes() (numNodes int) {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()

	return len(s.members)
}