diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a35efd364..c20811a7c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ ## 0.7.0 (UNRELEASED) +IMPROVEMENTS: + +* Consul agents will now periodically reconnect to available Consul servers + in order to redistribute their RPC query load. Consul clients will, by + default, attempt to establish a new connection every 120s to 180s unless + the size of the cluster is sufficiently large. The rate at which agents + begin to query new servers is proportional to the size of the Consul + cluster (servers should never receive more than 64 new connections per + second per Consul server as a result of rebalancing). Clusters in stable + environments that use `allow_stale` should see a more even distribution of + query load across all of their Consul servers. [GH-1743] + ## 0.6.4 (March 16, 2016) BACKWARDS INCOMPATIBILITIES: diff --git a/command/agent/config.go b/command/agent/config.go index ba72ddeba1..6d4ebf2bd8 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -360,7 +360,7 @@ type Config struct { // * deny - Deny all requests // * extend-cache - Ignore the cache expiration, and allow cached // ACL's to be used to service requests. This - // is the default. If the ACL is not in the cache, + // is the default. If the ACL is not in the cache, // this acts like deny. 
ACLDownPolicy string `mapstructure:"acl_down_policy"` diff --git a/consul/client.go b/consul/client.go index b3fdb080e8..0b536a8d52 100644 --- a/consul/client.go +++ b/consul/client.go @@ -3,7 +3,6 @@ package consul import ( "fmt" "log" - "math/rand" "os" "path/filepath" "strconv" @@ -11,19 +10,35 @@ import ( "sync" "time" + "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/server_manager" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" ) const ( - // clientRPCCache controls how long we keep an idle connection - // open to a server - clientRPCCache = 30 * time.Second + // clientRPCConnMaxIdle controls how long we keep an idle connection + // open to a server. 127s was chosen as the first prime above 120s + // (arbitrarily chose to use a prime) with the intent of reusing + // connections who are used by once-a-minute cron(8) jobs *and* who + // use a 60s jitter window (e.g. in vixie cron job execution can + // drift by up to 59s per job, or 119s for a once-a-minute cron job). + clientRPCConnMaxIdle = 127 * time.Second // clientMaxStreams controls how many idle streams we keep // open to a server clientMaxStreams = 32 + + // serfEventBacklog is the maximum number of unprocessed Serf Events + // that will be held in queue before new serf events block. A + // blocking serf event queue is a bad thing. + serfEventBacklog = 256 + + // serfEventBacklogWarning is the threshold at which point log + // warnings will be emitted indicating a problem when processing serf + // events. 
+ serfEventBacklogWarning = 200 ) // Interface is used to provide either a Client or Server, @@ -43,19 +58,14 @@ type Client struct { // Connection pool to consul servers connPool *ConnPool - // consuls tracks the locally known servers - consuls []*serverParts - consulLock sync.RWMutex + // serverMgr is responsible for the selection and maintenance of + // Consul servers this agent uses for RPC requests + serverMgr *server_manager.ServerManager // eventCh is used to receive events from the // serf cluster in the datacenter eventCh chan serf.Event - // lastServer is the last server we made an RPC call to, - // this is used to re-use the last connection - lastServer *serverParts - lastRPCTime time.Time - // Logger uses the provided LogOutput logger *log.Logger @@ -103,12 +113,17 @@ func NewClient(config *Config) (*Client, error) { // Create server c := &Client{ config: config, - connPool: NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), - eventCh: make(chan serf.Event, 256), + connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap), + eventCh: make(chan serf.Event, serfEventBacklog), logger: logger, shutdownCh: make(chan struct{}), } + c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf) + + // Start maintenance task for serverMgr + go c.serverMgr.Start() + // Start the Serf listeners to prevent a deadlock go c.lanEventHandler() @@ -215,7 +230,13 @@ func (c *Client) Encrypted() bool { // lanEventHandler is used to handle events from the lan Serf cluster func (c *Client) lanEventHandler() { + var numQueuedEvents int for { + numQueuedEvents = len(c.eventCh) + if numQueuedEvents > serfEventBacklogWarning { + c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning) + } + select { case e := <-c.eventCh: switch e.EventType() { @@ -240,7 +261,7 @@ func (c *Client) lanEventHandler() { // nodeJoin is used to handle join events on the 
serf cluster func (c *Client) nodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := isConsulServer(m) + ok, parts := server_details.IsConsulServer(m) if !ok { continue } @@ -250,23 +271,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { continue } c.logger.Printf("[INFO] consul: adding server %s", parts) - - // Check if this server is known - found := false - c.consulLock.Lock() - for idx, existing := range c.consuls { - if existing.Name == parts.Name { - c.consuls[idx] = parts - found = true - break - } - } - - // Add to the list if not known - if !found { - c.consuls = append(c.consuls, parts) - } - c.consulLock.Unlock() + c.serverMgr.AddServer(parts) // Trigger the callback if c.config.ServerUp != nil { @@ -278,23 +283,12 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { // nodeFail is used to handle fail events on the serf cluster func (c *Client) nodeFail(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := isConsulServer(m) + ok, parts := server_details.IsConsulServer(m) if !ok { continue } c.logger.Printf("[INFO] consul: removing server %s", parts) - - // Remove the server if known - c.consulLock.Lock() - n := len(c.consuls) - for i := 0; i < n; i++ { - if c.consuls[i].Name == parts.Name { - c.consuls[i], c.consuls[n-1] = c.consuls[n-1], nil - c.consuls = c.consuls[:n-1] - break - } - } - c.consulLock.Unlock() + c.serverMgr.RemoveServer(parts) } } @@ -328,50 +322,33 @@ func (c *Client) localEvent(event serf.UserEvent) { // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { - // Check the last rpc time - var server *serverParts - if time.Now().Sub(c.lastRPCTime) < clientRPCCache { - server = c.lastServer - if server != nil { - goto TRY_RPC - } - } - - // Bail if we can't find any servers - c.consulLock.RLock() - if len(c.consuls) == 0 { - c.consulLock.RUnlock() + server := c.serverMgr.FindServer() + if server 
== nil { return structs.ErrNoServers } - // Select a random addr - server = c.consuls[rand.Int31()%int32(len(c.consuls))] - c.consulLock.RUnlock() - // Forward to remote Consul -TRY_RPC: if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { - c.lastServer = nil - c.lastRPCTime = time.Time{} + c.serverMgr.NotifyFailedServer(server) + c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) return err } - // Cache the last server - c.lastServer = server - c.lastRPCTime = time.Now() return nil } // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { + numServers := c.serverMgr.NumServers() + toString := func(v uint64) string { return strconv.FormatUint(v, 10) } stats := map[string]map[string]string{ "consul": map[string]string{ "server": "false", - "known_servers": toString(uint64(len(c.consuls))), + "known_servers": toString(uint64(numServers)), }, "serf_lan": c.serf.Stats(), "runtime": runtimeStats(), diff --git a/consul/client_test.go b/consul/client_test.go index 02a8db0bc8..5fe6194e35 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -83,6 +83,11 @@ func TestClient_JoinLAN(t *testing.T) { if _, err := c1.JoinLAN([]string{addr}); err != nil { t.Fatalf("err: %v", err) } + testutil.WaitForResult(func() (bool, error) { + return c1.serverMgr.NumServers() == 1, nil + }, func(err error) { + t.Fatalf("expected consul server") + }) // Check the members testutil.WaitForResult(func() (bool, error) { @@ -95,7 +100,7 @@ func TestClient_JoinLAN(t *testing.T) { // Check we have a new consul testutil.WaitForResult(func() (bool, error) { - return len(c1.consuls) == 1, nil + return c1.serverMgr.NumServers() == 1, nil }, func(err error) { t.Fatalf("expected consul server") }) diff --git a/consul/leader.go b/consul/leader.go index 55c487b4fd..3f4bc14844 100644 --- a/consul/leader.go +++ 
b/consul/leader.go @@ -8,6 +8,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -349,7 +350,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { return true } - if valid, parts := isConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { + if valid, parts := server_details.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { return true } return false @@ -360,7 +361,7 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { func (s *Server) handleAliveMember(member serf.Member) error { // Register consul service if a server var service *structs.NodeService - if valid, parts := isConsulServer(member); valid { + if valid, parts := server_details.IsConsulServer(member); valid { service = &structs.NodeService{ ID: ConsulServiceID, Service: ConsulServiceName, @@ -496,7 +497,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error } // Remove from Raft peers if this was a server - if valid, parts := isConsulServer(member); valid { + if valid, parts := server_details.IsConsulServer(member); valid { if err := s.removeConsulServer(member, parts.Port); err != nil { return err } @@ -523,7 +524,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error } // joinConsulServer is used to try to join another consul server -func (s *Server) joinConsulServer(m serf.Member, parts *serverParts) error { +func (s *Server) joinConsulServer(m serf.Member, parts *server_details.ServerDetails) error { // Do not join ourself if m.Name == s.config.NodeName { return nil @@ -533,7 +534,7 @@ func (s *Server) joinConsulServer(m serf.Member, parts *serverParts) error { if parts.Bootstrap { members := s.serfLAN.Members() for _, member := range 
members { - valid, p := isConsulServer(member) + valid, p := server_details.IsConsulServer(member) if valid && member.Name != m.Name && p.Bootstrap { s.logger.Printf("[ERR] consul: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name) return nil diff --git a/consul/merge.go b/consul/merge.go index c61b79f418..ed86c69843 100644 --- a/consul/merge.go +++ b/consul/merge.go @@ -3,6 +3,7 @@ package consul import ( "fmt" + "github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/serf/serf" ) @@ -24,7 +25,7 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { continue } - ok, parts := isConsulServer(*m) + ok, parts := server_details.IsConsulServer(*m) if ok && parts.Datacenter != md.dc { return fmt.Errorf("Member '%s' part of wrong datacenter '%s'", m.Name, parts.Datacenter) @@ -41,7 +42,7 @@ type wanMergeDelegate struct { func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error { for _, m := range members { - ok, _ := isConsulServer(*m) + ok, _ := server_details.IsConsulServer(*m) if !ok { return fmt.Errorf("Member '%s' is not a server", m.Name) } diff --git a/consul/serf.go b/consul/serf.go index 2a2cb6944e..d9f27309c0 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -4,6 +4,7 @@ import ( "net" "strings" + "github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/serf/serf" ) @@ -140,7 +141,7 @@ func (s *Server) localEvent(event serf.UserEvent) { // lanNodeJoin is used to handle join events on the LAN pool. func (s *Server) lanNodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := isConsulServer(m) + ok, parts := server_details.IsConsulServer(m) if !ok { continue } @@ -163,7 +164,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) { // wanNodeJoin is used to handle join events on the WAN pool. 
func (s *Server) wanNodeJoin(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := isConsulServer(m) + ok, parts := server_details.IsConsulServer(m) if !ok { s.logger.Printf("[WARN] consul: non-server in WAN pool: %s", m.Name) continue @@ -209,7 +210,7 @@ func (s *Server) maybeBootstrap() { members := s.serfLAN.Members() addrs := make([]string, 0) for _, member := range members { - valid, p := isConsulServer(member) + valid, p := server_details.IsConsulServer(member) if !valid { continue } @@ -247,7 +248,7 @@ func (s *Server) maybeBootstrap() { // lanNodeFailed is used to handle fail events on the LAN pool. func (s *Server) lanNodeFailed(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := isConsulServer(m) + ok, parts := server_details.IsConsulServer(m) if !ok { continue } @@ -262,7 +263,7 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) { // wanNodeFailed is used to handle fail events on the WAN pool. func (s *Server) wanNodeFailed(me serf.MemberEvent) { for _, m := range me.Members { - ok, parts := isConsulServer(m) + ok, parts := server_details.IsConsulServer(m) if !ok { continue } diff --git a/consul/server.go b/consul/server.go index b9adc88337..347cc6f26a 100644 --- a/consul/server.go +++ b/consul/server.go @@ -15,6 +15,7 @@ import ( "time" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/consul/server_details" "github.com/hashicorp/consul/consul/state" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/raft" @@ -97,7 +98,7 @@ type Server struct { // localConsuls is used to track the known consuls // in the local datacenter. Used to do leader forwarding. - localConsuls map[string]*serverParts + localConsuls map[string]*server_details.ServerDetails localLock sync.RWMutex // Logger uses the provided LogOutput @@ -119,7 +120,7 @@ type Server struct { // remoteConsuls is used to track the known consuls in // remote datacenters. Used to do DC forwarding. 
- remoteConsuls map[string][]*serverParts + remoteConsuls map[string][]*server_details.ServerDetails remoteLock sync.RWMutex // rpcListener is used to listen for incoming connections @@ -216,10 +217,10 @@ func NewServer(config *Config) (*Server, error) { connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), - localConsuls: make(map[string]*serverParts), + localConsuls: make(map[string]*server_details.ServerDetails), logger: logger, reconcileCh: make(chan serf.Member, 32), - remoteConsuls: make(map[string][]*serverParts), + remoteConsuls: make(map[string][]*server_details.ServerDetails), rpcServer: rpc.NewServer(), rpcTLS: incomingTLS, tombstoneGC: gc, diff --git a/consul/server_details/server_details.go b/consul/server_details/server_details.go new file mode 100644 index 0000000000..e9386e8ca3 --- /dev/null +++ b/consul/server_details/server_details.go @@ -0,0 +1,70 @@ +package server_details + +import ( + "fmt" + "net" + "strconv" + + "github.com/hashicorp/serf/serf" +) + +// ServerDetails is used to return details of a consul server +type ServerDetails struct { + Name string + Datacenter string + Port int + Bootstrap bool + Expect int + Version int + Addr net.Addr +} + +func (s *ServerDetails) String() string { + return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter) +} + +// IsConsulServer returns true if a serf member is a consul server. Returns a +// bool and a pointer to the ServerDetails. 
+func IsConsulServer(m serf.Member) (bool, *ServerDetails) { + if m.Tags["role"] != "consul" { + return false, nil + } + + datacenter := m.Tags["dc"] + _, bootstrap := m.Tags["bootstrap"] + + expect := 0 + expect_str, ok := m.Tags["expect"] + var err error + if ok { + expect, err = strconv.Atoi(expect_str) + if err != nil { + return false, nil + } + } + + port_str := m.Tags["port"] + port, err := strconv.Atoi(port_str) + if err != nil { + return false, nil + } + + vsn_str := m.Tags["vsn"] + vsn, err := strconv.Atoi(vsn_str) + if err != nil { + return false, nil + } + + addr := &net.TCPAddr{IP: m.Addr, Port: port} + + parts := &ServerDetails{ + Name: m.Name, + Datacenter: datacenter, + Port: port, + Bootstrap: bootstrap, + Expect: expect, + Addr: addr, + Version: vsn, + } + return true, parts +} diff --git a/consul/server_details/server_details_test.go b/consul/server_details/server_details_test.go new file mode 100644 index 0000000000..09b87ec165 --- /dev/null +++ b/consul/server_details/server_details_test.go @@ -0,0 +1,66 @@ +package server_details_test + +import ( + "net" + "testing" + + "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/serf/serf" +) + +func TestIsConsulServer(t *testing.T) { + m := serf.Member{ + Name: "foo", + Addr: net.IP([]byte{127, 0, 0, 1}), + Tags: map[string]string{ + "role": "consul", + "dc": "east-aws", + "port": "10000", + "vsn": "1", + }, + } + ok, parts := server_details.IsConsulServer(m) + if !ok || parts.Datacenter != "east-aws" || parts.Port != 10000 { + t.Fatalf("bad: %v %v", ok, parts) + } + if parts.Name != "foo" { + t.Fatalf("bad: %v", parts) + } + if parts.Bootstrap { + t.Fatalf("unexpected bootstrap") + } + if parts.Expect != 0 { + t.Fatalf("bad: %v", parts.Expect) + } + m.Tags["bootstrap"] = "1" + m.Tags["disabled"] = "1" + ok, parts = server_details.IsConsulServer(m) + if !ok { + t.Fatalf("expected a valid consul server") + } + if !parts.Bootstrap { + t.Fatalf("expected bootstrap") + } + if 
parts.Addr.String() != "127.0.0.1:10000" { + t.Fatalf("bad addr: %v", parts.Addr) + } + if parts.Version != 1 { + t.Fatalf("bad: %v", parts) + } + m.Tags["expect"] = "3" + delete(m.Tags, "bootstrap") + delete(m.Tags, "disabled") + ok, parts = server_details.IsConsulServer(m) + if !ok || parts.Expect != 3 { + t.Fatalf("bad: %v", parts.Expect) + } + if parts.Bootstrap { + t.Fatalf("unexpected bootstrap") + } + + delete(m.Tags, "role") + ok, parts = server_details.IsConsulServer(m) + if ok { + t.Fatalf("unexpected ok server") + } +} diff --git a/consul/server_manager/server_manager.go b/consul/server_manager/server_manager.go new file mode 100644 index 0000000000..504c5ce2bf --- /dev/null +++ b/consul/server_manager/server_manager.go @@ -0,0 +1,311 @@ +package server_manager + +import ( + "log" + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/lib" +) + +type consulServerEventTypes int + +const ( + // clientRPCJitterFraction determines the amount of jitter added to + // clientRPCMinReuseDuration before a connection is expired and a new + // connection is established in order to rebalance load across consul + // servers. The cluster-wide number of connections per second from + // rebalancing is applied after this jitter to ensure the CPU impact + // is always finite. See newRebalanceConnsPerSecPerServer's comment + // for additional commentary. + // + // For example, in a 10K consul cluster with 5x servers, this default + // averages out to ~13 new connections from rebalancing per server + // per second (each connection is reused for 120s to 180s). + clientRPCJitterFraction = 2 + + // clientRPCMinReuseDuration controls the minimum amount of time RPC + // queries are sent over an established connection to a single server + clientRPCMinReuseDuration = 120 * time.Second + + // Limit the number of new connections a server receives per second + // for connection rebalancing. 
This limit caps the load caused by + // continual rebalancing efforts when a cluster is in equilibrium. A + // lower value comes at the cost of increased recovery time after a + // partition. This parameter begins to take effect when there are + // more than ~48K clients querying 5x servers or at lower server + // values when there is a partition. + // + // For example, in a 100K consul cluster with 5x servers, it will + // take ~5min for all servers to rebalance their connections. If + // 99,995 agents are in the minority talking to only one server, it + // will take ~26min for all servers to rebalance. A 10K cluster in + // the same scenario will take ~2.6min to rebalance. + newRebalanceConnsPerSecPerServer = 64 +) + +type ConsulClusterInfo interface { + NumNodes() int +} + +// serverConfig is the thread-safe configuration struct used to maintain the +// list of Consul servers in ServerManager. +// +// NOTE(sean@): We are explicitly relying on the fact that serverConfig will +// be copied onto the stack. Please keep this structure light. +type serverConfig struct { + // servers tracks the locally known servers. List membership is + // maintained by Serf. + servers []*server_details.ServerDetails +} + +type ServerManager struct { + // serverConfig provides the necessary load/store semantics for the + // server list. + serverConfigValue atomic.Value + serverConfigLock sync.Mutex + + // shutdownCh is a copy of the channel in consul.Client + shutdownCh chan struct{} + + logger *log.Logger + + // clusterInfo is used to estimate the approximate number of nodes in + // a cluster and limit the rate at which it rebalances server + // connections. ConsulClusterInfo is an interface that wraps serf. + clusterInfo ConsulClusterInfo + + // notifyFailedBarrier acts as a barrier to prevent + // queueing behind serverConfigLock and acts as a TryLock(). + notifyFailedBarrier int32 +} + +// AddServer takes out an internal write lock and adds a new server. 
If the +// server is not known, appends the server to the list. The new server will +// begin seeing use after the rebalance timer fires or enough servers fail +// organically. If the server is already known, merge the new server +// details. +func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { + sm.serverConfigLock.Lock() + defer sm.serverConfigLock.Unlock() + serverCfg := sm.getServerConfig() + + // Check if this server is known + found := false + for idx, existing := range serverCfg.servers { + if existing.Name == server.Name { + newServers := make([]*server_details.ServerDetails, len(serverCfg.servers)) + copy(newServers, serverCfg.servers) + + // Overwrite the existing server details in order to + // possibly update metadata (e.g. server version) + newServers[idx] = server + + serverCfg.servers = newServers + found = true + break + } + } + + // Add to the list if not known + if !found { + newServers := make([]*server_details.ServerDetails, len(serverCfg.servers), len(serverCfg.servers)+1) + copy(newServers, serverCfg.servers) + newServers = append(newServers, server) + serverCfg.servers = newServers + } + + sm.saveServerConfig(serverCfg) +} + +// cycleServers returns a new list of servers that has dequeued the first +// server and enqueued it at the end of the list. cycleServers assumes the +// caller is holding the serverConfigLock. +func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { + numServers := len(sc.servers) + if numServers < 2 { + return servers // No action required + } + + newServers := make([]*server_details.ServerDetails, 0, numServers) + newServers = append(newServers, sc.servers[1:]...) + newServers = append(newServers, sc.servers[0]) + return newServers +} + +// FindServer takes out an internal "read lock" and searches through the list +// of servers to find a "healthy" server. 
If the server is actually +// unhealthy, we rely on Serf to detect this and remove the node from the +// server list. If the server at the front of the list has failed or fails +// during an RPC call, it is rotated to the end of the list. If there are no +// servers available, return nil. +func (sm *ServerManager) FindServer() *server_details.ServerDetails { + serverCfg := sm.getServerConfig() + numServers := len(serverCfg.servers) + if numServers == 0 { + sm.logger.Printf("[WARN] consul: No servers available") + return nil + } else { + // Return whatever is at the front of the list because it is + // assumed to be the oldest in the server list (unless - + // hypothetically - the server list was rotated right after a + // server was added). + return serverCfg.servers[0] + } +} + +// getServerConfig is a convenience method which hides the locking semantics +// of atomic.Value from the caller. +func (sm *ServerManager) getServerConfig() serverConfig { + return sm.serverConfigValue.Load().(serverConfig) +} + +// saveServerConfig is a convenience method which hides the locking semantics +// of atomic.Value from the caller. +func (sm *ServerManager) saveServerConfig(sc serverConfig) { + sm.serverConfigValue.Store(sc) +} + +// New is the only way to safely create a new ServerManager struct. +func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) { + // NOTE(sean@): Can't pass *consul.Client due to an import cycle + sm = new(ServerManager) + sm.logger = logger + sm.clusterInfo = clusterInfo + sm.shutdownCh = shutdownCh + + sc := serverConfig{} + sc.servers = make([]*server_details.ServerDetails, 0) + sm.saveServerConfig(sc) + return sm +} + +// NotifyFailedServer marks the passed in server as "failed" by rotating it +// to the end of the server list. 
+func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { + serverCfg := sm.getServerConfig() + + // If the server being failed is not the first server on the list, + // this is a noop. If, however, the server is failed and first on + // the list, acquire the lock, retest, and take the penalty of moving + // the server to the end of the list. + + // Only rotate the server list when there is more than one server + if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server && + // Use atomic.CAS to emulate a TryLock(). + atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) { + defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0) + + // Grab a lock, retest, and take the hit of cycling the first + // server to the end. + sm.serverConfigLock.Lock() + defer sm.serverConfigLock.Unlock() + serverCfg = sm.getServerConfig() + + if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server { + serverCfg.servers = serverCfg.cycleServer() + sm.saveServerConfig(serverCfg) + } + } +} + +// NumServers takes out an internal "read lock" and returns the number of +// servers. numServers includes both healthy and unhealthy servers. +func (sm *ServerManager) NumServers() (numServers int) { + serverCfg := sm.getServerConfig() + numServers = len(serverCfg.servers) + return numServers +} + +// RebalanceServers takes out an internal write lock and shuffles the list of +// servers on this agent. This allows for a redistribution of work across +// consul servers and provides a guarantee that the order of the server list +// isn't related to the age at which the node was added to the cluster. +// Elsewhere we rely on the position in the server list as a hint regarding +// the stability of a server relative to its position in the server list. +// Servers at or near the front of the list are more stable than servers near +// the end of the list. Unhealthy servers are removed when serf notices the +// server has been deregistered. 
+func (sm *ServerManager) RebalanceServers() { + sm.serverConfigLock.Lock() + defer sm.serverConfigLock.Unlock() + serverCfg := sm.getServerConfig() + + newServers := make([]*server_details.ServerDetails, len(serverCfg.servers)) + copy(newServers, serverCfg.servers) + + // Shuffle the server list + for i := len(serverCfg.servers) - 1; i > 0; i-- { + j := rand.Int31n(int32(i + 1)) + newServers[i], newServers[j] = newServers[j], newServers[i] + } + serverCfg.servers = newServers + + sm.saveServerConfig(serverCfg) +} + +// RemoveServer takes out an internal write lock and removes a server from +// the server list. +func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { + sm.serverConfigLock.Lock() + defer sm.serverConfigLock.Unlock() + serverCfg := sm.getServerConfig() + + // Remove the server if known + for i, _ := range serverCfg.servers { + if serverCfg.servers[i].Name == server.Name { + newServers := make([]*server_details.ServerDetails, 0, len(serverCfg.servers)-1) + newServers = append(newServers, serverCfg.servers[:i]...) + newServers = append(newServers, serverCfg.servers[i+1:]...) + serverCfg.servers = newServers + + sm.saveServerConfig(serverCfg) + return + } + } +} + +// refreshServerRebalanceTimer is only called once the rebalanceTimer +// expires. Historically this was an expensive routine and is intended to be +// run in isolation in a dedicated, non-concurrent task. +func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration { + serverCfg := sm.getServerConfig() + numConsulServers := len(serverCfg.servers) + // Limit this connection's life based on the size (and health) of the + // cluster. Never rebalance a connection more frequently than + // connReuseLowWatermarkDuration, and make sure we never exceed + // clusterWideRebalanceConnsPerSec operations/s across numLANMembers. 
+ clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer) + connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) + numLANMembers := sm.clusterInfo.NumNodes() + connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) + + timer.Reset(connRebalanceTimeout) + return connRebalanceTimeout +} + +// Start is used to start and manage the task of automatically shuffling and +// rebalancing the list of consul servers. This maintenance only happens +// periodically based on the expiration of the timer. Failed servers are +// automatically cycled to the end of the list. New servers are appended to +// the list. The order of the server list must be shuffled periodically to +// distribute load across all known and available consul servers. +func (sm *ServerManager) Start() { + var rebalanceTimer *time.Timer = time.NewTimer(clientRPCMinReuseDuration) + + for { + select { + case <-rebalanceTimer.C: + sm.logger.Printf("[INFO] server manager: Rebalancing server connections") + sm.RebalanceServers() + sm.refreshServerRebalanceTimer(rebalanceTimer) + + case <-sm.shutdownCh: + sm.logger.Printf("[INFO] server manager: shutting down") + return + } + } +} diff --git a/consul/server_manager/server_manager_internal_test.go b/consul/server_manager/server_manager_internal_test.go new file mode 100644 index 0000000000..6ea93c3e9d --- /dev/null +++ b/consul/server_manager/server_manager_internal_test.go @@ -0,0 +1,225 @@ +package server_manager + +import ( + "bytes" + "fmt" + "log" + "os" + "testing" + "time" + + "github.com/hashicorp/consul/consul/server_details" +) + +var ( + localLogger *log.Logger + localLogBuffer *bytes.Buffer +) + +func init() { + localLogBuffer = new(bytes.Buffer) + localLogger = log.New(localLogBuffer, "", 0) +} + +func GetBufferedLogger() *log.Logger { + return localLogger +} + +type 
fauxSerf struct { + numNodes int +} + +func (s *fauxSerf) NumNodes() int { + return s.numNodes +} + +func testServerManager() (sm *ServerManager) { + logger := GetBufferedLogger() + shutdownCh := make(chan struct{}) + sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}) + return sm +} + +// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { +func TestServerManagerInternal_cycleServer(t *testing.T) { + sm := testServerManager() + sc := sm.getServerConfig() + + server0 := &server_details.ServerDetails{Name: "server1"} + server1 := &server_details.ServerDetails{Name: "server2"} + server2 := &server_details.ServerDetails{Name: "server3"} + sc.servers = append(sc.servers, server0, server1, server2) + sm.saveServerConfig(sc) + + sc = sm.getServerConfig() + if len(sc.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + } + if sc.servers[0] != server0 && + sc.servers[1] != server1 && + sc.servers[2] != server2 { + t.Fatalf("initial server ordering not correct") + } + + sc.servers = sc.cycleServer() + if len(sc.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + } + if sc.servers[0] != server1 && + sc.servers[1] != server2 && + sc.servers[2] != server0 { + t.Fatalf("server ordering after one cycle not correct") + } + + sc.servers = sc.cycleServer() + if len(sc.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + } + if sc.servers[0] != server2 && + sc.servers[1] != server0 && + sc.servers[2] != server1 { + t.Fatalf("server ordering after two cycles not correct") + } + + sc.servers = sc.cycleServer() + if len(sc.servers) != 3 { + t.Fatalf("server length incorrect: %d/3", len(sc.servers)) + } + if sc.servers[0] != server0 && + sc.servers[1] != server1 && + sc.servers[2] != server2 { + t.Fatalf("server ordering after three cycles not correct") + } +} + +// func (sm *ServerManager) getServerConfig() serverConfig { +func TestServerManagerInternal_getServerConfig(t 
*testing.T) { + sm := testServerManager() + sc := sm.getServerConfig() + if sc.servers == nil { + t.Fatalf("serverConfig.servers nil") + } + + if len(sc.servers) != 0 { + t.Fatalf("serverConfig.servers length not zero") + } +} + +// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) { +func TestServerManagerInternal_New(t *testing.T) { + sm := testServerManager() + if sm == nil { + t.Fatalf("ServerManager nil") + } + + if sm.clusterInfo == nil { + t.Fatalf("ServerManager.clusterInfo nil") + } + + if sm.logger == nil { + t.Fatalf("ServerManager.logger nil") + } + + if sm.shutdownCh == nil { + t.Fatalf("ServerManager.shutdownCh nil") + } +} + +// func (sc *serverConfig) refreshServerRebalanceTimer(timer *time.Timer) { +func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { + sm := testServerManager() + + timer := time.NewTimer(time.Duration(1 * time.Nanosecond)) + time.Sleep(1 * time.Millisecond) + sm.refreshServerRebalanceTimer(timer) + + logger := log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + + type clusterSizes struct { + numNodes int + numServers int + minRebalance time.Duration + } + clusters := []clusterSizes{ + {0, 3, 2 * time.Minute}, + {1, 0, 2 * time.Minute}, // partitioned cluster + {1, 3, 2 * time.Minute}, + {2, 3, 2 * time.Minute}, + {100, 0, 2 * time.Minute}, // partitioned + {100, 1, 2 * time.Minute}, // partitioned + {100, 3, 2 * time.Minute}, + {1024, 1, 2 * time.Minute}, // partitioned + {1024, 3, 2 * time.Minute}, // partitioned + {1024, 5, 2 * time.Minute}, + {16384, 1, 4 * time.Minute}, // partitioned + {16384, 2, 2 * time.Minute}, // partitioned + {16384, 3, 2 * time.Minute}, // partitioned + {16384, 5, 2 * time.Minute}, + {65535, 0, 2 * time.Minute}, // partitioned + {65535, 1, 8 * time.Minute}, // partitioned + {65535, 2, 3 * time.Minute}, // partitioned + {65535, 3, 5 * time.Minute}, // partitioned + {65535, 5, 3 * time.Minute}, // 
partitioned + {65535, 7, 2 * time.Minute}, + {1000000, 1, 4 * time.Hour}, // partitioned + {1000000, 2, 2 * time.Hour}, // partitioned + {1000000, 3, 80 * time.Minute}, // partitioned + {1000000, 5, 50 * time.Minute}, // partitioned + {1000000, 11, 20 * time.Minute}, // partitioned + {1000000, 19, 10 * time.Minute}, + } + + for _, s := range clusters { + sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}) + + for i := 0; i < s.numServers; i++ { + nodeName := fmt.Sprintf("s%02d", i) + sm.AddServer(&server_details.ServerDetails{Name: nodeName}) + } + + d := sm.refreshServerRebalanceTimer(timer) + if d < s.minRebalance { + t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance) + } + } +} + +// func (sm *ServerManager) saveServerConfig(sc serverConfig) { +func TestServerManagerInternal_saveServerConfig(t *testing.T) { + sm := testServerManager() + + // Initial condition + func() { + sc := sm.getServerConfig() + if len(sc.servers) != 0 { + t.Fatalf("ServerManager.saveServerConfig failed to load init config") + } + + newServer := new(server_details.ServerDetails) + sc.servers = append(sc.servers, newServer) + sm.saveServerConfig(sc) + }() + + // Test that save works + func() { + sc1 := sm.getServerConfig() + t1NumServers := len(sc1.servers) + if t1NumServers != 1 { + t.Fatalf("ServerManager.saveServerConfig failed to save mutated config") + } + }() + + // Verify mutation w/o a save doesn't alter the original + func() { + newServer := new(server_details.ServerDetails) + sc := sm.getServerConfig() + sc.servers = append(sc.servers, newServer) + + sc_orig := sm.getServerConfig() + origNumServers := len(sc_orig.servers) + if origNumServers >= len(sc.servers) { + t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original") + } + }() +} diff --git a/consul/server_manager/server_manager_test.go b/consul/server_manager/server_manager_test.go new file mode 100644 index 
0000000000..8b292a2ddc --- /dev/null +++ b/consul/server_manager/server_manager_test.go @@ -0,0 +1,359 @@ +package server_manager_test + +import ( + "bytes" + "fmt" + "log" + "os" + "strings" + "testing" + + "github.com/hashicorp/consul/consul/server_details" + "github.com/hashicorp/consul/consul/server_manager" +) + +var ( + localLogger *log.Logger + localLogBuffer *bytes.Buffer +) + +func init() { + localLogBuffer = new(bytes.Buffer) + localLogger = log.New(localLogBuffer, "", 0) +} + +func GetBufferedLogger() *log.Logger { + return localLogger +} + +type fauxSerf struct { +} + +func (s *fauxSerf) NumNodes() int { + return 16384 +} + +func testServerManager() (sm *server_manager.ServerManager) { + logger := GetBufferedLogger() + logger = log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + sm = server_manager.New(logger, shutdownCh, &fauxSerf{}) + return sm +} + +// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { +func TestServerManager_AddServer(t *testing.T) { + sm := testServerManager() + var num int + num = sm.NumServers() + if num != 0 { + t.Fatalf("Expected zero servers to start") + } + + s1 := &server_details.ServerDetails{Name: "s1"} + sm.AddServer(s1) + num = sm.NumServers() + if num != 1 { + t.Fatalf("Expected one server") + } + + sm.AddServer(s1) + num = sm.NumServers() + if num != 1 { + t.Fatalf("Expected one server (still)") + } + + s2 := &server_details.ServerDetails{Name: "s2"} + sm.AddServer(s2) + num = sm.NumServers() + if num != 2 { + t.Fatalf("Expected two servers") + } +} + +// func (sm *ServerManager) FindServer() (server *server_details.ServerDetails) { +func TestServerManager_FindServer(t *testing.T) { + sm := testServerManager() + + if sm.FindServer() != nil { + t.Fatalf("Expected nil return") + } + + sm.AddServer(&server_details.ServerDetails{Name: "s1"}) + if sm.NumServers() != 1 { + t.Fatalf("Expected one server") + } + + s1 := sm.FindServer() + if s1 == nil { + t.Fatalf("Expected 
non-nil server") + } + if s1.Name != "s1" { + t.Fatalf("Expected s1 server") + } + + s1 = sm.FindServer() + if s1 == nil || s1.Name != "s1" { + t.Fatalf("Expected s1 server (still)") + } + + sm.AddServer(&server_details.ServerDetails{Name: "s2"}) + if sm.NumServers() != 2 { + t.Fatalf("Expected two servers") + } + s1 = sm.FindServer() + if s1 == nil || s1.Name != "s1" { + t.Fatalf("Expected s1 server (still)") + } + + sm.NotifyFailedServer(s1) + s2 := sm.FindServer() + if s2 == nil || s2.Name != "s2" { + t.Fatalf("Expected s2 server") + } + + sm.NotifyFailedServer(s2) + s1 = sm.FindServer() + if s1 == nil || s1.Name != "s1" { + t.Fatalf("Expected s1 server") + } +} + +// func New(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) { +func TestServerManager_New(t *testing.T) { + logger := GetBufferedLogger() + logger = log.New(os.Stderr, "", log.LstdFlags) + shutdownCh := make(chan struct{}) + sm := server_manager.New(logger, shutdownCh, &fauxSerf{}) + if sm == nil { + t.Fatalf("ServerManager nil") + } +} + +// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { +func TestServerManager_NotifyFailedServer(t *testing.T) { + sm := testServerManager() + + if sm.NumServers() != 0 { + t.Fatalf("Expected zero servers to start") + } + + s1 := &server_details.ServerDetails{Name: "s1"} + s2 := &server_details.ServerDetails{Name: "s2"} + + // Try notifying for a server that is not part of the server manager + sm.NotifyFailedServer(s1) + if sm.NumServers() != 0 { + t.Fatalf("Expected zero servers to start") + } + sm.AddServer(s1) + + // Test again w/ a server not in the list + sm.NotifyFailedServer(s2) + if sm.NumServers() != 1 { + t.Fatalf("Expected one server") + } + + sm.AddServer(s2) + if sm.NumServers() != 2 { + t.Fatalf("Expected two servers") + } + + s1 = sm.FindServer() + if s1 == nil || s1.Name != "s1" { + t.Fatalf("Expected s1 server") + } + + sm.NotifyFailedServer(s2) + s1 = sm.FindServer() + if s1 == nil || s1.Name != 
"s1" { + t.Fatalf("Expected s1 server (still)") + } + + sm.NotifyFailedServer(s1) + s2 = sm.FindServer() + if s2 == nil || s2.Name != "s2" { + t.Fatalf("Expected s2 server") + } + + sm.NotifyFailedServer(s2) + s1 = sm.FindServer() + if s1 == nil || s1.Name != "s1" { + t.Fatalf("Expected s1 server") + } +} + +// func (sm *ServerManager) NumServers() (numServers int) { +func TestServerManager_NumServers(t *testing.T) { + sm := testServerManager() + var num int + num = sm.NumServers() + if num != 0 { + t.Fatalf("Expected zero servers to start") + } + + s := &server_details.ServerDetails{} + sm.AddServer(s) + num = sm.NumServers() + if num != 1 { + t.Fatalf("Expected one server after AddServer") + } +} + +// func (sm *ServerManager) RebalanceServers() { +func TestServerManager_RebalanceServers(t *testing.T) { + sm := testServerManager() + const maxServers = 100 + const numShuffleTests = 100 + const uniquePassRate = 0.5 + + // Make a huge list of nodes. + for i := 0; i < maxServers; i++ { + nodeName := fmt.Sprintf("s%02d", i) + sm.AddServer(&server_details.ServerDetails{Name: nodeName}) + } + + // Keep track of how many unique shuffles we get. + uniques := make(map[string]struct{}, maxServers) + for i := 0; i < numShuffleTests; i++ { + sm.RebalanceServers() + + var names []string + for j := 0; j < maxServers; j++ { + server := sm.FindServer() + sm.NotifyFailedServer(server) + names = append(names, server.Name) + } + key := strings.Join(names, "|") + uniques[key] = struct{}{} + } + + // We have to allow for the fact that there won't always be a unique + // shuffle each pass, so we just look for smell here without the test + // being flaky. 
+ if len(uniques) < int(maxServers*uniquePassRate) { + t.Fatalf("unique shuffle ratio too low: %d/%d", len(uniques), maxServers) + } +} + +// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { +func TestServerManager_RemoveServer(t *testing.T) { + const nodeNameFmt = "s%02d" + sm := testServerManager() + + if sm.NumServers() != 0 { + t.Fatalf("Expected zero servers to start") + } + + // Test removing server before its added + nodeName := fmt.Sprintf(nodeNameFmt, 1) + s1 := &server_details.ServerDetails{Name: nodeName} + sm.RemoveServer(s1) + sm.AddServer(s1) + + nodeName = fmt.Sprintf(nodeNameFmt, 2) + s2 := &server_details.ServerDetails{Name: nodeName} + sm.RemoveServer(s2) + sm.AddServer(s2) + + const maxServers = 19 + servers := make([]*server_details.ServerDetails, maxServers) + // Already added two servers above + for i := maxServers; i > 2; i-- { + nodeName := fmt.Sprintf(nodeNameFmt, i) + server := &server_details.ServerDetails{Name: nodeName} + servers = append(servers, server) + sm.AddServer(server) + } + sm.RebalanceServers() + + if sm.NumServers() != maxServers { + t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) + } + + findServer := func(server *server_details.ServerDetails) bool { + for i := sm.NumServers(); i > 0; i-- { + s := sm.FindServer() + if s == server { + return true + } + } + return false + } + + expectedNumServers := maxServers + removedServers := make([]*server_details.ServerDetails, 0, maxServers) + + // Remove servers from the front of the list + for i := 3; i > 0; i-- { + server := sm.FindServer() + if server == nil { + t.Fatalf("FindServer returned nil") + } + sm.RemoveServer(server) + expectedNumServers-- + if sm.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + } + if findServer(server) == true { + t.Fatalf("Did not expect to find server %s after removal from the front", server.Name) + } + removedServers = 
append(removedServers, server) + } + + // Remove server from the end of the list + for i := 3; i > 0; i-- { + server := sm.FindServer() + sm.NotifyFailedServer(server) + sm.RemoveServer(server) + expectedNumServers-- + if sm.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + } + if findServer(server) == true { + t.Fatalf("Did not expect to find server %s", server.Name) + } + removedServers = append(removedServers, server) + } + + // Remove server from the middle of the list + for i := 3; i > 0; i-- { + server := sm.FindServer() + sm.NotifyFailedServer(server) + server2 := sm.FindServer() + sm.NotifyFailedServer(server2) // server2 now at end of the list + + sm.RemoveServer(server) + expectedNumServers-- + if sm.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + } + if findServer(server) == true { + t.Fatalf("Did not expect to find server %s", server.Name) + } + removedServers = append(removedServers, server) + } + + if sm.NumServers()+len(removedServers) != maxServers { + t.Fatalf("Expected %d+%d=%d servers", sm.NumServers(), len(removedServers), maxServers) + } + + // Drain the remaining servers from the middle + for i := sm.NumServers(); i > 0; i-- { + server := sm.FindServer() + sm.NotifyFailedServer(server) + server2 := sm.FindServer() + sm.NotifyFailedServer(server2) // server2 now at end of the list + sm.RemoveServer(server) + removedServers = append(removedServers, server) + } + + if sm.NumServers() != 0 { + t.Fatalf("Expected an empty server list") + } + if len(removedServers) != maxServers { + t.Fatalf("Expected all servers to be in removed server list") + } +} + +// func (sm *ServerManager) Start() { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 3e7ef5955e..d582560cd8 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -410,7 +410,7 @@ type CheckServiceNodes 
[]CheckServiceNode // Shuffle does an in-place random shuffle using the Fisher-Yates algorithm. func (nodes CheckServiceNodes) Shuffle() { for i := len(nodes) - 1; i > 0; i-- { - j := rand.Int31() % int32(i+1) + j := rand.Int31n(int32(i + 1)) nodes[i], nodes[j] = nodes[j], nodes[i] } } diff --git a/consul/util.go b/consul/util.go index 8959ee66a2..02dda3c116 100644 --- a/consul/util.go +++ b/consul/util.go @@ -23,21 +23,6 @@ import ( */ var privateBlocks []*net.IPNet -// serverparts is used to return the parts of a server role -type serverParts struct { - Name string - Datacenter string - Port int - Bootstrap bool - Expect int - Version int - Addr net.Addr -} - -func (s *serverParts) String() string { - return fmt.Sprintf("%s (Addr: %s) (DC: %s)", s.Name, s.Addr, s.Datacenter) -} - func init() { // Add each private block privateBlocks = make([]*net.IPNet, 6) @@ -116,52 +101,6 @@ func CanServersUnderstandProtocol(members []serf.Member, version uint8) (bool, e return (numServers > 0) && (numWhoGrok == numServers), nil } -// Returns if a member is a consul server. Returns a bool, -// the datacenter, and the rpc port -func isConsulServer(m serf.Member) (bool, *serverParts) { - if m.Tags["role"] != "consul" { - return false, nil - } - - datacenter := m.Tags["dc"] - _, bootstrap := m.Tags["bootstrap"] - - expect := 0 - expect_str, ok := m.Tags["expect"] - var err error - if ok { - expect, err = strconv.Atoi(expect_str) - if err != nil { - return false, nil - } - } - - port_str := m.Tags["port"] - port, err := strconv.Atoi(port_str) - if err != nil { - return false, nil - } - - vsn_str := m.Tags["vsn"] - vsn, err := strconv.Atoi(vsn_str) - if err != nil { - return false, nil - } - - addr := &net.TCPAddr{IP: m.Addr, Port: port} - - parts := &serverParts{ - Name: m.Name, - Datacenter: datacenter, - Port: port, - Bootstrap: bootstrap, - Expect: expect, - Addr: addr, - Version: vsn, - } - return true, parts -} - // Returns if a member is a consul node. 
Returns a bool, // and the datacenter. func isConsulNode(m serf.Member) (bool, string) { diff --git a/consul/util_test.go b/consul/util_test.go index 1011ec896a..042a4f6779 100644 --- a/consul/util_test.go +++ b/consul/util_test.go @@ -196,49 +196,6 @@ func TestUtil_CanServersUnderstandProtocol(t *testing.T) { } } -func TestIsConsulServer(t *testing.T) { - m := serf.Member{ - Name: "foo", - Addr: net.IP([]byte{127, 0, 0, 1}), - Tags: map[string]string{ - "role": "consul", - "dc": "east-aws", - "port": "10000", - "vsn": "1", - }, - } - valid, parts := isConsulServer(m) - if !valid || parts.Datacenter != "east-aws" || parts.Port != 10000 { - t.Fatalf("bad: %v %v", valid, parts) - } - if parts.Name != "foo" { - t.Fatalf("bad: %v", parts) - } - if parts.Bootstrap { - t.Fatalf("unexpected bootstrap") - } - if parts.Expect != 0 { - t.Fatalf("bad: %v", parts.Expect) - } - m.Tags["bootstrap"] = "1" - valid, parts = isConsulServer(m) - if !valid || !parts.Bootstrap { - t.Fatalf("expected bootstrap") - } - if parts.Addr.String() != "127.0.0.1:10000" { - t.Fatalf("bad addr: %v", parts.Addr) - } - if parts.Version != 1 { - t.Fatalf("bad: %v", parts) - } - m.Tags["expect"] = "3" - delete(m.Tags, "bootstrap") - valid, parts = isConsulServer(m) - if !valid || parts.Expect != 3 { - t.Fatalf("bad: %v", parts.Expect) - } -} - func TestIsConsulNode(t *testing.T) { m := serf.Member{ Tags: map[string]string{ diff --git a/lib/cluster.go b/lib/cluster.go index 0062c61375..33a7788c78 100644 --- a/lib/cluster.go +++ b/lib/cluster.go @@ -14,6 +14,10 @@ func RandomStagger(intv time.Duration) time.Duration { // order to target an aggregate number of actions per second across the whole // cluster. 
func RateScaledInterval(rate float64, min time.Duration, n int) time.Duration { + const minRate = 1 / 86400 // 1/(1 * time.Day) + if rate <= minRate { + return min + } interval := time.Duration(float64(time.Second) * float64(n) / rate) if interval < min { return min diff --git a/lib/cluster_test.go b/lib/cluster_test.go index 40949d0204..d517e4677f 100644 --- a/lib/cluster_test.go +++ b/lib/cluster_test.go @@ -16,7 +16,7 @@ func TestRandomStagger(t *testing.T) { } func TestRateScaledInterval(t *testing.T) { - min := 1 * time.Second + const min = 1 * time.Second rate := 200.0 if v := RateScaledInterval(rate, min, 0); v != min { t.Fatalf("Bad: %v", v) @@ -36,4 +36,13 @@ func TestRateScaledInterval(t *testing.T) { if v := RateScaledInterval(rate, min, 10000); v != 50*time.Second { t.Fatalf("Bad: %v", v) } + if v := RateScaledInterval(0, min, 10000); v != min { + t.Fatalf("Bad: %v", v) + } + if v := RateScaledInterval(0.0, min, 10000); v != min { + t.Fatalf("Bad: %v", v) + } + if v := RateScaledInterval(-1, min, 10000); v != min { + t.Fatalf("Bad: %v", v) + } } diff --git a/scripts/test.sh b/scripts/test.sh index b1ee7add71..a892469f09 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -12,4 +12,4 @@ go build -o $TEMPDIR/consul || exit 1 # Run the tests echo "--> Running tests" -go list ./... | grep -v ^github.com/hashicorp/consul/vendor/ | PATH=$TEMPDIR:$PATH xargs -n1 go test +go list ./... | grep -v ^github.com/hashicorp/consul/vendor/ | PATH=$TEMPDIR:$PATH xargs -n1 go test ${GOTEST_FLAGS:-} diff --git a/vendor/github.com/hashicorp/serf/coordinate/test_util.go b/vendor/github.com/hashicorp/serf/coordinate/test_util.go deleted file mode 100644 index 116e949337..0000000000 --- a/vendor/github.com/hashicorp/serf/coordinate/test_util.go +++ /dev/null @@ -1,27 +0,0 @@ -package coordinate - -import ( - "math" - "testing" -) - -// verifyEqualFloats will compare f1 and f2 and fail if they are not -// "equal" within a threshold. 
-func verifyEqualFloats(t *testing.T, f1 float64, f2 float64) { - const zeroThreshold = 1.0e-6 - if math.Abs(f1-f2) > zeroThreshold { - t.Fatalf("equal assertion fail, %9.6f != %9.6f", f1, f2) - } -} - -// verifyEqualVectors will compare vec1 and vec2 and fail if they are not -// "equal" within a threshold. -func verifyEqualVectors(t *testing.T, vec1 []float64, vec2 []float64) { - if len(vec1) != len(vec2) { - t.Fatalf("vector length mismatch, %d != %d", len(vec1), len(vec2)) - } - - for i, _ := range vec1 { - verifyEqualFloats(t, vec1[i], vec2[i]) - } -} diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 4f1921f775..613b915dc4 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -214,8 +214,8 @@ type queries struct { } const ( - UserEventSizeLimit = 512 // Maximum byte size for event name and payload - snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot + UserEventSizeLimit = 512 // Maximum byte size for event name and payload + snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot ) // Create creates a new Serf instance, starting all the background tasks @@ -1680,3 +1680,13 @@ func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, o return nil, false } + +// NumNodes returns the number of nodes in the serf cluster, regardless of +// their health or status. +func (s *Serf) NumNodes() (numNodes int) { + s.memberLock.RLock() + numNodes = len(s.members) + s.memberLock.RUnlock() + + return numNodes +}