From bcc9aea18f570040c8282bf5e6b25a596a4fa739 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 6 Dec 2017 17:06:08 -0800 Subject: [PATCH 1/2] Updates Serf to pull in new queue depth controls. --- .../github.com/hashicorp/serf/serf/config.go | 11 +++++++++ vendor/github.com/hashicorp/serf/serf/serf.go | 24 +++++++++++++++---- vendor/vendor.json | 2 +- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go index 74f21ffbdf..ad4f51b18a 100644 --- a/vendor/github.com/hashicorp/serf/serf/config.go +++ b/vendor/github.com/hashicorp/serf/serf/config.go @@ -112,6 +112,10 @@ type Config struct { // node. FlapTimeout time.Duration + // QueueCheckInterval is the interval at which we check the message + // queue to apply the warning and max depth. + QueueCheckInterval time.Duration + // QueueDepthWarning is used to generate warning message if the // number of queued messages to broadcast exceeds this number. This // is to provide the user feedback if events are being triggered @@ -123,6 +127,12 @@ type Config struct { // prevent an unbounded growth of memory utilization MaxQueueDepth int + // MinQueueDepth, if >0 will enforce a lower limit for dropping messages + // and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This + // defaults to 0 which disables this dynamic sizing feature. If this is + // >0 then MaxQueueDepth will be ignored. + MinQueueDepth int + // RecentIntentTimeout is used to determine how long we store recent // join and leave intents. This is used to guard against the case where // Serf broadcasts an intent that arrives before the Memberlist event. @@ -253,6 +263,7 @@ func DefaultConfig() *Config { RecentIntentTimeout: 5 * time.Minute, ReconnectInterval: 30 * time.Second, ReconnectTimeout: 24 * time.Hour, + QueueCheckInterval: 30 * time.Second, QueueDepthWarning: 128, MaxQueueDepth: 4096, TombstoneTimeout: 24 * time.Hour, diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 9278850230..67440c1a2b 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -1515,21 +1515,37 @@ func (s *Serf) reconnect() { s.memberlist.Join([]string{addr.String()}) } +// getQueueMax will get the maximum queue depth, which might be dynamic depending +// on how Serf is configured. +func (s *Serf) getQueueMax() int { + max := s.config.MaxQueueDepth + if s.config.MinQueueDepth > 0 { + s.memberLock.RLock() + max = 2 * len(s.members) + s.memberLock.RUnlock() + + if max < s.config.MinQueueDepth { + max = s.config.MinQueueDepth + } + } + return max +} + // checkQueueDepth periodically checks the size of a queue to see if // it is too large func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) { for { select { - case <-time.After(time.Second): + case <-time.After(s.config.QueueCheckInterval): numq := queue.NumQueued() metrics.AddSample([]string{"serf", "queue", name}, float32(numq)) if numq >= s.config.QueueDepthWarning { s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq) } - if numq > s.config.MaxQueueDepth { + if max := s.getQueueMax(); numq > max { s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!", - name, numq, s.config.MaxQueueDepth) - queue.Prune(s.config.MaxQueueDepth) + name, numq, max) + queue.Prune(max) } case <-s.shutdownCh: return diff --git a/vendor/vendor.json b/vendor/vendor.json index 9093bc09e6..0c8c98be0e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -64,7 +64,7 @@ {"path":"github.com/hashicorp/raft","checksumSHA1":"JjJtGJi1ywWhVhs/PvTXxe4TeD8=","revision":"6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f","revisionTime":"2017-10-03T22:09:13Z","version":"v1.0.0","versionExact":"v1.0.0"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"mS15CkImPzXYsgNwl3Mt9Gh3Vb0=","comment":"v0.7.0-66-g6c4672d","revision":"c20a0b1b1ea9eb8168bcdec0116688fa9254e449","revisionTime":"2017-10-22T02:00:50Z"}, - {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"iYhCWgAAUcQjU0JocsKgak5C8tY=","comment":"v0.7.0-66-g6c4672d","revision":"c20a0b1b1ea9eb8168bcdec0116688fa9254e449","revisionTime":"2017-10-22T02:00:50Z"}, + {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"NegZzEwYOlfkbvy+jTBkX3OBcRM=","comment":"v0.7.0-66-g6c4672d","revision":"a110af454b635c75adc2b7eee541af2c68666d97","revisionTime":"2017-12-07T01:04:04Z"}, {"path":"github.com/hashicorp/yamux","checksumSHA1":"ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=","revision":"d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd","revisionTime":"2016-07-20T23:31:40Z"}, {"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"}, {"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"}, From 5065f3d82efca167136ff250450d819e8416911a Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 7 Dec 2017 16:27:06 -0800 Subject: [PATCH 2/2] Turns of intent queue warnings and enables dynamic queue sizing. --- agent/consul/config.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/agent/consul/config.go b/agent/consul/config.go index c37dd8d67d..d0b26d93b6 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -376,6 +376,21 @@ func (c *Config) CheckACL() error { return nil } +// SerfDefaultConfig returns a Consul-flavored Serf default configuration, +// suitable as a basis for a LAN, WAN, segment, or area. +func SerfDefaultConfig() *serf.Config { + base := serf.DefaultConfig() + + // This effectively disables the annoying queue depth warnings. + base.QueueDepthWarning = 1000000 + + // This enables dynamic sizing of the message queue depth based on the + // cluster size. + base.MinQueueDepth = 4096 + + return base +} + // DefaultConfig returns a sane default configuration. func DefaultConfig() *Config { hostname, err := os.Hostname() @@ -389,8 +404,8 @@ func DefaultConfig() *Config { NodeName: hostname, RPCAddr: DefaultRPCAddr, RaftConfig: raft.DefaultConfig(), - SerfLANConfig: serf.DefaultConfig(), - SerfWANConfig: serf.DefaultConfig(), + SerfLANConfig: SerfDefaultConfig(), + SerfWANConfig: SerfDefaultConfig(), SerfFloodInterval: 60 * time.Second, ReconcileInterval: 60 * time.Second, ProtocolVersion: ProtocolVersion2Compatible,