From ad65d953f6c7b5039aa795e7640c5eece0d6f5cf Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 30 Jun 2015 12:02:05 -0700 Subject: [PATCH] Scales coordinate sends to hit a fixed aggregate rate across the cluster. --- command/agent/agent.go | 4 ++- command/agent/config.go | 48 ++++++++++++++++++++++------------- command/agent/local_test.go | 5 ++-- command/agent/util.go | 11 ++++++++ command/agent/util_test.go | 23 +++++++++++++++++ consul/config.go | 10 +++----- consul/coordinate_endpoint.go | 2 +- 7 files changed, 75 insertions(+), 28 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 573d78a61b..fd712f6a69 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -566,7 +566,9 @@ func (a *Agent) ResumeSync() { // to the server. Closing the agent's shutdownChannel will cause this to exit. func (a *Agent) sendCoordinate() { for { - intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers())) + rate := a.config.SyncCoordinateRateTarget + min := a.config.SyncCoordinateIntervalMin + intv := rateScaledInterval(rate, min, len(a.LANMembers())) intv = intv + randomStagger(intv) select { diff --git a/command/agent/config.go b/command/agent/config.go index 831ffa0286..72fe104ab1 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -375,12 +375,17 @@ type Config struct { // DisableCoordinates controls features related to network coordinates. DisableCoordinates bool `mapstructure:"disable_coordinates" json:"-"` - // SyncCoordinateInterval controls the interval for sending network - // coordinates to the server. Defaults to every 20s, but scales up as - // the number of nodes increases in the network, to prevent servers from - // being overwhelmed. If you update this, you may need to adjust the - // tuning of CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize. - SyncCoordinateInterval time.Duration `mapstructure:"-" json:"-"` + // SyncCoordinateRateTarget controls the rate for sending network + // coordinates to the server, in updates per second. This is the max rate + // that the server supports, so we scale our interval based on the size + // of the cluster to try to achieve this in aggregate at the server. + SyncCoordinateRateTarget float64 `mapstructure:"-" json:"-"` + + // SyncCoordinateIntervalMin sets the minimum interval that coordinates + // will be sent to the server. We scale the interval based on the cluster + // size, but below a certain interval it doesn't make sense send them any + // faster. + SyncCoordinateIntervalMin time.Duration `mapstructure:"-" json:"-"` // Checks holds the provided check definitions Checks []*CheckDefinition `mapstructure:"-" json:"-"` @@ -471,18 +476,25 @@ func DefaultConfig() *Config { DNSConfig: DNSConfig{ MaxStale: 5 * time.Second, }, - StatsitePrefix: "consul", - SyslogFacility: "LOCAL0", - Protocol: consul.ProtocolVersionMax, - CheckUpdateInterval: 5 * time.Minute, - AEInterval: time.Minute, - DisableCoordinates: false, - SyncCoordinateInterval: 20 * time.Second, - ACLTTL: 30 * time.Second, - ACLDownPolicy: "extend-cache", - ACLDefaultPolicy: "allow", - RetryInterval: 30 * time.Second, - RetryIntervalWan: 30 * time.Second, + StatsitePrefix: "consul", + SyslogFacility: "LOCAL0", + Protocol: consul.ProtocolVersionMax, + CheckUpdateInterval: 5 * time.Minute, + AEInterval: time.Minute, + DisableCoordinates: false, + + // SyncCoordinateRateTarget is set based on the rate that we want + // the server to handle as an aggregate across the entire cluster. + // If you update this, you'll need to adjust CoordinateUpdate* in + // the server-side config accordingly. + SyncCoordinateRateTarget: 100.0, // updates / second + SyncCoordinateIntervalMin: 5 * time.Second, + + ACLTTL: 30 * time.Second, + ACLDownPolicy: "extend-cache", + ACLDefaultPolicy: "allow", + RetryInterval: 30 * time.Second, + RetryIntervalWan: 30 * time.Second, } } diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 98871a9fce..9891abdd6e 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -806,9 +806,10 @@ func TestAgent_nestedPauseResume(t *testing.T) { func TestAgent_sendCoordinate(t *testing.T) { conf := nextConfig() - conf.SyncCoordinateInterval = 10 * time.Millisecond + conf.SyncCoordinateRateTarget = 10.0 // updates/sec + conf.SyncCoordinateIntervalMin = 1 * time.Millisecond conf.ConsulConfig.CoordinateUpdatePeriod = 100 * time.Millisecond - conf.ConsulConfig.CoordinateUpdateBatchSize = 15 + conf.ConsulConfig.CoordinateUpdateBatchSize = 10 conf.ConsulConfig.CoordinateUpdateMaxBatches = 1 dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) diff --git a/command/agent/util.go b/command/agent/util.go index a749836ff5..075cf0c730 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -39,6 +39,17 @@ func aeScale(interval time.Duration, n int) time.Duration { return time.Duration(multiplier) * interval } +// rateScaledInterval is used to choose an interval to perform an action in order +// to target an aggregate number of actions per second across the whole cluster. +func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration { + interval := time.Duration(float64(time.Second) * float64(n) / rate) + if interval < min { + return min + } + + return interval +} + // Returns a random stagger interval between 0 and the duration func randomStagger(intv time.Duration) time.Duration { return time.Duration(uint64(rand.Int63()) % uint64(intv)) diff --git a/command/agent/util_test.go b/command/agent/util_test.go index c2442c289c..9466ced442 100644 --- a/command/agent/util_test.go +++ b/command/agent/util_test.go @@ -24,6 +24,29 @@ func TestAEScale(t *testing.T) { } } +func TestRateScaledInterval(t *testing.T) { + min := 1*time.Second + rate := 200.0 + if v := rateScaledInterval(rate, min, 0); v != min { + t.Fatalf("Bad: %v", v) + } + if v := rateScaledInterval(rate, min, 100); v != min { + t.Fatalf("Bad: %v", v) + } + if v := rateScaledInterval(rate, min, 200); v != 1*time.Second { + t.Fatalf("Bad: %v", v) + } + if v := rateScaledInterval(rate, min, 1000); v != 5*time.Second { + t.Fatalf("Bad: %v", v) + } + if v := rateScaledInterval(rate, min, 5000); v != 25*time.Second { + t.Fatalf("Bad: %v", v) + } + if v := rateScaledInterval(rate, min, 10000); v != 50*time.Second { + t.Fatalf("Bad: %v", v) + } +} + func TestRandomStagger(t *testing.T) { intv := time.Minute for i := 0; i < 10; i++ { diff --git a/consul/config.go b/consul/config.go index 6da921edd3..b72f8807db 100644 --- a/consul/config.go +++ b/consul/config.go @@ -276,13 +276,11 @@ func DefaultConfig() *Config { SessionTTLMin: 10 * time.Second, DisableCoordinates: false, - // SyncCoordinateInterval defaults to 20 seconds, and scales up - // as the number of nodes in the cluster goes up. For 100k nodes, - // it will move up to 201 seconds, which gives an update rate of - // just under 500 per second coming in from the clients. With this - // tuning we will be doing 1 transaction per second at this load. + // These are tuned to provide a total throughput of 128 updates + // per second. If you update these, you should update the client- + // side SyncCoordinateRateTarget parameter accordingly. CoordinateUpdatePeriod: 5 * time.Second, - CoordinateUpdateBatchSize: 512, + CoordinateUpdateBatchSize: 128, CoordinateUpdateMaxBatches: 5, } diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index f765a98d15..ddb8ecb3bf 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -60,7 +60,7 @@ func (c *Coordinate) batchApplyUpdates() error { limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches size := len(pending) if size > limit { - c.srv.logger.Printf("[ERR] consul.coordinate: Discarded %d coordinate updates; increase SyncCoordinateInterval", size - limit) + c.srv.logger.Printf("[WARN] consul.coordinate: Discarded %d coordinate updates", size - limit) size = limit }