diff --git a/agent/agent.go b/agent/agent.go index 88dbf8d990..caf5150844 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -73,6 +73,7 @@ type delegate interface { SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error Shutdown() error Stats() map[string]map[string]string + ReloadConfig(config *consul.Config) error } // notifier is called after a successful JoinLAN. @@ -2480,8 +2481,8 @@ func (a *Agent) DisableNodeMaintenance() { } func (a *Agent) loadLimits(conf *config.RuntimeConfig) { - a.config.RPCRateLimit = conf.RPCRateLimit - a.config.RPCMaxBurst = conf.RPCMaxBurst + a.config.RPCRateLimit = conf.RPCRateLimit + a.config.RPCMaxBurst = conf.RPCMaxBurst } func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { @@ -2518,7 +2519,17 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { return fmt.Errorf("Failed reloading watches: %v", err) } - a.loadLimits(newCfg) + a.loadLimits(newCfg) + + // create the config for the rpc server/client + consulCfg, err := a.consulConfig() + if err != nil { + return err + } + + if err := a.delegate.ReloadConfig(consulCfg); err != nil { + return err + } // Update filtered metrics metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes) diff --git a/agent/consul/client.go b/agent/consul/client.go index 96baeb1748..84e1a73199 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" "github.com/armon/go-metrics" @@ -56,7 +57,7 @@ type Client struct { // rpcLimiter is used to rate limit the total number of RPCs initiated // from an agent. - rpcLimiter *rate.Limiter + rpcLimiter atomic.Value // eventCh is used to receive events from the // serf cluster in the datacenter @@ -125,12 +126,13 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) { c := &Client{ config: config, connPool: connPool, - rpcLimiter: rate.NewLimiter(config.RPCRate, config.RPCMaxBurst), eventCh: make(chan serf.Event, serfEventBacklog), logger: logger, shutdownCh: make(chan struct{}), } + c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) + // Initialize the LAN Serf c.serf, err = c.setupSerf(config.SerfLANConfig, c.eventCh, serfLANSnapshot) @@ -251,7 +253,7 @@ TRY: // Enforce the RPC limit. metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) metrics.IncrCounter([]string{"client", "rpc"}, 1) - if !c.rpcLimiter.Allow() { + if !c.rpcLimiter.Load().(*rate.Limiter).Allow() { metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1) return structs.ErrRPCRateExceeded @@ -295,7 +297,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io // Enforce the RPC limit. metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) metrics.IncrCounter([]string{"client", "rpc"}, 1) - if !c.rpcLimiter.Allow() { + if !c.rpcLimiter.Load().(*rate.Limiter).Allow() { metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1) return structs.ErrRPCRateExceeded @@ -360,3 +362,10 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) { cs := lib.CoordinateSet{c.config.Segment: lan} return cs, nil } + +// ReloadConfig is used to have the Client do an online reload of +// relevant configuration information +func (c *Client) ReloadConfig(config *Config) error { + c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) + return nil +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 128f67081c..b1aca96a62 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -1040,6 +1040,12 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) { return cs, nil } +// ReloadConfig is used to have the Server do an online reload of +// relevant configuration information +func (s *Server) ReloadConfig(config *Config) error { + return nil +} + // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write func (s *Server) setConsistentReadReady() { atomic.StoreInt32(&s.readyForConsistentReads, 1)