mirror of https://github.com/status-im/consul.git
Move consul.serverConfig out of the consul package
Relocated to its own package, server_manager. This now greatly simplifies the RPC() call path and appropriately hides the locking behind the package boundary. More work is needed to be done here
This commit is contained in:
parent
0925b26250
commit
2ca4cc58ce
|
@ -8,19 +8,23 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/server_details"
|
"github.com/hashicorp/consul/consul/server_details"
|
||||||
|
"github.com/hashicorp/consul/consul/server_manager"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/serf/coordinate"
|
"github.com/hashicorp/serf/coordinate"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// clientRPCCache controls how long we keep an idle connection
|
// clientRPCConnMaxIdle controls how long we keep an idle connection
|
||||||
// open to a server
|
// open to a server. 127s was chosen as the first prime above 120s
|
||||||
clientRPCCache = 30 * time.Second
|
// (arbitrarily chose to use a prime) with the intent of reusing
|
||||||
|
// connections who are used by once-a-minute cron(8) jobs *and* who
|
||||||
|
// use a 60s jitter window (e.g. in vixie cron job execution can
|
||||||
|
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
|
||||||
|
clientRPCConnMaxIdle = 127 * time.Second
|
||||||
|
|
||||||
// clientMaxStreams controls how many idle streams we keep
|
// clientMaxStreams controls how many idle streams we keep
|
||||||
// open to a server
|
// open to a server
|
||||||
|
@ -54,14 +58,8 @@ type Client struct {
|
||||||
// Connection pool to consul servers
|
// Connection pool to consul servers
|
||||||
connPool *ConnPool
|
connPool *ConnPool
|
||||||
|
|
||||||
// serverConfig provides the necessary load/store semantics to
|
// serverManager
|
||||||
// serverConfig
|
serverMgr *server_manager.ServerManager
|
||||||
serverConfigValue atomic.Value
|
|
||||||
serverConfigLock sync.Mutex
|
|
||||||
|
|
||||||
// consulServersCh is used to receive events related to the
|
|
||||||
// maintenance of the list of consulServers
|
|
||||||
consulServersCh chan consulServerEventTypes
|
|
||||||
|
|
||||||
// eventCh is used to receive events from the
|
// eventCh is used to receive events from the
|
||||||
// serf cluster in the datacenter
|
// serf cluster in the datacenter
|
||||||
|
@ -120,12 +118,10 @@ func NewClient(config *Config) (*Client, error) {
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the initial serverConfig
|
c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh)
|
||||||
serverCfg := serverConfig{}
|
|
||||||
c.serverConfigValue.Store(serverCfg)
|
|
||||||
|
|
||||||
// Start consulServers maintenance
|
// Start consulServers maintenance
|
||||||
go c.consulServersManager()
|
go c.serverMgr.StartServerManager()
|
||||||
|
|
||||||
// Start the Serf listeners to prevent a deadlock
|
// Start the Serf listeners to prevent a deadlock
|
||||||
go c.lanEventHandler()
|
go c.lanEventHandler()
|
||||||
|
@ -274,7 +270,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c.logger.Printf("[INFO] consul: adding server %s", parts)
|
c.logger.Printf("[INFO] consul: adding server %s", parts)
|
||||||
c.AddServer(parts)
|
c.serverMgr.AddServer(parts)
|
||||||
|
|
||||||
// Trigger the callback
|
// Trigger the callback
|
||||||
if c.config.ServerUp != nil {
|
if c.config.ServerUp != nil {
|
||||||
|
@ -291,7 +287,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c.logger.Printf("[INFO] consul: removing server %s", parts)
|
c.logger.Printf("[INFO] consul: removing server %s", parts)
|
||||||
c.RemoveServer(parts)
|
c.serverMgr.RemoveServer(parts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,32 +321,7 @@ func (c *Client) localEvent(event serf.UserEvent) {
|
||||||
|
|
||||||
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
||||||
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||||
serverCfgPtr := c.serverConfigValue.Load()
|
server := c.serverMgr.FindHealthyServer()
|
||||||
if serverCfgPtr == nil {
|
|
||||||
c.logger.Printf("[ERR] consul: Failed to load a server config")
|
|
||||||
return structs.ErrNoServers
|
|
||||||
}
|
|
||||||
serverCfg := serverCfgPtr.(serverConfig)
|
|
||||||
|
|
||||||
numServers := len(serverCfg.servers)
|
|
||||||
if numServers == 0 {
|
|
||||||
c.logger.Printf("[ERR] consul: No servers found in the server config")
|
|
||||||
return structs.ErrNoServers
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the first non-failing server in the server list. If this is
|
|
||||||
// not the first server a prior RPC call marked the first server as
|
|
||||||
// failed and we're waiting for the server management task to reorder
|
|
||||||
// a working server to the front of the list.
|
|
||||||
var server *serverParts
|
|
||||||
for i := range serverCfg.servers {
|
|
||||||
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
|
|
||||||
if failCount == 0 {
|
|
||||||
server = serverCfg.servers[i]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if server == nil {
|
if server == nil {
|
||||||
c.logger.Printf("[ERR] consul: No healthy servers found in the server config")
|
c.logger.Printf("[ERR] consul: No healthy servers found in the server config")
|
||||||
return structs.ErrNoServers
|
return structs.ErrNoServers
|
||||||
|
@ -358,9 +329,8 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||||
|
|
||||||
// Forward to remote Consul
|
// Forward to remote Consul
|
||||||
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
|
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
|
||||||
atomic.AddUint64(&server.Disabled, 1)
|
c.serverMgr.NotifyFailedServer(server)
|
||||||
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
|
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
|
||||||
c.consulServersCh <- consulServersRPCError
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -370,7 +340,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||||
// Stats is used to return statistics for debugging and insight
|
// Stats is used to return statistics for debugging and insight
|
||||||
// for various sub-systems
|
// for various sub-systems
|
||||||
func (c *Client) Stats() map[string]map[string]string {
|
func (c *Client) Stats() map[string]map[string]string {
|
||||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
numServers := c.serverMgr.GetNumServers()
|
||||||
|
|
||||||
toString := func(v uint64) string {
|
toString := func(v uint64) string {
|
||||||
return strconv.FormatUint(v, 10)
|
return strconv.FormatUint(v, 10)
|
||||||
|
@ -378,7 +348,7 @@ func (c *Client) Stats() map[string]map[string]string {
|
||||||
stats := map[string]map[string]string{
|
stats := map[string]map[string]string{
|
||||||
"consul": map[string]string{
|
"consul": map[string]string{
|
||||||
"server": "false",
|
"server": "false",
|
||||||
"known_servers": toString(uint64(len(serverCfg.servers))),
|
"known_servers": toString(uint64(numServers)),
|
||||||
},
|
},
|
||||||
"serf_lan": c.serf.Stats(),
|
"serf_lan": c.serf.Stats(),
|
||||||
"runtime": runtimeStats(),
|
"runtime": runtimeStats(),
|
||||||
|
|
|
@ -95,8 +95,7 @@ func TestClient_JoinLAN(t *testing.T) {
|
||||||
|
|
||||||
// Check we have a new consul
|
// Check we have a new consul
|
||||||
testutil.WaitForResult(func() (bool, error) {
|
testutil.WaitForResult(func() (bool, error) {
|
||||||
serverCfg := c1.serverConfigValue.Load().(serverConfig)
|
return c1.serverMgr.GetNumServers() == 1, nil
|
||||||
return len(serverCfg.servers) == 1, nil
|
|
||||||
}, func(err error) {
|
}, func(err error) {
|
||||||
t.Fatalf("expected consul server")
|
t.Fatalf("expected consul server")
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,212 +0,0 @@
|
||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math/rand"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/lib"
|
|
||||||
)
|
|
||||||
|
|
||||||
type consulServerEventTypes int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// consulServersNodeJoin is used to notify of a new consulServer.
|
|
||||||
// The primary effect of this is a reshuffling of consulServers and
|
|
||||||
// finding a new preferredServer.
|
|
||||||
consulServersNodeJoin = iota
|
|
||||||
|
|
||||||
// consulServersRebalance is used to signal we should rebalance our
|
|
||||||
// connection load across servers
|
|
||||||
consulServersRebalance
|
|
||||||
|
|
||||||
// consulServersRPCError is used to signal when a server has either
|
|
||||||
// timed out or returned an error and we would like to have the
|
|
||||||
// server manager find a new preferredServer.
|
|
||||||
consulServersRPCError
|
|
||||||
)
|
|
||||||
|
|
||||||
// serverCfg is the thread-safe configuration structure that is used to
|
|
||||||
// maintain the list of consul servers in Client.
|
|
||||||
//
|
|
||||||
// NOTE(sean@): We are explicitly relying on the fact that this is copied.
|
|
||||||
// Please keep this structure light.
|
|
||||||
type serverConfig struct {
|
|
||||||
// servers tracks the locally known servers
|
|
||||||
servers []*serverParts
|
|
||||||
|
|
||||||
// Timer used to control rebalancing of servers
|
|
||||||
rebalanceTimer *time.Timer
|
|
||||||
}
|
|
||||||
|
|
||||||
// consulServersManager is used to automatically shuffle and rebalance the
|
|
||||||
// list of consulServers. This maintenance happens either when a new server
|
|
||||||
// is added or when a duration has been exceed.
|
|
||||||
func (c *Client) consulServersManager() {
|
|
||||||
defaultTimeout := 5 * time.Second // FIXME(sean@): This is a bullshit value
|
|
||||||
var rebalanceTimer *time.Timer
|
|
||||||
func(c *Client) {
|
|
||||||
c.serverConfigLock.Lock()
|
|
||||||
defer c.serverConfigLock.Unlock()
|
|
||||||
|
|
||||||
serverCfgPtr := c.serverConfigValue.Load()
|
|
||||||
if serverCfgPtr == nil {
|
|
||||||
panic("server config has not been initialized")
|
|
||||||
}
|
|
||||||
var serverCfg serverConfig
|
|
||||||
serverCfg = serverCfgPtr.(serverConfig)
|
|
||||||
rebalanceTimer = time.NewTimer(defaultTimeout)
|
|
||||||
serverCfg.rebalanceTimer = rebalanceTimer
|
|
||||||
}(c)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case e := <-c.consulServersCh:
|
|
||||||
switch e {
|
|
||||||
case consulServersNodeJoin:
|
|
||||||
c.logger.Printf("[INFO] consul: new node joined cluster")
|
|
||||||
c.RebalanceServers()
|
|
||||||
case consulServersRebalance:
|
|
||||||
c.logger.Printf("[INFO] consul: rebalancing servers by request")
|
|
||||||
c.RebalanceServers()
|
|
||||||
case consulServersRPCError:
|
|
||||||
c.logger.Printf("[INFO] consul: need to find a new server to talk with")
|
|
||||||
c.CycleFailedServers()
|
|
||||||
// FIXME(sean@): wtb preemptive Status.Ping
|
|
||||||
// of servers, ideally parallel fan-out of N
|
|
||||||
// nodes, then settle on the first node which
|
|
||||||
// responds successfully.
|
|
||||||
//
|
|
||||||
// Is there a distinction between slow and
|
|
||||||
// offline? Do we run the Status.Ping with a
|
|
||||||
// fixed timeout (say 30s) that way we can
|
|
||||||
// alert administrators that they've set
|
|
||||||
// their RPC time too low even though the
|
|
||||||
// Ping did return successfully?
|
|
||||||
default:
|
|
||||||
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
|
||||||
}
|
|
||||||
case <-rebalanceTimer.C:
|
|
||||||
c.logger.Printf("[INFO] consul: server rebalance timeout")
|
|
||||||
c.RebalanceServers()
|
|
||||||
|
|
||||||
case <-c.shutdownCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) AddServer(server *serverParts) {
|
|
||||||
c.serverConfigLock.Lock()
|
|
||||||
defer c.serverConfigLock.Unlock()
|
|
||||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
|
||||||
|
|
||||||
// Check if this server is known
|
|
||||||
found := false
|
|
||||||
for idx, existing := range serverCfg.servers {
|
|
||||||
if existing.Name == server.Name {
|
|
||||||
// Overwrite the existing server parts in order to
|
|
||||||
// possibly update metadata (i.e. server version)
|
|
||||||
serverCfg.servers[idx] = server
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add to the list if not known
|
|
||||||
if !found {
|
|
||||||
serverCfg.servers = append(serverCfg.servers, server)
|
|
||||||
|
|
||||||
// Notify the server maintenance task of a new server
|
|
||||||
c.consulServersCh <- consulServersNodeJoin
|
|
||||||
}
|
|
||||||
|
|
||||||
c.serverConfigValue.Store(serverCfg)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) CycleFailedServers() {
|
|
||||||
c.serverConfigLock.Lock()
|
|
||||||
defer c.serverConfigLock.Unlock()
|
|
||||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
|
||||||
|
|
||||||
for i := range serverCfg.servers {
|
|
||||||
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
|
|
||||||
if failCount == 0 {
|
|
||||||
break
|
|
||||||
} else if failCount > 0 {
|
|
||||||
serverCfg.servers = serverCfg.cycleServer()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
serverCfg.resetRebalanceTimer(c)
|
|
||||||
c.serverConfigValue.Store(serverCfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sc *serverConfig) cycleServer() (servers []*serverParts) {
|
|
||||||
numServers := len(servers)
|
|
||||||
if numServers < 2 {
|
|
||||||
// No action required for zero or one server situations
|
|
||||||
return servers
|
|
||||||
}
|
|
||||||
|
|
||||||
var failedNode *serverParts
|
|
||||||
failedNode, servers = servers[0], servers[1:]
|
|
||||||
servers = append(servers, failedNode)
|
|
||||||
return servers
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) RebalanceServers() {
|
|
||||||
c.serverConfigLock.Lock()
|
|
||||||
defer c.serverConfigLock.Unlock()
|
|
||||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
|
||||||
|
|
||||||
// Shuffle the server list on server join. Servers are selected from
|
|
||||||
// the head of the list and are moved to the end of the list on
|
|
||||||
// failure.
|
|
||||||
for i := len(serverCfg.servers) - 1; i > 0; i-- {
|
|
||||||
j := rand.Int31n(int32(i + 1))
|
|
||||||
serverCfg.servers[i], serverCfg.servers[j] = serverCfg.servers[j], serverCfg.servers[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
serverCfg.resetRebalanceTimer(c)
|
|
||||||
c.serverConfigValue.Store(serverCfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) RemoveServer(server *serverParts) {
|
|
||||||
c.serverConfigLock.Lock()
|
|
||||||
defer c.serverConfigLock.Unlock()
|
|
||||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
|
||||||
|
|
||||||
// Remove the server if known
|
|
||||||
n := len(serverCfg.servers)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
if serverCfg.servers[i].Name == server.Name {
|
|
||||||
serverCfg.servers[i], serverCfg.servers[n-1] = serverCfg.servers[n-1], nil
|
|
||||||
serverCfg.servers = serverCfg.servers[:n-1]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.serverConfigValue.Store(serverCfg)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// resetRebalanceTimer assumes:
|
|
||||||
//
|
|
||||||
// 1) the serverConfigLock is already held by the caller.
|
|
||||||
// 2) the caller will call serverConfigValue.Store()
|
|
||||||
func (sc *serverConfig) resetRebalanceTimer(c *Client) {
|
|
||||||
numConsulServers := len(sc.servers)
|
|
||||||
// Limit this connection's life based on the size (and health) of the
|
|
||||||
// cluster. Never rebalance a connection more frequently than
|
|
||||||
// connReuseLowWatermarkDuration, and make sure we never exceed
|
|
||||||
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
|
||||||
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
|
|
||||||
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
|
|
||||||
numLANMembers := len(c.LANMembers())
|
|
||||||
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
|
||||||
c.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
|
|
||||||
|
|
||||||
sc.rebalanceTimer.Reset(connRebalanceTimeout)
|
|
||||||
}
|
|
|
@ -0,0 +1,323 @@
|
||||||
|
package server_manager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/server_details"
|
||||||
|
"github.com/hashicorp/consul/lib"
|
||||||
|
)
|
||||||
|
|
||||||
|
type consulServerEventTypes int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// consulServersNodeJoin is used to notify of a new consulServer.
|
||||||
|
// The primary effect of this is a reshuffling of consulServers and
|
||||||
|
// finding a new preferredServer.
|
||||||
|
consulServersNodeJoin = iota
|
||||||
|
|
||||||
|
// consulServersRebalance is used to signal we should rebalance our
|
||||||
|
// connection load across servers
|
||||||
|
consulServersRebalance
|
||||||
|
|
||||||
|
// consulServersRPCError is used to signal when a server has either
|
||||||
|
// timed out or returned an error and we would like to have the
|
||||||
|
// server manager find a new preferredServer.
|
||||||
|
consulServersRPCError
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// clientRPCJitterFraction determines the amount of jitter added to
|
||||||
|
// clientRPCMinReuseDuration before a connection is expired and a new
|
||||||
|
// connection is established in order to rebalance load across consul
|
||||||
|
// servers. The cluster-wide number of connections per second from
|
||||||
|
// rebalancing is applied after this jitter to ensure the CPU impact
|
||||||
|
// is always finite. See newRebalanceConnsPerSecPerServer's comment
|
||||||
|
// for additional commentary.
|
||||||
|
//
|
||||||
|
// For example, in a 10K consul cluster with 5x servers, this default
|
||||||
|
// averages out to ~13 new connections from rebalancing per server
|
||||||
|
// per second (each connection is reused for 120s to 180s).
|
||||||
|
clientRPCJitterFraction = 2
|
||||||
|
|
||||||
|
// clientRPCMinReuseDuration controls the minimum amount of time RPC
|
||||||
|
// queries are sent over an established connection to a single server
|
||||||
|
clientRPCMinReuseDuration = 120 * time.Second
|
||||||
|
|
||||||
|
// Limit the number of new connections a server receives per second
|
||||||
|
// for connection rebalancing. This limit caps the load caused by
|
||||||
|
// continual rebalancing efforts when a cluster is in equilibrium. A
|
||||||
|
// lower value comes at the cost of increased recovery time after a
|
||||||
|
// partition. This parameter begins to take effect when there are
|
||||||
|
// more than ~48K clients querying 5x servers or at lower server
|
||||||
|
// values when there is a partition.
|
||||||
|
//
|
||||||
|
// For example, in a 100K consul cluster with 5x servers, it will
|
||||||
|
// take ~5min for all servers to rebalance their connections. If
|
||||||
|
// 99,995 agents are in the minority talking to only one server, it
|
||||||
|
// will take ~26min for all servers to rebalance. A 10K cluster in
|
||||||
|
// the same scenario will take ~2.6min to rebalance.
|
||||||
|
newRebalanceConnsPerSecPerServer = 64
|
||||||
|
)
|
||||||
|
|
||||||
|
// serverCfg is the thread-safe configuration structure that is used to
|
||||||
|
// maintain the list of consul servers in Client.
|
||||||
|
//
|
||||||
|
// NOTE(sean@): We are explicitly relying on the fact that this is copied.
|
||||||
|
// Please keep this structure light.
|
||||||
|
type serverConfig struct {
|
||||||
|
// servers tracks the locally known servers
|
||||||
|
servers []*server_details.ServerDetails
|
||||||
|
|
||||||
|
// Timer used to control rebalancing of servers
|
||||||
|
rebalanceTimer *time.Timer
|
||||||
|
}
|
||||||
|
|
||||||
|
type ServerManager struct {
|
||||||
|
// serverConfig provides the necessary load/store semantics to
|
||||||
|
// serverConfig
|
||||||
|
serverConfigValue atomic.Value
|
||||||
|
serverConfigLock sync.Mutex
|
||||||
|
|
||||||
|
// consulServersCh is used to receive events related to the
|
||||||
|
// maintenance of the list of consulServers
|
||||||
|
consulServersCh chan consulServerEventTypes
|
||||||
|
|
||||||
|
// shutdownCh is a copy of the channel in consul.Client
|
||||||
|
shutdownCh chan struct{}
|
||||||
|
|
||||||
|
// Logger uses the provided LogOutput
|
||||||
|
logger *log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
|
||||||
|
sm.serverConfigLock.Lock()
|
||||||
|
defer sm.serverConfigLock.Unlock()
|
||||||
|
serverCfg := sm.serverConfigValue.Load().(serverConfig)
|
||||||
|
|
||||||
|
// Check if this server is known
|
||||||
|
found := false
|
||||||
|
for idx, existing := range serverCfg.servers {
|
||||||
|
if existing.Name == server.Name {
|
||||||
|
// Overwrite the existing server parts in order to
|
||||||
|
// possibly update metadata (i.e. server version)
|
||||||
|
serverCfg.servers[idx] = server
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to the list if not known
|
||||||
|
if !found {
|
||||||
|
newServers := make([]*server_details.ServerDetails, len(serverCfg.servers)+1)
|
||||||
|
copy(newServers, serverCfg.servers)
|
||||||
|
serverCfg.servers = newServers
|
||||||
|
|
||||||
|
// Notify the server maintenance task of a new server
|
||||||
|
sm.consulServersCh <- consulServersNodeJoin
|
||||||
|
}
|
||||||
|
|
||||||
|
sm.serverConfigValue.Store(serverCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *ServerManager) CycleFailedServers() {
|
||||||
|
sm.serverConfigLock.Lock()
|
||||||
|
defer sm.serverConfigLock.Unlock()
|
||||||
|
serverCfg := sm.serverConfigValue.Load().(serverConfig)
|
||||||
|
|
||||||
|
for i := range serverCfg.servers {
|
||||||
|
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
|
||||||
|
if failCount == 0 {
|
||||||
|
break
|
||||||
|
} else if failCount > 0 {
|
||||||
|
serverCfg.servers = serverCfg.cycleServer()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
serverCfg.resetRebalanceTimer(sm)
|
||||||
|
sm.serverConfigValue.Store(serverCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
|
||||||
|
numServers := len(servers)
|
||||||
|
if numServers < 2 {
|
||||||
|
// No action required for zero or one server situations
|
||||||
|
return servers
|
||||||
|
}
|
||||||
|
|
||||||
|
newServers := make([]*server_details.ServerDetails, len(servers)+1)
|
||||||
|
var dequeuedServer *server_details.ServerDetails
|
||||||
|
dequeuedServer, newServers = servers[0], servers[1:]
|
||||||
|
servers = append(newServers, dequeuedServer)
|
||||||
|
return servers
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetails) {
|
||||||
|
serverCfg := sm.getServerConfig()
|
||||||
|
numServers := len(serverCfg.servers)
|
||||||
|
if numServers == 0 {
|
||||||
|
sm.logger.Printf("[ERR] consul: No servers found in the server config")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the first non-failing server in the server list. If this is
|
||||||
|
// not the first server a prior RPC call marked the first server as
|
||||||
|
// failed and we're waiting for the server management task to reorder
|
||||||
|
// a working server to the front of the list.
|
||||||
|
for i := range serverCfg.servers {
|
||||||
|
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
|
||||||
|
if failCount == 0 {
|
||||||
|
server = serverCfg.servers[i]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return server
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *ServerManager) GetNumServers() (numServers int) {
|
||||||
|
serverCfg := sm.getServerConfig()
|
||||||
|
numServers = len(serverCfg.servers)
|
||||||
|
return numServers
|
||||||
|
}
|
||||||
|
|
||||||
|
// getServerConfig shorthand method to hide the locking semantics of
|
||||||
|
// atomic.Value
|
||||||
|
func (sm *ServerManager) getServerConfig() serverConfig {
|
||||||
|
return sm.serverConfigValue.Load().(serverConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServerManager is the only way to safely create a new ServerManager
|
||||||
|
// struct.
|
||||||
|
//
|
||||||
|
// NOTE(sean@): We don't simply pass in a consul.Client struct to avoid a
|
||||||
|
// cyclic import
|
||||||
|
func NewServerManager(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) {
|
||||||
|
sm = new(ServerManager)
|
||||||
|
// Create the initial serverConfig
|
||||||
|
serverCfg := serverConfig{}
|
||||||
|
sm.logger = logger
|
||||||
|
sm.shutdownCh = shutdownCh
|
||||||
|
sm.serverConfigValue.Store(serverCfg)
|
||||||
|
return sm
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
|
||||||
|
atomic.AddUint64(&server.Disabled, 1)
|
||||||
|
sm.consulServersCh <- consulServersRPCError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *ServerManager) RebalanceServers() {
|
||||||
|
sm.serverConfigLock.Lock()
|
||||||
|
defer sm.serverConfigLock.Unlock()
|
||||||
|
serverCfg := sm.serverConfigValue.Load().(serverConfig)
|
||||||
|
|
||||||
|
// Shuffle the server list on server join. Servers are selected from
|
||||||
|
// the head of the list and are moved to the end of the list on
|
||||||
|
// failure.
|
||||||
|
for i := len(serverCfg.servers) - 1; i > 0; i-- {
|
||||||
|
j := rand.Int31n(int32(i + 1))
|
||||||
|
serverCfg.servers[i], serverCfg.servers[j] = serverCfg.servers[j], serverCfg.servers[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
serverCfg.resetRebalanceTimer(sm)
|
||||||
|
sm.serverConfigValue.Store(serverCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
|
||||||
|
sm.serverConfigLock.Lock()
|
||||||
|
defer sm.serverConfigLock.Unlock()
|
||||||
|
serverCfg := sm.serverConfigValue.Load().(serverConfig)
|
||||||
|
|
||||||
|
// Remove the server if known
|
||||||
|
n := len(serverCfg.servers)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
if serverCfg.servers[i].Name == server.Name {
|
||||||
|
serverCfg.servers[i], serverCfg.servers[n-1] = serverCfg.servers[n-1], nil
|
||||||
|
serverCfg.servers = serverCfg.servers[:n-1]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sm.serverConfigValue.Store(serverCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// resetRebalanceTimer assumes:
|
||||||
|
//
|
||||||
|
// 1) the serverConfigLock is already held by the caller.
|
||||||
|
// 2) the caller will call serverConfigValue.Store()
|
||||||
|
func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
|
||||||
|
numConsulServers := len(sc.servers)
|
||||||
|
// Limit this connection's life based on the size (and health) of the
|
||||||
|
// cluster. Never rebalance a connection more frequently than
|
||||||
|
// connReuseLowWatermarkDuration, and make sure we never exceed
|
||||||
|
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
||||||
|
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
|
||||||
|
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
|
||||||
|
numLANMembers := 16384 // Assume sufficiently large for now. FIXME: numLanMembers := len(c.LANMembers())
|
||||||
|
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
||||||
|
sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
|
||||||
|
|
||||||
|
sc.rebalanceTimer.Reset(connRebalanceTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartServerManager is used to start and manage the task of automatically
|
||||||
|
// shuffling and rebalance the list of consul servers. This maintenance
|
||||||
|
// happens either when a new server is added or when a duration has been
|
||||||
|
// exceed.
|
||||||
|
func (sm *ServerManager) StartServerManager() {
|
||||||
|
defaultTimeout := 5 * time.Second // FIXME(sean@): This is a bullshit value
|
||||||
|
var rebalanceTimer *time.Timer
|
||||||
|
func() {
|
||||||
|
sm.serverConfigLock.Lock()
|
||||||
|
defer sm.serverConfigLock.Unlock()
|
||||||
|
|
||||||
|
serverCfgPtr := sm.serverConfigValue.Load()
|
||||||
|
if serverCfgPtr == nil {
|
||||||
|
panic("server config has not been initialized")
|
||||||
|
}
|
||||||
|
var serverCfg serverConfig
|
||||||
|
serverCfg = serverCfgPtr.(serverConfig)
|
||||||
|
rebalanceTimer = time.NewTimer(defaultTimeout)
|
||||||
|
serverCfg.rebalanceTimer = rebalanceTimer
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e := <-sm.consulServersCh:
|
||||||
|
switch e {
|
||||||
|
case consulServersNodeJoin:
|
||||||
|
sm.logger.Printf("[INFO] consul: new node joined cluster")
|
||||||
|
sm.RebalanceServers()
|
||||||
|
case consulServersRebalance:
|
||||||
|
sm.logger.Printf("[INFO] consul: rebalancing servers by request")
|
||||||
|
sm.RebalanceServers()
|
||||||
|
case consulServersRPCError:
|
||||||
|
sm.logger.Printf("[INFO] consul: need to find a new server to talk with")
|
||||||
|
sm.CycleFailedServers()
|
||||||
|
// FIXME(sean@): wtb preemptive Status.Ping
|
||||||
|
// of servers, ideally parallel fan-out of N
|
||||||
|
// nodes, then settle on the first node which
|
||||||
|
// responds successfully.
|
||||||
|
//
|
||||||
|
// Is there a distinction between slow and
|
||||||
|
// offline? Do we run the Status.Ping with a
|
||||||
|
// fixed timeout (say 30s) that way we can
|
||||||
|
// alert administrators that they've set
|
||||||
|
// their RPC time too low even though the
|
||||||
|
// Ping did return successfully?
|
||||||
|
default:
|
||||||
|
sm.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
||||||
|
}
|
||||||
|
case <-rebalanceTimer.C:
|
||||||
|
sm.logger.Printf("[INFO] consul: server rebalance timeout")
|
||||||
|
sm.RebalanceServers()
|
||||||
|
|
||||||
|
case <-sm.shutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue