Merge pull request #1667 from hashicorp/b-redistribute-clients

Continually redistribute client RPC connections
sean- 2016-02-02 11:15:19 -08:00
commit 8f30dea420
1 changed file with 81 additions and 13 deletions


@@ -12,14 +12,51 @@ import (
 	"time"
 
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/serf/coordinate"
 	"github.com/hashicorp/serf/serf"
 )
 
 const (
-	// clientRPCCache controls how long we keep an idle connection
-	// open to a server
-	clientRPCCache = 30 * time.Second
+	// clientRPCMinReuseDuration controls the minimum amount of time RPC
+	// queries are sent over an established connection to a single server
+	clientRPCMinReuseDuration = 120 * time.Second
+
+	// clientRPCJitterFraction determines the amount of jitter added to
+	// clientRPCMinReuseDuration before a connection is expired and a new
+	// connection is established in order to rebalance load across consul
+	// servers. The cluster-wide number of connections per second from
+	// rebalancing is applied after this jitter to ensure the CPU impact
+	// is always finite. See newRebalanceConnsPerSecPerServer's comment
+	// for additional commentary.
+	//
+	// For example, in a 10K consul cluster with 5x servers, this default
+	// averages out to ~13 new connections from rebalancing per server
+	// per second (each connection is reused for 120s to 180s).
+	clientRPCJitterFraction = 2
+
+	// Limit the number of new connections a server receives per second
+	// for connection rebalancing. This limit caps the load caused by
+	// continual rebalancing efforts when a cluster is in equilibrium. A
+	// lower value comes at the cost of increased recovery time after a
+	// partition. This parameter begins to take effect when there are
+	// more than ~48K clients querying 5x servers, or at lower client
+	// counts when there is a partition.
+	//
+	// For example, in a 100K consul cluster with 5x servers, it will
+	// take ~5min for all servers to rebalance their connections. If
+	// 99,995 agents are in the minority talking to only one server, it
+	// will take ~26min for all servers to rebalance. A 10K cluster in
+	// the same scenario will take ~2.6min to rebalance.
+	newRebalanceConnsPerSecPerServer = 64
+
+	// clientRPCConnMaxIdle controls how long we keep an idle connection
+	// open to a server. 127s was chosen as the first prime above 120s
+	// (a prime was picked arbitrarily) with the intent of reusing
+	// connections that are used by once-a-minute cron(8) jobs *and* that
+	// use a 60s jitter window (e.g. in vixie cron, job execution can
+	// drift by up to 59s per job, or 119s for a once-a-minute cron job).
+	clientRPCConnMaxIdle = 127 * time.Second
 
 	// clientMaxStreams controls how many idle streams we keep
 	// open to a server
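The figures quoted in the new constant comments can be sanity-checked with a little arithmetic. The sketch below is not part of the change; it simply reproduces the ~13 connections/sec/server and rebalance-time estimates from the constants above, assuming the jitter averages out to half of clientRPCMinReuseDuration/clientRPCJitterFraction.

package main

import (
	"fmt"
	"time"
)

const (
	clientRPCMinReuseDuration        = 120 * time.Second
	clientRPCJitterFraction          = 2
	newRebalanceConnsPerSecPerServer = 64
)

func main() {
	// Each connection lives clientRPCMinReuseDuration plus a random stagger
	// of up to clientRPCMinReuseDuration/clientRPCJitterFraction, i.e.
	// 120s-180s, averaging 150s.
	avgReuseSec := clientRPCMinReuseDuration.Seconds() * (1.0 + 0.5/clientRPCJitterFraction)

	// 10K clients spread across 5 servers at steady state.
	fmt.Printf("rebalance conns/sec/server: %.1f\n", 10000.0/avgReuseSec/5) // ~13.3

	// 100K clients, capped at 64 new conns/sec per server across 5 servers.
	fmt.Printf("time to rebalance 100K clients: %.1f min\n", 100000.0/(5*newRebalanceConnsPerSecPerServer)/60) // ~5.2

	// Worst case from the comment: 99,995 agents stuck on a single server.
	fmt.Printf("minority partition recovery: %.1f min\n", 99995.0/newRebalanceConnsPerSecPerServer/60) // ~26.0
}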
@@ -56,6 +93,10 @@ type Client struct {
 	lastServer  *serverParts
 	lastRPCTime time.Time
 
+	// connRebalanceTime is the time at which we should change the server
+	// we query for RPC requests.
+	connRebalanceTime time.Time
+
 	// Logger uses the provided LogOutput
 	logger *log.Logger
@@ -103,7 +144,7 @@ func NewClient(config *Config) (*Client, error) {
 	// Create server
 	c := &Client{
 		config:     config,
-		connPool:   NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
+		connPool:   NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
 		eventCh:    make(chan serf.Event, 256),
 		logger:     logger,
 		shutdownCh: make(chan struct{}),
@@ -328,37 +369,64 @@ func (c *Client) localEvent(event serf.UserEvent) {
 // RPC is used to forward an RPC call to a consul server, or fail if no servers
 func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
-	// Check the last rpc time
-	var server *serverParts
-	if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
-		server = c.lastServer
-		if server != nil {
-			goto TRY_RPC
-		}
-	}
+	// Check to make sure we haven't spent too much time querying a
+	// single server
+	now := time.Now()
+	if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
+		c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr)
+		c.lastServer = nil
+	}
+
+	// Allocate these vars on the stack before the goto
+	var numConsulServers int
+	var clusterWideRebalanceConnsPerSec float64
+	var connReuseLowWaterMark time.Duration
+	var numLANMembers int
+
+	// Check the last RPC time and continue to reuse the cached
+	// connection until it has been idle for more than
+	// clientRPCConnMaxIdle
+	lastRPCTime := now.Sub(c.lastRPCTime)
+	var server *serverParts
+	if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle {
+		server = c.lastServer
+		goto TRY_RPC
+	}
 
 	// Bail if we can't find any servers
 	c.consulLock.RLock()
-	if len(c.consuls) == 0 {
+	numConsulServers = len(c.consuls)
+	if numConsulServers == 0 {
 		c.consulLock.RUnlock()
 		return structs.ErrNoServers
 	}
 
 	// Select a random addr
-	server = c.consuls[rand.Int31()%int32(len(c.consuls))]
+	server = c.consuls[rand.Int31n(int32(numConsulServers))]
 	c.consulLock.RUnlock()
 
+	// Limit this connection's life based on the size (and health) of the
+	// cluster. Never rebalance a connection more frequently than
+	// connReuseLowWaterMark, and make sure we never exceed
+	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
+	clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer)
+	connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
+	numLANMembers = len(c.LANMembers())
+	c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers))
+	c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", server.Addr, c.connRebalanceTime)
+
 	// Forward to remote Consul
 TRY_RPC:
 	if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
-		c.lastServer = nil
+		c.connRebalanceTime = time.Time{}
 		c.lastRPCTime = time.Time{}
+		c.lastServer = nil
 		return err
 	}
 
 	// Cache the last server
 	c.lastServer = server
-	c.lastRPCTime = time.Now()
+	c.lastRPCTime = now
 	return nil
 }