From ff25d033a69178fef3e2894c8fbd49b01123e5c7 Mon Sep 17 00:00:00 2001
From: James Phillips
Date: Wed, 24 Feb 2016 15:38:03 -0800
Subject: [PATCH 1/2] Revert "Merge pull request #1667 from
 hashicorp/b-redistribute-clients"

This reverts commit 8f30dea4209491ebbe4ef9ab94dd8052d17bdbe9, reversing
changes made to eb27a02956e7e052c0bec6f96a0c0f7f6675f6a6.
---
 consul/client.go | 94 +++++++--------------------------------------------
 1 file changed, 13 insertions(+), 81 deletions(-)

diff --git a/consul/client.go b/consul/client.go
index e7155a1444..b3fdb080e8 100644
--- a/consul/client.go
+++ b/consul/client.go
@@ -12,51 +12,14 @@ import (
 	"time"
 
 	"github.com/hashicorp/consul/consul/structs"
-	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/serf/coordinate"
 	"github.com/hashicorp/serf/serf"
 )
 
 const (
-	// clientRPCMinReuseDuration controls the minimum amount of time RPC
-	// queries are sent over an established connection to a single server
-	clientRPCMinReuseDuration = 120 * time.Second
-
-	// clientRPCJitterFraction determines the amount of jitter added to
-	// clientRPCMinReuseDuration before a connection is expired and a new
-	// connection is established in order to rebalance load across consul
-	// servers. The cluster-wide number of connections per second from
-	// rebalancing is applied after this jitter to ensure the CPU impact
-	// is always finite. See newRebalanceConnsPerSecPerServer's comment
-	// for additional commentary.
-	//
-	// For example, in a 10K consul cluster with 5x servers, this default
-	// averages out to ~13 new connections from rebalancing per server
-	// per second (each connection is reused for 120s to 180s).
-	clientRPCJitterFraction = 2
-
-	// Limit the number of new connections a server receives per second
-	// for connection rebalancing. This limit caps the load caused by
-	// continual rebalancing efforts when a cluster is in equilibrium. A
-	// lower value comes at the cost of increased recovery time after a
-	// partition. This parameter begins to take effect when there are
-	// more than ~48K clients querying 5x servers or at lower server
-	// values when there is a partition.
-	//
-	// For example, in a 100K consul cluster with 5x servers, it will
-	// take ~5min for all servers to rebalance their connections. If
-	// 99,995 agents are in the minority talking to only one server, it
-	// will take ~26min for all servers to rebalance. A 10K cluster in
-	// the same scenario will take ~2.6min to rebalance.
-	newRebalanceConnsPerSecPerServer = 64
-
-	// clientRPCConnMaxIdle controls how long we keep an idle connection
-	// open to a server. 127s was chosen as the first prime above 120s
-	// (arbitrarily chose to use a prime) with the intent of reusing
-	// connections who are used by once-a-minute cron(8) jobs *and* who
-	// use a 60s jitter window (e.g. in vixie cron job execution can
-	// drift by up to 59s per job, or 119s for a once-a-minute cron job).
-	clientRPCConnMaxIdle = 127 * time.Second
+	// clientRPCCache controls how long we keep an idle connection
+	// open to a server
+	clientRPCCache = 30 * time.Second
 
 	// clientMaxStreams controls how many idle streams we keep
 	// open to a server
@@ -93,10 +56,6 @@ type Client struct {
 	lastServer  *serverParts
 	lastRPCTime time.Time
 
-	// connRebalanceTime is the time at which we should change the server
-	// we query for RPC requests.
-	connRebalanceTime time.Time
-
 	// Logger uses the provided LogOutput
 	logger *log.Logger
 
@@ -144,7 +103,7 @@ func NewClient(config *Config) (*Client, error) {
 	// Create server
 	c := &Client{
 		config:     config,
-		connPool:   NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
+		connPool:   NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
 		eventCh:    make(chan serf.Event, 256),
 		logger:     logger,
 		shutdownCh: make(chan struct{}),
@@ -369,64 +328,37 @@ func (c *Client) localEvent(event serf.UserEvent) {
 // RPC is used to forward an RPC call to a consul server, or fail if no servers
 func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
-	// Check to make sure we haven't spent too much time querying a
-	// single server
-	now := time.Now()
-	if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
-		c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr)
-		c.lastServer = nil
-	}
-
-	// Allocate these vars on the stack before the goto
-	var numConsulServers int
-	var clusterWideRebalanceConnsPerSec float64
-	var connReuseLowWaterMark time.Duration
-	var numLANMembers int
-
-	// Check the last RPC time, continue to reuse cached connection for
-	// up to clientRPCMinReuseDuration unless exceeded
-	// clientRPCConnMaxIdle
-	lastRPCTime := now.Sub(c.lastRPCTime)
+	// Check the last rpc time
 	var server *serverParts
-	if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle {
+	if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
 		server = c.lastServer
-		goto TRY_RPC
+		if server != nil {
+			goto TRY_RPC
+		}
 	}
 
 	// Bail if we can't find any servers
 	c.consulLock.RLock()
-	numConsulServers = len(c.consuls)
-	if numConsulServers == 0 {
+	if len(c.consuls) == 0 {
 		c.consulLock.RUnlock()
 		return structs.ErrNoServers
 	}
 
 	// Select a random addr
-	server = c.consuls[rand.Int31n(int32(numConsulServers))]
+	server = c.consuls[rand.Int31()%int32(len(c.consuls))]
 	c.consulLock.RUnlock()
 
-	// Limit this connection's life based on the size (and health) of the
-	// cluster. Never rebalance a connection more frequently than
-	// connReuseLowWaterMark, and make sure we never exceed
-	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
-	clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer)
-	connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
-	numLANMembers = len(c.LANMembers())
-	c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers))
-	c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", server.Addr, c.connRebalanceTime)
-
 	// Forward to remote Consul
 TRY_RPC:
	if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
-		c.connRebalanceTime = time.Time{}
-		c.lastRPCTime = time.Time{}
 		c.lastServer = nil
+		c.lastRPCTime = time.Time{}
 		return err
 	}
 
 	// Cache the last server
 	c.lastServer = server
-	c.lastRPCTime = now
+	c.lastRPCTime = time.Now()
 	return nil
 }
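For context on what PATCH 1/2 removes: the reverted code bounded cluster-wide
rebalancing load by stretching each connection's lifetime with jitter plus a
rate-scaled interval. The standalone Go sketch below (not part of the patch)
reproduces that arithmetic; the two helpers only approximate what
lib.RandomStagger and lib.RateScaledInterval from consul/lib do, and the
cluster sizes are hypothetical.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger returns a random duration in [0, intv); it stands in for
// lib.RandomStagger, the jitter added on top of the 120s minimum reuse time.
func randomStagger(intv time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(intv)))
}

// rateScaledInterval stands in for lib.RateScaledInterval: stretch the
// rebalance interval so that n nodes never generate more than rate new
// connections per second in aggregate, while never dropping below min.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	interval := time.Duration(float64(n) / rate * float64(time.Second))
	if interval < min {
		return min
	}
	return interval
}

func main() {
	const (
		clientRPCMinReuseDuration        = 120 * time.Second
		clientRPCJitterFraction          = 2
		newRebalanceConnsPerSecPerServer = 64
	)
	numServers, numLANMembers := 5, 10000 // hypothetical 10K-node cluster

	// Cluster-wide connection budget, as computed in the reverted RPC method.
	clusterWideRate := float64(numServers * newRebalanceConnsPerSecPerServer)
	lowWaterMark := clientRPCMinReuseDuration +
		randomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
	fmt.Println(rateScaledInterval(clusterWideRate, lowWaterMark, numLANMembers))
	// 10000 members / 320 conns per second is roughly 31s, so the jittered
	// 120s-180s floor wins here, matching the "reused for 120s to 180s"
	// comment in the removed constants above.
}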
From 93a9ed1a792502cd06612c7139011492037c5405 Mon Sep 17 00:00:00 2001
From: James Phillips
Date: Wed, 24 Feb 2016 15:39:58 -0800
Subject: [PATCH 2/2] Reverts the change log for GH-1667.

---
 CHANGELOG.md | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b62c172b41..1121f49a78 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,14 +14,6 @@ BACKWARDS INCOMPATIBILITIES:
 
 IMPROVEMENTS:
 
-* Consul agents will now periodically reconnect to available Consul servers
-  in order to redistribute their RPC query load. Consul clients will, by
-  default, attempt to establish a new connection every 120s to 180s, however
-  the rate at which agents begin to query new servers is proportional to the
-  size of the Consul cluster (servers should never receive more than 64 new
-  connections per second per Consul server as a result of rebalancing).
-  Clusters in stable environments who use `allow_stale` should see a more
-  even distribution of query load across all of their Consul servers. [GH-1667]
 * Added a new `disable_hostname` configuration option to control whether
   Consul's runtime telemetry gets prepended with the host name. All of the
   telemetry configuration has also been moved to a `telemetry` nested
   structure, but the old
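The behavior the revert restores is deliberately simple: reuse the last
server while the 30s clientRPCCache window is fresh, otherwise pick a random
server, and clear the cache on RPC error so the next call re-selects. Below
is a minimal standalone sketch of that flow under stated assumptions: the
client and server types are stand-ins rather than Consul's actual structs,
and rand.Intn replaces the patch's rand.Int31()%int32(len(c.consuls)).

package rpcsketch

import (
	"errors"
	"math/rand"
	"time"
)

// clientRPCCache mirrors the restored constant: how long we keep reusing
// the previously chosen server before picking again.
const clientRPCCache = 30 * time.Second

// server is a stand-in for consul's serverParts.
type server struct{ addr string }

// client is a stand-in for consul.Client, reduced to the fields the
// restored RPC path touches.
type client struct {
	consuls     []*server // known Consul servers
	lastServer  *server
	lastRPCTime time.Time
}

var errNoServers = errors.New("no known consul servers")

// pickServer reuses the cached server while the cache window is fresh and
// otherwise selects a random one, following the restored RPC method.
func (c *client) pickServer() (*server, error) {
	if time.Since(c.lastRPCTime) < clientRPCCache && c.lastServer != nil {
		return c.lastServer, nil
	}
	if len(c.consuls) == 0 {
		return nil, errNoServers
	}
	s := c.consuls[rand.Intn(len(c.consuls))]
	c.lastServer, c.lastRPCTime = s, time.Now()
	return s, nil
}

// forget drops the cached server after a failed RPC so the next call
// re-selects, as the restored error path does.
func (c *client) forget() {
	c.lastServer = nil
	c.lastRPCTime = time.Time{}
}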