mirror of https://github.com/status-im/consul.git
connect: tame thundering herd of CSRs on CA rotation (#5228)
* Support rate limiting and concurrency limiting CSR requests on servers; handle CA rotations gracefully with jitter and backoff-on-rate-limit in client * Add CSR rate limiting docs * Fix config naming and add tests for new CA configs
This commit is contained in:
parent
3e45da1414
commit
ef9f27cbc8
|
@ -1084,15 +1084,12 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
|
|||
|
||||
if a.config.ConnectCAProvider != "" {
|
||||
base.CAConfig.Provider = a.config.ConnectCAProvider
|
||||
}
|
||||
|
||||
// Merge with the default config if it's the consul provider.
|
||||
if a.config.ConnectCAProvider == "consul" {
|
||||
for k, v := range a.config.ConnectCAConfig {
|
||||
base.CAConfig.Config[k] = v
|
||||
}
|
||||
} else {
|
||||
base.CAConfig.Config = a.config.ConnectCAConfig
|
||||
}
|
||||
// Merge connect CA Config regardless of provider (since there are some
|
||||
// common config options valid to all like leaf TTL).
|
||||
for k, v := range a.config.ConnectCAConfig {
|
||||
base.CAConfig.Config[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,21 +12,38 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// Recommended name for registration.
|
||||
const ConnectCALeafName = "connect-ca-leaf"
|
||||
|
||||
// caChangeInitialSpreadDefault is the jitter we apply after noticing the CA
|
||||
// changed before requesting a new cert. Since we don't know how many services
|
||||
// are in the cluster we can't be too smart about setting this so it's a
|
||||
// tradeoff between not making root rotations take unnecessarily long on small
|
||||
// clusters and not hammering the servers to hard on large ones. Note that
|
||||
// server's will soon have CSR rate limiting that will limit the impact on big
|
||||
// clusters, but a small spread in the initial requests still seems like a good
|
||||
// idea and limits how many clients will hit the rate limit.
|
||||
const caChangeInitialSpreadDefault = 20 * time.Second
|
||||
// caChangeJitterWindow is the time over which we spread each round of retries
|
||||
// when attempting to get a new certificate following a root rotation. It's
|
||||
// selected to be a trade-off between not making rotation unnecessarily slow on
|
||||
// a tiny cluster while not hammering the servers on a huge cluster
|
||||
// unnecessarily hard. Servers rate limit to protect themselves from the
|
||||
// expensive crypto work, but in practice have 10k+ RPCs all in the same second
|
||||
// will cause a major disruption even on large servers due to downloading the
|
||||
// payloads, parsing msgpack etc. Instead we pick a window that for now is fixed
|
||||
// but later might be either user configurable (not nice since it would become
|
||||
// another hard-to-tune value) or set dynamically by the server based on it's
|
||||
// knowledge of how many certs need to be rotated. Currently the server doesn't
|
||||
// know that so we pick something that is reasonable. We err on the side of
|
||||
// being slower that we need in trivial cases but gentler for large deployments.
|
||||
// 30s means that even with a cluster of 10k service instances, the server only
|
||||
// has to cope with ~333 RPCs a second which shouldn't be too bad if it's rate
|
||||
// limiting the actual expensive crypto work.
|
||||
//
|
||||
// The actual backoff strategy when we are rate limited is to have each cert
|
||||
// only retry once with each window of this size, at a point in the window
|
||||
// selected at random. This performs much better than exponential backoff in
|
||||
// terms of getting things rotated quickly with more predictable load and so
|
||||
// fewer rate limited requests. See the full simulation this is based on at
|
||||
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md for
|
||||
// more detail.
|
||||
const caChangeJitterWindow = 30 * time.Second
|
||||
|
||||
// ConnectCALeaf supports fetching and generating Connect leaf
|
||||
// certificates.
|
||||
|
@ -75,6 +92,9 @@ type ConnectCALeaf struct {
|
|||
// Fetch. Pointers themselves are OK, but if we point to another struct that we
|
||||
// call a method or modify in some way that would directly mutate the cache and
|
||||
// cause problems. We'd need to deep-clone in that case in Fetch below.
|
||||
// time.Time technically contains a pointer to the Location but we ignore that
|
||||
// since all times we get from our wall clock should point to the same Location
|
||||
// anyway.
|
||||
type fetchState struct {
|
||||
// authorityKeyID is the key ID of the CA root that signed the current cert.
|
||||
// This is just to save parsing the whole cert everytime we have to check if
|
||||
|
@ -84,6 +104,18 @@ type fetchState struct {
|
|||
// forceExpireAfter is used to coordinate renewing certs after a CA rotation
|
||||
// in a staggered way so that we don't overwhelm the servers.
|
||||
forceExpireAfter time.Time
|
||||
|
||||
// activeRootRotationStart is set when the root has changed and we need to get
|
||||
// a new cert but haven't got one yet. forceExpireAfter will be set to the
|
||||
// next scheduled time we should try our CSR, but this is needed to calculate
|
||||
// the retry windows if we are rate limited when we try. See comment on
|
||||
// caChangeJitterWindow above for more.
|
||||
activeRootRotationStart time.Time
|
||||
|
||||
// consecutiveRateLimitErrs stores how many rate limit errors we've hit. We
|
||||
// use this to choose a new window for the next retry. See comment on
|
||||
// caChangeJitterWindow above for more.
|
||||
consecutiveRateLimitErrs int
|
||||
}
|
||||
|
||||
// fetchStart is called on each fetch that is about to block and wait for
|
||||
|
@ -277,22 +309,44 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
|||
existing, ok = opts.LastResult.Value.(*structs.IssuedCert)
|
||||
if !ok {
|
||||
return result, fmt.Errorf(
|
||||
"Internal cache failure: last value wrong type: %T", req)
|
||||
"Internal cache failure: last value wrong type: %T", opts.LastResult.Value)
|
||||
}
|
||||
state, ok = opts.LastResult.State.(fetchState)
|
||||
if !ok {
|
||||
return result, fmt.Errorf(
|
||||
"Internal cache failure: last state wrong type: %T", req)
|
||||
if opts.LastResult.State != nil {
|
||||
state, ok = opts.LastResult.State.(fetchState)
|
||||
if !ok {
|
||||
return result, fmt.Errorf(
|
||||
"Internal cache failure: last state wrong type: %T", opts.LastResult.State)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
state = fetchState{}
|
||||
}
|
||||
|
||||
// Handle brand new request first as it's simplest.
|
||||
if existing == nil {
|
||||
return c.generateNewLeaf(reqReal, &state)
|
||||
return c.generateNewLeaf(reqReal, result)
|
||||
}
|
||||
|
||||
// Setup result to mirror the current value for if we timeout or hit a rate
|
||||
// limit. This allows us to update the state (e.g. for backoff or retry
|
||||
// coordination on root change) even if we don't get a new cert.
|
||||
result.Value = existing
|
||||
result.Index = existing.ModifyIndex
|
||||
result.State = state
|
||||
|
||||
// Since state is not a pointer, we can't just set it once in result and then
|
||||
// continue to update it later since we will be updating only our copy.
|
||||
// Instead we have a helper function that is used to make sure the state is
|
||||
// updated in the result when we return.
|
||||
lastResultWithNewState := func() cache.FetchResult {
|
||||
return cache.FetchResult{
|
||||
Value: existing,
|
||||
Index: existing.ModifyIndex,
|
||||
State: state,
|
||||
}
|
||||
}
|
||||
|
||||
// Beyond this point we need to only return lastResultWithNewState() not just
|
||||
// result since otherwise we might "loose" state updates we expect not to.
|
||||
|
||||
// We have a certificate in cache already. Check it's still valid.
|
||||
now := time.Now()
|
||||
minExpire, maxExpire := calculateSoftExpiry(now, existing)
|
||||
|
@ -306,7 +360,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
|||
|
||||
if expiresAt == now || expiresAt.Before(now) {
|
||||
// Already expired, just make a new one right away
|
||||
return c.generateNewLeaf(reqReal, &state)
|
||||
return c.generateNewLeaf(reqReal, lastResultWithNewState())
|
||||
}
|
||||
|
||||
// We are about to block and wait for a change or timeout.
|
||||
|
@ -318,16 +372,18 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
|||
// reload latest CA from cache.
|
||||
rootUpdateCh := make(chan struct{}, 1)
|
||||
|
||||
// The roots may have changed in between blocking calls. We need to verify
|
||||
// that the existing cert was signed by the current root. If it was we still
|
||||
// want to do the whole jitter thing. We could code that again here but it's
|
||||
// identical to the select case below so we just trigger our own update chan
|
||||
// and let the logic below handle checking if the CA actually changed in the
|
||||
// common case where it didn't it is a no-op anyway.
|
||||
rootUpdateCh <- struct{}{}
|
||||
|
||||
// Subscribe our chan to get root update notification.
|
||||
c.fetchStart(rootUpdateCh)
|
||||
defer c.fetchDone(rootUpdateCh)
|
||||
|
||||
// Setup result to mirror the current value for if we timeout. This allows us
|
||||
// to update the state even if we don't generate a new cert.
|
||||
result.Value = existing
|
||||
result.Index = existing.ModifyIndex
|
||||
result.State = state
|
||||
|
||||
// Setup the timeout chan outside the loop so we don't keep bumping the timout
|
||||
// later if we loop around.
|
||||
timeoutCh := time.After(opts.Timeout)
|
||||
|
@ -341,31 +397,35 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
|||
select {
|
||||
case <-timeoutCh:
|
||||
// We timed out the request with same cert.
|
||||
return result, nil
|
||||
return lastResultWithNewState(), nil
|
||||
|
||||
case <-expiresCh:
|
||||
// Cert expired or was force-expired by a root change.
|
||||
return c.generateNewLeaf(reqReal, &state)
|
||||
return c.generateNewLeaf(reqReal, lastResultWithNewState())
|
||||
|
||||
case <-rootUpdateCh:
|
||||
// A root cache change occurred, reload roots from cache.
|
||||
roots, err := c.rootsFromCache()
|
||||
if err != nil {
|
||||
return result, err
|
||||
return lastResultWithNewState(), err
|
||||
}
|
||||
|
||||
// Handle _possibly_ changed roots. We still need to verify the new active
|
||||
// root is not the same as the one our current cert was signed by since we
|
||||
// can be notified spuriously if we are the first request since the
|
||||
// rootsWatcher didn't know about the CA we were signed by.
|
||||
// rootsWatcher didn't know about the CA we were signed by. We also rely
|
||||
// on this on every request to do the initial check that the current roots
|
||||
// are the same ones the current cert was signed by.
|
||||
if activeRootHasKey(roots, state.authorityKeyID) {
|
||||
// Current active CA is the same one that signed our current cert so
|
||||
// keep waiting for a change.
|
||||
continue
|
||||
}
|
||||
state.activeRootRotationStart = time.Now()
|
||||
|
||||
// CA root changed. We add some jitter here to avoid a thundering herd.
|
||||
// See docs on caChangeInitialJitter const.
|
||||
delay := lib.RandomStagger(caChangeInitialSpreadDefault)
|
||||
// See docs on caChangeJitterWindow const.
|
||||
delay := lib.RandomStagger(caChangeJitterWindow)
|
||||
if c.TestOverrideCAChangeInitialDelay > 0 {
|
||||
delay = c.TestOverrideCAChangeInitialDelay
|
||||
}
|
||||
|
@ -374,7 +434,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
|
|||
// the cache state so the next request will notice we still need to renew
|
||||
// and do it at the right time. This is cleared once a new cert is
|
||||
// returned by generateNewLeaf.
|
||||
state.forceExpireAfter = time.Now().Add(delay)
|
||||
state.forceExpireAfter = state.activeRootRotationStart.Add(delay)
|
||||
// If the delay time is within the current timeout, we want to renew the
|
||||
// as soon as it's up. We change the expire time and chan so that when we
|
||||
// loop back around, we'll wait at most delay until generating a new cert.
|
||||
|
@ -416,9 +476,20 @@ func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) {
|
|||
}
|
||||
|
||||
// generateNewLeaf does the actual work of creating a new private key,
|
||||
// generating a CSR and getting it signed by the servers.
|
||||
func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchState) (cache.FetchResult, error) {
|
||||
var result cache.FetchResult
|
||||
// generating a CSR and getting it signed by the servers. result argument
|
||||
// represents the last result currently in cache if any along with it's state.
|
||||
func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest,
|
||||
result cache.FetchResult) (cache.FetchResult, error) {
|
||||
|
||||
var state fetchState
|
||||
if result.State != nil {
|
||||
var ok bool
|
||||
state, ok = result.State.(fetchState)
|
||||
if !ok {
|
||||
return result, fmt.Errorf(
|
||||
"Internal cache failure: result state wrong type: %T", result.State)
|
||||
}
|
||||
}
|
||||
|
||||
// Need to lookup RootCAs response to discover trust domain. This should be a
|
||||
// cache hit.
|
||||
|
@ -458,12 +529,55 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchS
|
|||
CSR: csr,
|
||||
}
|
||||
if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil {
|
||||
if err.Error() == consul.ErrRateLimited.Error() {
|
||||
if result.Value == nil {
|
||||
// This was a first fetch - we have no good value in cache. In this case
|
||||
// we just return the error to the caller rather than rely on surprising
|
||||
// semi-blocking until the rate limit is appeased or we timeout
|
||||
// behavior. It's likely the caller isn't expecting this to block since
|
||||
// it's an initial fetch. This also massively simplifies this edge case.
|
||||
return result, err
|
||||
}
|
||||
|
||||
if state.activeRootRotationStart.IsZero() {
|
||||
// We hit a rate limit error by chance - for example a cert expired
|
||||
// before the root rotation was observed (not triggered by rotation) but
|
||||
// while server is working through high load from a recent rotation.
|
||||
// Just pretend there is a rotation and the retry logic here will start
|
||||
// jittering and retrying in the same way from now.
|
||||
state.activeRootRotationStart = time.Now()
|
||||
}
|
||||
|
||||
// Increment the errors in the state
|
||||
state.consecutiveRateLimitErrs++
|
||||
|
||||
delay := lib.RandomStagger(caChangeJitterWindow)
|
||||
if c.TestOverrideCAChangeInitialDelay > 0 {
|
||||
delay = c.TestOverrideCAChangeInitialDelay
|
||||
}
|
||||
|
||||
// Find the start of the next window we can retry in. See comment on
|
||||
// caChangeJitterWindow for details of why we use this strategy.
|
||||
windowStart := state.activeRootRotationStart.Add(
|
||||
time.Duration(state.consecutiveRateLimitErrs) * delay)
|
||||
|
||||
// Pick a random time in that window
|
||||
state.forceExpireAfter = windowStart.Add(delay)
|
||||
|
||||
// Return a result with the existing cert but the new state - the cache
|
||||
// will see this as no change. Note that we always have an existing result
|
||||
// here due to the nil value check above.
|
||||
result.State = state
|
||||
return result, nil
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
reply.PrivateKeyPEM = pkPEM
|
||||
|
||||
// Reset the forcedExpiry in the state
|
||||
// Reset rotation state
|
||||
state.forceExpireAfter = time.Time{}
|
||||
state.consecutiveRateLimitErrs = 0
|
||||
state.activeRootRotationStart = time.Time{}
|
||||
|
||||
cert, err := connect.ParseCert(reply.CertPEM)
|
||||
if err != nil {
|
||||
|
@ -475,7 +589,7 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchS
|
|||
result.Value = &reply
|
||||
// Store value not pointer so we don't accidentally mutate the cache entry
|
||||
// state in Fetch.
|
||||
result.State = *state
|
||||
result.State = state
|
||||
result.Index = reply.ModifyIndex
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -162,17 +163,28 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
|
|||
QueryMeta: structs.QueryMeta{Index: 1},
|
||||
}
|
||||
|
||||
// We need this later but needs to be defined so we sign second CSR with it
|
||||
// otherwise we break the cert root checking.
|
||||
caRoot2 := connect.TestCA(t, nil)
|
||||
|
||||
// Instrument ConnectCA.Sign to return signed cert
|
||||
var resp *structs.IssuedCert
|
||||
var idx uint64
|
||||
|
||||
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
ca := caRoot
|
||||
cIdx := atomic.AddUint64(&idx, 1)
|
||||
if cIdx > 1 {
|
||||
// Second time round use the new CA
|
||||
ca = caRoot2
|
||||
}
|
||||
reply := args.Get(2).(*structs.IssuedCert)
|
||||
leaf, _ := connect.TestLeaf(t, "web", caRoot)
|
||||
leaf, _ := connect.TestLeaf(t, "web", ca)
|
||||
reply.CertPEM = leaf
|
||||
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
|
||||
reply.ValidBefore = time.Now().Add(11 * time.Hour)
|
||||
reply.CreateIndex = atomic.AddUint64(&idx, 1)
|
||||
reply.CreateIndex = cIdx
|
||||
reply.ModifyIndex = reply.CreateIndex
|
||||
resp = reply
|
||||
})
|
||||
|
@ -205,7 +217,6 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
|
|||
|
||||
// Let's send in new roots, which should trigger the sign req. We need to take
|
||||
// care to set the new root as active
|
||||
caRoot2 := connect.TestCA(t, nil)
|
||||
caRoot2.Active = true
|
||||
caRoot.Active = false
|
||||
rootsCh <- structs.IndexedCARoots{
|
||||
|
@ -225,6 +236,9 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
|
|||
require.Equal(resp, v.Value)
|
||||
// 3 since the second CA "update" used up 2
|
||||
require.Equal(uint64(3), v.Index)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
opts.MinIndex = 3
|
||||
}
|
||||
|
||||
// Third fetch should block
|
||||
|
@ -305,7 +319,14 @@ func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) {
|
|||
}
|
||||
|
||||
// Let's send in new roots, which should eventually trigger the sign req. We
|
||||
// need to take care to set the new root as active
|
||||
// need to take care to set the new root as active. Note that this is
|
||||
// implicitly testing that root updates that happen in between leaf blocking
|
||||
// queries are still noticed too. At this point no leaf blocking query is
|
||||
// running so the root watch should be stopped. By pushing this update, the
|
||||
// next blocking query will _immediately_ see the new root which means it
|
||||
// needs to correctly notice that it is not the same one that generated the
|
||||
// current cert and start the rotation. This is good, just not obvious that
|
||||
// the behavior is actually well tested here when it is.
|
||||
caRoot2 := connect.TestCA(t, nil)
|
||||
caRoot2.Active = true
|
||||
caRoot.Active = false
|
||||
|
@ -380,6 +401,291 @@ func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Tests that if the root changes in between blocking calls we still pick it up.
|
||||
func TestConnectCALeaf_changingRootsBetweenBlockingCalls(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
rpc := TestRPC(t)
|
||||
defer rpc.AssertExpectations(t)
|
||||
|
||||
typ, rootsCh := testCALeafType(t, rpc)
|
||||
defer close(rootsCh)
|
||||
|
||||
caRoot := connect.TestCA(t, nil)
|
||||
caRoot.Active = true
|
||||
rootsCh <- structs.IndexedCARoots{
|
||||
ActiveRootID: caRoot.ID,
|
||||
TrustDomain: "fake-trust-domain.consul",
|
||||
Roots: []*structs.CARoot{
|
||||
caRoot,
|
||||
},
|
||||
QueryMeta: structs.QueryMeta{Index: 1},
|
||||
}
|
||||
|
||||
// Instrument ConnectCA.Sign to return signed cert
|
||||
var resp *structs.IssuedCert
|
||||
var idx uint64
|
||||
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
reply := args.Get(2).(*structs.IssuedCert)
|
||||
leaf, _ := connect.TestLeaf(t, "web", caRoot)
|
||||
reply.CertPEM = leaf
|
||||
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
|
||||
reply.ValidBefore = time.Now().Add(11 * time.Hour)
|
||||
reply.CreateIndex = atomic.AddUint64(&idx, 1)
|
||||
reply.ModifyIndex = reply.CreateIndex
|
||||
resp = reply
|
||||
})
|
||||
|
||||
// We'll reuse the fetch options and request. Short timeout important since we
|
||||
// wait the full timeout before chaning roots.
|
||||
opts := cache.FetchOptions{MinIndex: 0, Timeout: 35 * time.Millisecond}
|
||||
req := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web"}
|
||||
|
||||
// First fetch should return immediately
|
||||
fetchCh := TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("shouldn't block waiting for fetch")
|
||||
case result := <-fetchCh:
|
||||
v := mustFetchResult(t, result)
|
||||
require.Equal(resp, v.Value)
|
||||
require.Equal(uint64(1), v.Index)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
}
|
||||
|
||||
// Next fetch should block for the full timeout
|
||||
start := time.Now()
|
||||
fetchCh = TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("shouldn't block for too long waiting for fetch")
|
||||
case result := <-fetchCh:
|
||||
v := mustFetchResult(t, result)
|
||||
require.Equal(resp, v.Value)
|
||||
// Still the initial cached result
|
||||
require.Equal(uint64(1), v.Index)
|
||||
// Sanity check that it waited
|
||||
require.True(time.Since(start) > opts.Timeout)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
}
|
||||
|
||||
// No active requests, simulate root change now
|
||||
caRoot2 := connect.TestCA(t, nil)
|
||||
caRoot2.Active = true
|
||||
caRoot.Active = false
|
||||
rootsCh <- structs.IndexedCARoots{
|
||||
ActiveRootID: caRoot2.ID,
|
||||
TrustDomain: "fake-trust-domain.consul",
|
||||
Roots: []*structs.CARoot{
|
||||
caRoot2,
|
||||
caRoot,
|
||||
},
|
||||
QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
|
||||
}
|
||||
earliestRootDelivery := time.Now()
|
||||
|
||||
// We should get the new cert immediately on next fetch (since test override
|
||||
// root change jitter to be 1 nanosecond so no delay expected).
|
||||
fetchCh = TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("shouldn't block too long waiting for fetch")
|
||||
case result := <-fetchCh:
|
||||
v := mustFetchResult(t, result)
|
||||
require.Equal(resp, v.Value)
|
||||
// Index should be 3 since root change consumed 2
|
||||
require.Equal(uint64(3), v.Index)
|
||||
// Sanity check that we didn't wait too long
|
||||
require.True(time.Since(earliestRootDelivery) < opts.Timeout)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestConnectCALeaf_CSRRateLimiting(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
rpc := TestRPC(t)
|
||||
defer rpc.AssertExpectations(t)
|
||||
|
||||
typ, rootsCh := testCALeafType(t, rpc)
|
||||
defer close(rootsCh)
|
||||
|
||||
// Each jitter window will be only 100 ms long to make testing quick but
|
||||
// highly likely not to fail based on scheduling issues.
|
||||
typ.TestOverrideCAChangeInitialDelay = 100 * time.Millisecond
|
||||
|
||||
// Setup root that will be returned by the mocked Root cache fetch
|
||||
caRoot := connect.TestCA(t, nil)
|
||||
caRoot.Active = true
|
||||
rootsCh <- structs.IndexedCARoots{
|
||||
ActiveRootID: caRoot.ID,
|
||||
TrustDomain: "fake-trust-domain.consul",
|
||||
Roots: []*structs.CARoot{
|
||||
caRoot,
|
||||
},
|
||||
QueryMeta: structs.QueryMeta{Index: 1},
|
||||
}
|
||||
|
||||
// Instrument ConnectCA.Sign
|
||||
var resp *structs.IssuedCert
|
||||
var idx, rateLimitedRPCs uint64
|
||||
|
||||
genCert := func(args mock.Arguments) {
|
||||
reply := args.Get(2).(*structs.IssuedCert)
|
||||
leaf, _ := connect.TestLeaf(t, "web", caRoot)
|
||||
reply.CertPEM = leaf
|
||||
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
|
||||
reply.ValidBefore = time.Now().Add(11 * time.Hour)
|
||||
reply.CreateIndex = atomic.AddUint64(&idx, 1)
|
||||
reply.ModifyIndex = reply.CreateIndex
|
||||
resp = reply
|
||||
}
|
||||
|
||||
incRateLimit := func(args mock.Arguments) {
|
||||
atomic.AddUint64(&rateLimitedRPCs, 1)
|
||||
}
|
||||
|
||||
// First call return rate limit error. This is important as it checks
|
||||
// behavior when cache is empty and we have to return a nil Value but need to
|
||||
// save state to do the right thing for retry.
|
||||
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
|
||||
Return(consul.ErrRateLimited).Once().Run(incRateLimit)
|
||||
// Then succeed on second call
|
||||
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
|
||||
Return(nil).Run(genCert).Once()
|
||||
// Then be rate limited again on several further calls
|
||||
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
|
||||
Return(consul.ErrRateLimited).Twice().Run(incRateLimit)
|
||||
// Then fine after that
|
||||
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
|
||||
Return(nil).Run(genCert)
|
||||
|
||||
opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute}
|
||||
req := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web"}
|
||||
|
||||
// First fetch should return rate limit error directly - client is expected to
|
||||
// backoff itself.
|
||||
fetchCh := TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatal("shouldn't block longer than one jitter window for success")
|
||||
case result := <-fetchCh:
|
||||
switch v := result.(type) {
|
||||
case error:
|
||||
require.Error(v)
|
||||
require.Equal(consul.ErrRateLimited.Error(), v.Error())
|
||||
case cache.FetchResult:
|
||||
t.Fatalf("Expected error")
|
||||
}
|
||||
}
|
||||
|
||||
// Second call should return correct cert immediately.
|
||||
fetchCh = TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("shouldn't block waiting for fetch")
|
||||
case result := <-fetchCh:
|
||||
v := mustFetchResult(t, result)
|
||||
require.Equal(resp, v.Value)
|
||||
require.Equal(uint64(1), v.Index)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
// Set MinIndex
|
||||
opts.MinIndex = 1
|
||||
}
|
||||
|
||||
// Send in new roots, which should trigger the next sign req. We need to take
|
||||
// care to set the new root as active
|
||||
caRoot2 := connect.TestCA(t, nil)
|
||||
caRoot2.Active = true
|
||||
caRoot.Active = false
|
||||
rootsCh <- structs.IndexedCARoots{
|
||||
ActiveRootID: caRoot2.ID,
|
||||
TrustDomain: "fake-trust-domain.consul",
|
||||
Roots: []*structs.CARoot{
|
||||
caRoot2,
|
||||
caRoot,
|
||||
},
|
||||
QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
|
||||
}
|
||||
earliestRootDelivery := time.Now()
|
||||
|
||||
// Sanity check state
|
||||
require.Equal(uint64(1), atomic.LoadUint64(&rateLimitedRPCs))
|
||||
|
||||
// After root rotation jitter has been waited out, a new CSR will
|
||||
// be attempted but will fail and return the previous cached result with no
|
||||
// error since we will try again soon.
|
||||
fetchCh = TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatal("shouldn't block too long waiting for fetch")
|
||||
case result := <-fetchCh:
|
||||
// We should block for _at least_ one jitter period since we set that to
|
||||
// 100ms and in test override mode we always pick the max jitter not a
|
||||
// random amount.
|
||||
require.True(time.Since(earliestRootDelivery) > 100*time.Millisecond)
|
||||
require.Equal(uint64(2), atomic.LoadUint64(&rateLimitedRPCs))
|
||||
|
||||
v := mustFetchResult(t, result)
|
||||
require.Equal(resp, v.Value)
|
||||
// 1 since this should still be the original cached result as we failed to
|
||||
// get a new cert.
|
||||
require.Equal(uint64(1), v.Index)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
}
|
||||
|
||||
// Root rotation state is now only captured in the opts.LastResult.State so a
|
||||
// subsequent call should also wait for 100ms and then attempt to generate a
|
||||
// new cert since we failed last time.
|
||||
fetchCh = TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatal("shouldn't block too long waiting for fetch")
|
||||
case result := <-fetchCh:
|
||||
// We should block for _at least_ two jitter periods now.
|
||||
require.True(time.Since(earliestRootDelivery) > 200*time.Millisecond)
|
||||
require.Equal(uint64(3), atomic.LoadUint64(&rateLimitedRPCs))
|
||||
|
||||
v := mustFetchResult(t, result)
|
||||
require.Equal(resp, v.Value)
|
||||
// 1 since this should still be the original cached result as we failed to
|
||||
// get a new cert.
|
||||
require.Equal(uint64(1), v.Index)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
}
|
||||
|
||||
// Now we've had two rate limit failures and seen root rotation state work
|
||||
// across both the blocking request that observed the rotation and the
|
||||
// subsequent one. The next request should wait out the rest of the backoff
|
||||
// and then actually fetch a new cert at last!
|
||||
fetchCh = TestFetchCh(t, typ, opts, req)
|
||||
select {
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatal("shouldn't block too long waiting for fetch")
|
||||
case result := <-fetchCh:
|
||||
// We should block for _at least_ three jitter periods now.
|
||||
require.True(time.Since(earliestRootDelivery) > 300*time.Millisecond)
|
||||
require.Equal(uint64(3), atomic.LoadUint64(&rateLimitedRPCs))
|
||||
|
||||
v := mustFetchResult(t, result)
|
||||
require.Equal(resp, v.Value)
|
||||
// 3 since the rootCA change used 2
|
||||
require.Equal(uint64(3), v.Index)
|
||||
// Set the LastResult for subsequent fetches
|
||||
opts.LastResult = &v
|
||||
}
|
||||
}
|
||||
|
||||
// This test runs multiple concurrent callers watching different leaf certs and
|
||||
// tries to ensure that the background root watch activity behaves correctly.
|
||||
func TestConnectCALeaf_watchRootsDedupingMultipleCallers(t *testing.T) {
|
||||
|
|
|
@ -501,6 +501,15 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
|||
|
||||
// This is a valid entry with a result
|
||||
newEntry.Valid = true
|
||||
} else if result.State != nil && err == nil {
|
||||
// Also set state if it's non-nil but Value is nil. This is important in the
|
||||
// case we are returning nil due to a timeout or a transient error like rate
|
||||
// limiting that we want to mask from the user - there is no result yet but
|
||||
// we want to manage retrying internally before we return an error to user.
|
||||
// The retrying state is in State so we need to still update that in the
|
||||
// entry even if we don't have an actual result yet (e.g. hit a rate limit
|
||||
// on first request for a leaf certificate).
|
||||
newEntry.State = result.State
|
||||
}
|
||||
|
||||
// Error handling
|
||||
|
|
|
@ -383,7 +383,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
|||
|
||||
// Configure the type
|
||||
typ.Static(FetchResult{Value: 42, State: 31, Index: 1}, nil).Times(1)
|
||||
// Return different State, it should be ignored
|
||||
// Return different State, it should NOT be ignored
|
||||
typ.Static(FetchResult{Value: nil, State: 32}, nil).Run(func(args mock.Arguments) {
|
||||
// We should get back the original state
|
||||
opts := args.Get(0).(FetchOptions)
|
||||
|
@ -414,8 +414,8 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
|||
t.Fatal("timed out")
|
||||
}
|
||||
|
||||
// Next request should get the first returned state too since last Fetch
|
||||
// returned nil result.
|
||||
// Next request should get the SECOND returned state even though the fetch
|
||||
// returns nil and so the previous result is used.
|
||||
req = TestRequest(t, RequestInfo{
|
||||
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
||||
result, meta, err = c.Get("t", req)
|
||||
|
@ -424,7 +424,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
|||
require.False(meta.Hit)
|
||||
select {
|
||||
case state := <-stateCh:
|
||||
require.Equal(31, state)
|
||||
require.Equal(32, state)
|
||||
case <-time.After(20 * time.Millisecond):
|
||||
t.Fatal("timed out")
|
||||
}
|
||||
|
|
|
@ -26,7 +26,8 @@ type Type interface {
|
|||
// Consul so this allows cache types to be implemented with no extra logic.
|
||||
// Second, FetchResult can return an unset value and index. In this case, the
|
||||
// cache will reuse the last value automatically. If an unset Value is
|
||||
// returned, the State field will also be ignored currently.
|
||||
// returned, the State field will still be updated which allows maintaining
|
||||
// metadata even when there is no result.
|
||||
Fetch(FetchOptions, Request) (FetchResult, error)
|
||||
|
||||
// SupportsBlocking should return true if the type supports blocking queries.
|
||||
|
|
|
@ -576,7 +576,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
|||
"tls_skip_verify": "TLSSkipVerify",
|
||||
|
||||
// Common CA config
|
||||
"leaf_cert_ttl": "LeafCertTTL",
|
||||
"leaf_cert_ttl": "LeafCertTTL",
|
||||
"csr_max_per_second": "CSRMaxPerSecond",
|
||||
"csr_max_concurrent": "CSRMaxConcurrent",
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -3009,8 +3009,10 @@ func TestFullConfig(t *testing.T) {
|
|||
"connect": {
|
||||
"ca_provider": "consul",
|
||||
"ca_config": {
|
||||
"RotationPeriod": "90h",
|
||||
"LeafCertTTL": "1h"
|
||||
"rotation_period": "90h",
|
||||
"leaf_cert_ttl": "1h",
|
||||
"csr_max_per_second": 100,
|
||||
"csr_max_concurrent": 2
|
||||
},
|
||||
"enabled": true,
|
||||
"proxy_defaults": {
|
||||
|
@ -3558,6 +3560,10 @@ func TestFullConfig(t *testing.T) {
|
|||
ca_config {
|
||||
rotation_period = "90h"
|
||||
leaf_cert_ttl = "1h"
|
||||
# hack float since json parses numbers as float and we have to
|
||||
# assert against the same thing
|
||||
csr_max_per_second = 100.0
|
||||
csr_max_concurrent = 2.0
|
||||
}
|
||||
enabled = true
|
||||
proxy_defaults {
|
||||
|
@ -4211,8 +4217,10 @@ func TestFullConfig(t *testing.T) {
|
|||
ConnectSidecarMaxPort: 9999,
|
||||
ConnectCAProvider: "consul",
|
||||
ConnectCAConfig: map[string]interface{}{
|
||||
"RotationPeriod": "90h",
|
||||
"LeafCertTTL": "1h",
|
||||
"RotationPeriod": "90h",
|
||||
"LeafCertTTL": "1h",
|
||||
"CSRMaxPerSecond": float64(100),
|
||||
"CSRMaxConcurrent": float64(2),
|
||||
},
|
||||
ConnectProxyAllowManagedRoot: false,
|
||||
ConnectProxyAllowManagedAPIRegistration: false,
|
||||
|
|
|
@ -1,12 +1,18 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib/semaphore"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
|
@ -14,12 +20,84 @@ import (
|
|||
"github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
var ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint")
|
||||
var (
|
||||
// Err strings. net/rpc doesn't have a way to transport typed/rich errors so
|
||||
// we currently rely on sniffing the error string in a few cases where we need
|
||||
// to change client behavior. These are the canonical error strings to use.
|
||||
// Note though that client code can't use `err == consul.Err*` directly since
|
||||
// the error returned by RPC will be a plain error.errorString created by
|
||||
// net/rpc client so will not be the same _instance_ that this package
|
||||
// variable points to. Clients need to compare using `err.Error() ==
|
||||
// consul.ErrRateLimited.Error()` which is very sad. Short of replacing our
|
||||
// RPC mechanism it's hard to know how to make that much better though.
|
||||
ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint")
|
||||
ErrRateLimited = errors.New("Rate limit reached, try again later")
|
||||
)
|
||||
|
||||
const (
|
||||
// csrLimitWait is the maximum time we'll wait for a slot when CSR concurrency
|
||||
// limiting or rate limiting is occuring. It's intentionally short so small
|
||||
// batches of requests can be accommodated when server has capacity (assuming
|
||||
// signing one cert takes much less than this) but failing requests fast when
|
||||
// a thundering herd comes along.
|
||||
csrLimitWait = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
// ConnectCA manages the Connect CA.
|
||||
type ConnectCA struct {
|
||||
// srv is a pointer back to the server.
|
||||
srv *Server
|
||||
|
||||
// csrRateLimiter limits the rate of signing new certs if configured. Lazily
|
||||
// initialized from current config to support dynamic changes.
|
||||
// csrRateLimiterMu must be held while dereferencing the pointer or storing a
|
||||
// new one, but methods can be called on the limiter object outside of the
|
||||
// locked section. This is done only in the getCSRRateLimiterWithLimit method.
|
||||
csrRateLimiter *rate.Limiter
|
||||
csrRateLimiterMu sync.RWMutex
|
||||
|
||||
// csrConcurrencyLimiter is a dynamically resizable semaphore used to limit
|
||||
// Sign RPC concurrency if configured. The zero value is usable as soon as
|
||||
// SetSize is called which we do dynamically in the RPC handler to avoid
|
||||
// having to hook elaborate synchronization mechanisms through the CA config
|
||||
// endpoint and config reload etc.
|
||||
csrConcurrencyLimiter semaphore.Dynamic
|
||||
}
|
||||
|
||||
// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set.
|
||||
// It uses the shared server-wide limiter unless the limit has been changed in
|
||||
// config or the limiter has not been setup yet in which case it just-in-time
|
||||
// configures the new limiter. We assume that limit changes are relatively rare
|
||||
// and that all callers (there is currently only one) use the same config value
|
||||
// as the limit. There might be some flapping if there are multiple concurrent
|
||||
// requests in flight at the time the config changes where A sees the new value
|
||||
// and updates, B sees the old but then gets this lock second and changes back.
|
||||
// Eventually though and very soon (once all current RPCs are complete) we are
|
||||
// guaranteed to have the correct limit set by the next RPC that comes in so I
|
||||
// assume this is fine. If we observe strange behavior because of it, we could
|
||||
// add hysteresis that prevents changes too soon after a previous change but
|
||||
// that seems unnecessary for now.
|
||||
func (s *ConnectCA) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter {
|
||||
s.csrRateLimiterMu.RLock()
|
||||
lim := s.csrRateLimiter
|
||||
s.csrRateLimiterMu.RUnlock()
|
||||
|
||||
// If there is a current limiter with the same limit, return it. This should
|
||||
// be the common case.
|
||||
if lim != nil && lim.Limit() == limit {
|
||||
return lim
|
||||
}
|
||||
|
||||
// Need to change limiter, get write lock
|
||||
s.csrRateLimiterMu.Lock()
|
||||
defer s.csrRateLimiterMu.Unlock()
|
||||
// No limiter yet, or limit changed in CA config, reconfigure a new limiter.
|
||||
// We use burst of 1 for a hard limit. Note that either bursting or waiting is
|
||||
// necessary to get expected behavior in fact of random arrival times, but we
|
||||
// don't need both and we use Wait with a small delay to smooth noise. See
|
||||
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md.
|
||||
s.csrRateLimiter = rate.NewLimiter(limit, 1)
|
||||
return s.csrRateLimiter
|
||||
}
|
||||
|
||||
// ConfigurationGet returns the configuration for the CA.
|
||||
|
@ -370,6 +448,28 @@ func (s *ConnectCA) Sign(
|
|||
"we are %s", serviceID.Datacenter, s.srv.config.Datacenter)
|
||||
}
|
||||
|
||||
commonCfg, err := config.GetCommonConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if commonCfg.CSRMaxPerSecond > 0 {
|
||||
lim := s.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond))
|
||||
// Wait up to the small threshold we allow for a token.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
|
||||
defer cancel()
|
||||
if lim.Wait(ctx) != nil {
|
||||
return ErrRateLimited
|
||||
}
|
||||
} else if commonCfg.CSRMaxConcurrent > 0 {
|
||||
s.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent))
|
||||
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
|
||||
defer cancel()
|
||||
if err := s.csrConcurrencyLimiter.Acquire(ctx); err != nil {
|
||||
return ErrRateLimited
|
||||
}
|
||||
defer s.csrConcurrencyLimiter.Release()
|
||||
}
|
||||
|
||||
// All seems to be in order, actually sign it.
|
||||
pem, err := provider.Sign(csr)
|
||||
if err != nil {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/pem"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -348,6 +349,201 @@ func TestConnectCASign(t *testing.T) {
|
|||
assert.Equal(spiffeId.URI().String(), reply.ServiceURI)
|
||||
}
|
||||
|
||||
// Bench how long Signing RPC takes. This was used to ballpark reasonable
|
||||
// default rate limit to protect servers from thundering herds of signing
|
||||
// requests on root rotation.
|
||||
func BenchmarkConnectCASign(b *testing.B) {
|
||||
t := &testing.T{}
|
||||
|
||||
require := require.New(b)
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Generate a CSR and request signing
|
||||
spiffeID := connect.TestSpiffeIDService(b, "web")
|
||||
csr, _ := connect.TestCSR(b, spiffeID)
|
||||
args := &structs.CASignRequest{
|
||||
Datacenter: "dc1",
|
||||
CSR: csr,
|
||||
}
|
||||
var reply structs.IssuedCert
|
||||
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply))
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnectCASign_rateLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.CAConfig.Config = map[string]interface{}{
|
||||
// It actually doesn't work as expected with some higher values because
|
||||
// the token bucket is initialized with max(10%, 1) burst which for small
|
||||
// values is 1 and then the test completes so fast it doesn't actually
|
||||
// replenish any tokens so you only get the burst allowed through. This is
|
||||
// OK, running the test slower is likely to be more brittle anyway since
|
||||
// it will become more timing dependent whether the actual rate the
|
||||
// requests are made matches the expectation from the sleeps etc.
|
||||
"CSRMaxPerSecond": 1,
|
||||
}
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Generate a CSR and request signing a few times in a loop.
|
||||
spiffeID := connect.TestSpiffeIDService(t, "web")
|
||||
csr, _ := connect.TestCSR(t, spiffeID)
|
||||
args := &structs.CASignRequest{
|
||||
Datacenter: "dc1",
|
||||
CSR: csr,
|
||||
}
|
||||
var reply structs.IssuedCert
|
||||
|
||||
errs := make([]error, 10)
|
||||
for i := 0; i < len(errs); i++ {
|
||||
errs[i] = msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply)
|
||||
}
|
||||
|
||||
limitedCount := 0
|
||||
successCount := 0
|
||||
for _, err := range errs {
|
||||
if err == nil {
|
||||
successCount++
|
||||
} else if err.Error() == ErrRateLimited.Error() {
|
||||
limitedCount++
|
||||
} else {
|
||||
require.NoError(err)
|
||||
}
|
||||
}
|
||||
// I've only ever seen this as 1/9 however if the test runs slowly on an
|
||||
// over-subscribed CPU (e.g. in CI) it's possible that later requests could
|
||||
// have had their token replenished and succeed so we allow a little slack -
|
||||
// the test here isn't really the exact token bucket response more a sanity
|
||||
// check that some limiting is being applied. Note that we can't just measure
|
||||
// the time it took to send them all and infer how many should have succeeded
|
||||
// without some complex modelling of the token bucket algorithm.
|
||||
require.Truef(successCount >= 1, "at least 1 CSRs should have succeeded, got %d", successCount)
|
||||
require.Truef(limitedCount >= 7, "at least 7 CSRs should have been rate limited, got %d", limitedCount)
|
||||
}
|
||||
|
||||
func TestConnectCASign_concurrencyLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.CAConfig.Config = map[string]interface{}{
|
||||
// Must disable the rate limit since it takes precedence
|
||||
"CSRMaxPerSecond": 0,
|
||||
"CSRMaxConcurrent": 1,
|
||||
}
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Generate a CSR and request signing a few times in a loop.
|
||||
spiffeID := connect.TestSpiffeIDService(t, "web")
|
||||
csr, _ := connect.TestCSR(t, spiffeID)
|
||||
args := &structs.CASignRequest{
|
||||
Datacenter: "dc1",
|
||||
CSR: csr,
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
errs := make(chan error, 10)
|
||||
times := make(chan time.Duration, cap(errs))
|
||||
start := time.Now()
|
||||
for i := 0; i < cap(errs); i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
var reply structs.IssuedCert
|
||||
errs <- msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply)
|
||||
times <- time.Since(start)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
|
||||
limitedCount := 0
|
||||
successCount := 0
|
||||
var minTime, maxTime time.Duration
|
||||
for err := range errs {
|
||||
elapsed := <-times
|
||||
if elapsed < minTime || minTime == 0 {
|
||||
minTime = elapsed
|
||||
}
|
||||
if elapsed > maxTime {
|
||||
maxTime = elapsed
|
||||
}
|
||||
if err == nil {
|
||||
successCount++
|
||||
} else if err.Error() == ErrRateLimited.Error() {
|
||||
limitedCount++
|
||||
} else {
|
||||
require.NoError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// These are very hand wavy - on my mac times look like this:
|
||||
// 2.776009ms
|
||||
// 3.705813ms
|
||||
// 4.527212ms
|
||||
// 5.267755ms
|
||||
// 6.119809ms
|
||||
// 6.958083ms
|
||||
// 7.869179ms
|
||||
// 8.675058ms
|
||||
// 9.512281ms
|
||||
// 10.238183ms
|
||||
//
|
||||
// But it's indistinguishable from noise - even if you disable the concurrency
|
||||
// limiter you get pretty much the same pattern/spread.
|
||||
//
|
||||
// On the other hand it's only timing that stops us from not hitting the 500ms
|
||||
// timeout. On highly CPU constrained CI box this could be brittle if we
|
||||
// assert that we never get rate limited.
|
||||
//
|
||||
// So this test is not super strong - but it's a sanity check at least that
|
||||
// things don't break when configured this way, and through manual
|
||||
// inspection/debug logging etc. we can verify it's actually doing the
|
||||
// concurrency limit thing. If you add a 100ms sleep into the sign endpoint
|
||||
// after the rate limit code for example it makes it much more obvious:
|
||||
//
|
||||
// With 100ms sleep an no concurrency limit:
|
||||
// min=109ms, max=118ms
|
||||
// With concurrency limit of 1:
|
||||
// min=106ms, max=538ms (with ~half hitting the 500ms timeout)
|
||||
//
|
||||
// Without instrumenting the endpoint to make the RPC take an artificially
|
||||
// long time it's hard to know what else we can do to actively detect that the
|
||||
// requests were serialized.
|
||||
t.Logf("min=%s, max=%s", minTime, maxTime)
|
||||
//t.Fail() // Uncomment to see the time spread logged
|
||||
require.Truef(successCount >= 1, "at least 1 CSRs should have succeeded, got %d", successCount)
|
||||
}
|
||||
|
||||
func TestConnectCASignValidation(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ func init() {
|
|||
registerEndpoint(func(s *Server) interface{} { return &ACL{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &Catalog{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) })
|
||||
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &Health{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &Intention{s} })
|
||||
registerEndpoint(func(s *Server) interface{} { return &Internal{s} })
|
||||
|
|
|
@ -174,7 +174,6 @@ func (s *Store) caSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.CAConf
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed CA config lookup: %s", err)
|
||||
}
|
||||
|
||||
// Set the indexes, prevent the cluster ID from changing.
|
||||
if prev != nil {
|
||||
existing := prev.(*structs.CAConfiguration)
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
metrics "github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
@ -335,6 +336,11 @@ func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc {
|
|||
return ok
|
||||
}
|
||||
|
||||
isTooManyRequests := func(err error) bool {
|
||||
// Sadness net/rpc can't do nice typed errors so this is all we got
|
||||
return err.Error() == consul.ErrRateLimited.Error()
|
||||
}
|
||||
|
||||
addAllowHeader := func(methods []string) {
|
||||
resp.Header().Add("Allow", strings.Join(methods, ","))
|
||||
}
|
||||
|
@ -358,6 +364,9 @@ func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc {
|
|||
case isBadRequest(err):
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
fmt.Fprint(resp, err.Error())
|
||||
case isTooManyRequests(err):
|
||||
resp.WriteHeader(http.StatusTooManyRequests)
|
||||
fmt.Fprint(resp, err.Error())
|
||||
default:
|
||||
resp.WriteHeader(http.StatusInternalServerError)
|
||||
fmt.Fprint(resp, err.Error())
|
||||
|
|
|
@ -222,6 +222,10 @@ func (c *CAConfiguration) GetCommonConfig() (*CommonCAProviderConfig, error) {
|
|||
}
|
||||
|
||||
var config CommonCAProviderConfig
|
||||
|
||||
// Set Defaults
|
||||
config.CSRMaxPerSecond = 50 // See doc comment for rationale here.
|
||||
|
||||
decodeConf := &mapstructure.DecoderConfig{
|
||||
DecodeHook: ParseDurationFunc(),
|
||||
Result: &config,
|
||||
|
@ -244,6 +248,30 @@ type CommonCAProviderConfig struct {
|
|||
LeafCertTTL time.Duration
|
||||
|
||||
SkipValidate bool
|
||||
|
||||
// CSRMaxPerSecond is a rate limit on processing Connect Certificate Signing
|
||||
// Requests on the servers. It applies to all CA providers so can be used to
|
||||
// limit rate to an external CA too. 0 disables the rate limit. Defaults to 50
|
||||
// which is low enough to prevent overload of a reasonably sized production
|
||||
// server while allowing a cluster with 1000 service instances to complete a
|
||||
// rotation in 20 seconds. For reference a quad-core 2017 MacBook pro can
|
||||
// process 100 signing RPCs a second while using less than half of one core.
|
||||
// For large clusters with powerful servers it's advisable to increase this
|
||||
// rate or to disable this limit and instead rely on CSRMaxConcurrent to only
|
||||
// consume a subset of the server's cores.
|
||||
CSRMaxPerSecond float32
|
||||
|
||||
// CSRMaxConcurrent is a limit on how many concurrent CSR signing requests
|
||||
// will be processed in parallel. New incoming signing requests will try for
|
||||
// `consul.csrSemaphoreWait` (currently 500ms) for a slot before being
|
||||
// rejected with a "rate limited" backpressure response. This effectively sets
|
||||
// how many CPU cores can be occupied by Connect CA signing activity and
|
||||
// should be a (small) subset of your server's available cores to allow other
|
||||
// tasks to complete when a barrage of CSRs come in (e.g. after a CA root
|
||||
// rotation). Setting to 0 disables the limit, attempting to sign certs
|
||||
// immediately in the RPC goroutine. This is 0 by default and CSRMaxPerSecond
|
||||
// is used. This is ignored if CSRMaxPerSecond is non-zero.
|
||||
CSRMaxConcurrent int
|
||||
}
|
||||
|
||||
func (c CommonCAProviderConfig) Validate() error {
|
||||
|
|
|
@ -18,12 +18,14 @@ func TestCAConfiguration_GetCommonConfig(t *testing.T) {
|
|||
name: "basic defaults",
|
||||
cfg: &CAConfiguration{
|
||||
Config: map[string]interface{}{
|
||||
"RotationPeriod": "2160h",
|
||||
"LeafCertTTL": "72h",
|
||||
"RotationPeriod": "2160h",
|
||||
"LeafCertTTL": "72h",
|
||||
"CSRMaxPerSecond": "50",
|
||||
},
|
||||
},
|
||||
want: &CommonCAProviderConfig{
|
||||
LeafCertTTL: 72 * time.Hour,
|
||||
LeafCertTTL: 72 * time.Hour,
|
||||
CSRMaxPerSecond: 50,
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -41,7 +43,8 @@ func TestCAConfiguration_GetCommonConfig(t *testing.T) {
|
|||
},
|
||||
},
|
||||
want: &CommonCAProviderConfig{
|
||||
LeafCertTTL: 72 * time.Hour,
|
||||
LeafCertTTL: 72 * time.Hour,
|
||||
CSRMaxPerSecond: 50, // The default value
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -23,7 +23,10 @@ type CAConfig struct {
|
|||
|
||||
// CommonCAProviderConfig is the common options available to all CA providers.
|
||||
type CommonCAProviderConfig struct {
|
||||
LeafCertTTL time.Duration
|
||||
LeafCertTTL time.Duration
|
||||
SkipValidate bool
|
||||
CSRMaxPerSecond float32
|
||||
CSRMaxConcurrent int
|
||||
}
|
||||
|
||||
// ConsulCAProviderConfig is the config for the built-in Consul CA provider.
|
||||
|
@ -41,7 +44,6 @@ func ParseConsulCAConfig(raw map[string]interface{}) (*ConsulCAProviderConfig, e
|
|||
var config ConsulCAProviderConfig
|
||||
decodeConf := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
|
||||
ErrorUnused: true,
|
||||
Result: &config,
|
||||
WeaklyTypedInput: true,
|
||||
}
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
// Package semaphore implements a simple semaphore that is based on
|
||||
// golang.org/x/sync/semaphore but doesn't support weights. It's advantage over
|
||||
// a simple buffered chan is that the capacity of the semaphore (i.e. the number
|
||||
// of slots available) can be changed dynamically at runtime without waiting for
|
||||
// all existing work to stop. This makes it easier to implement e.g. concurrency
|
||||
// limits on certain operations that can be reconfigured at runtime.
|
||||
package semaphore
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Dynamic implements a semaphore whose capacity can be changed dynamically at
|
||||
// run time.
|
||||
type Dynamic struct {
|
||||
size int64
|
||||
cur int64
|
||||
waiters list.List
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewDynamic returns a dynamic semaphore with the given initial capacity. Note
|
||||
// that this is for convenience and to match golang.org/x/sync/semaphore however
|
||||
// it's possible to use a zero-value semaphore provided SetSize is called before
|
||||
// use.
|
||||
func NewDynamic(n int64) *Dynamic {
|
||||
return &Dynamic{
|
||||
size: n,
|
||||
}
|
||||
}
|
||||
|
||||
// SetSize dynamically updates the number of available slots. If there are more
|
||||
// than n slots currently acquired, no further acquires will succeed until
|
||||
// sufficient have been released to take the total outstanding below n again.
|
||||
func (s *Dynamic) SetSize(n int64) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.size = n
|
||||
return nil
|
||||
}
|
||||
|
||||
// Acquire attempts to acquire one "slot" in the semaphore, blocking only until
|
||||
// ctx is Done. On success, returns nil. On failure, returns ctx.Err() and leaves
|
||||
// the semaphore unchanged.
|
||||
//
|
||||
// If ctx is already done, Acquire may still succeed without blocking.
|
||||
func (s *Dynamic) Acquire(ctx context.Context) error {
|
||||
s.mu.Lock()
|
||||
if s.cur < s.size {
|
||||
s.cur++
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Need to wait, add to waiter list
|
||||
ready := make(chan struct{})
|
||||
elem := s.waiters.PushBack(ready)
|
||||
s.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := ctx.Err()
|
||||
s.mu.Lock()
|
||||
select {
|
||||
case <-ready:
|
||||
// Acquired the semaphore after we were canceled. Rather than trying to
|
||||
// fix up the queue, just pretend we didn't notice the cancellation.
|
||||
err = nil
|
||||
default:
|
||||
s.waiters.Remove(elem)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return err
|
||||
|
||||
case <-ready:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Release releases the semaphore. It will panic if release is called on an
|
||||
// empty semphore.
|
||||
func (s *Dynamic) Release() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.cur < 1 {
|
||||
panic("semaphore: bad release")
|
||||
}
|
||||
|
||||
next := s.waiters.Front()
|
||||
|
||||
// If there are no waiters, just decrement and we're done
|
||||
if next == nil {
|
||||
s.cur--
|
||||
return
|
||||
}
|
||||
|
||||
// Need to yield our slot to the next waiter.
|
||||
// Remove them from the list
|
||||
s.waiters.Remove(next)
|
||||
// And trigger it's chan before we release the lock
|
||||
close(next.Value.(chan struct{}))
|
||||
// Note we _don't_ decrement inflight since the slot was yielded directly.
|
||||
}
|
|
@ -0,0 +1,119 @@
|
|||
package semaphore
|
||||
|
||||
// Based on https://github.com/golang/sync/blob/master/semaphore/semaphore_test.go
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const maxSleep = 1 * time.Millisecond
|
||||
|
||||
func HammerDynamic(sem *Dynamic, loops int) {
|
||||
for i := 0; i < loops; i++ {
|
||||
sem.Acquire(context.Background())
|
||||
time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
|
||||
sem.Release()
|
||||
}
|
||||
}
|
||||
|
||||
// TestDynamic hammers the semaphore from all available cores to ensure we don't
|
||||
// hit a panic or race detector notice something wonky.
|
||||
func TestDynamic(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
n := runtime.GOMAXPROCS(0)
|
||||
loops := 10000 / n
|
||||
sem := NewDynamic(int64(n))
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
HammerDynamic(sem, loops)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestDynamicPanic(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
defer func() {
|
||||
if recover() == nil {
|
||||
t.Fatal("release of an unacquired dynamic semaphore did not panic")
|
||||
}
|
||||
}()
|
||||
w := NewDynamic(1)
|
||||
w.Release()
|
||||
}
|
||||
|
||||
func checkAcquire(t *testing.T, sem *Dynamic, wantAcquire bool) {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
defer cancel()
|
||||
err := sem.Acquire(ctx)
|
||||
if wantAcquire {
|
||||
require.NoErrorf(t, err, "failed to acquire when we should have")
|
||||
} else {
|
||||
require.Error(t, err, "failed to block when should be full")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDynamicAcquire(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
sem := NewDynamic(2)
|
||||
|
||||
// Consume one slot [free: 1]
|
||||
sem.Acquire(ctx)
|
||||
// Should be able to consume another [free: 0]
|
||||
checkAcquire(t, sem, true)
|
||||
// Should fail to consume another [free: 0]
|
||||
checkAcquire(t, sem, false)
|
||||
|
||||
// Release 2
|
||||
sem.Release()
|
||||
sem.Release()
|
||||
|
||||
// Should be able to consume another [free: 1]
|
||||
checkAcquire(t, sem, true)
|
||||
// Should be able to consume another [free: 0]
|
||||
checkAcquire(t, sem, true)
|
||||
// Should fail to consume another [free: 0]
|
||||
checkAcquire(t, sem, false)
|
||||
|
||||
// Now expand the semaphore and we should be able to acquire again [free: 2]
|
||||
sem.SetSize(4)
|
||||
|
||||
// Should be able to consume another [free: 1]
|
||||
checkAcquire(t, sem, true)
|
||||
// Should be able to consume another [free: 0]
|
||||
checkAcquire(t, sem, true)
|
||||
// Should fail to consume another [free: 0]
|
||||
checkAcquire(t, sem, false)
|
||||
|
||||
// Shrinking it should work [free: 0]
|
||||
sem.SetSize(3)
|
||||
|
||||
// Should fail to consume another [free: 0]
|
||||
checkAcquire(t, sem, false)
|
||||
|
||||
// Release one [free: 0] (3 slots used are release, size only 3)
|
||||
sem.Release()
|
||||
|
||||
// Should fail to consume another [free: 0]
|
||||
checkAcquire(t, sem, false)
|
||||
|
||||
sem.Release()
|
||||
|
||||
// Should be able to consume another [free: 1]
|
||||
checkAcquire(t, sem, true)
|
||||
}
|
|
@ -857,23 +857,72 @@ default will automatically work with some tooling.
|
|||
<p>There are also a number of common configuration options supported by all providers:</p>
|
||||
|
||||
* <a name="ca_leaf_cert_ttl"></a><a href="#ca_leaf_cert_ttl">`leaf_cert_ttl`</a> The upper bound on the
|
||||
lease duration of a leaf certificate issued for a service. In most cases a new leaf certificate will be
|
||||
requested by a proxy before this limit is reached. This is also the effective limit on how long a server
|
||||
outage can last (with no leader) before network connections will start being rejected, and as a result the
|
||||
defaults is `72h` to last through a weekend without intervention. This value cannot be lower than 1 hour
|
||||
or higher than 1 year.
|
||||
lease duration of a leaf certificate issued for a service. In most
|
||||
cases a new leaf certificate will be requested by a proxy before this
|
||||
limit is reached. This is also the effective limit on how long a
|
||||
server outage can last (with no leader) before network connections
|
||||
will start being rejected, and as a result the defaults is `72h` to
|
||||
last through a weekend without intervention. This value cannot be
|
||||
lower than 1 hour or higher than 1 year.
|
||||
|
||||
This value is also used when rotating out old root certificates from the cluster. When a root certificate
|
||||
has been inactive (rotated out) for more than twice the *current* `leaf_cert_ttl`, it will be removed from
|
||||
the trusted list.
|
||||
This value is also used when rotating out old root certificates from
|
||||
the cluster. When a root certificate has been inactive (rotated out)
|
||||
for more than twice the *current* `leaf_cert_ttl`, it will be removed
|
||||
from the trusted list.
|
||||
|
||||
* <a name="connect_proxy"></a><a href="#connect_proxy">`proxy`</a> [**Deprecated**](/docs/connect/proxies/managed-deprecated.html) This object allows setting options for the Connect proxies. The following sub-keys are available:
|
||||
* <a name="ca_csr_max_per_second"></a><a
|
||||
href="#ca_csr_max_per_second">`csr_max_per_second`</a> Sets a rate
|
||||
limit on the maximum number of Certificate Signing Requests (CSRs) the
|
||||
servers will accept. This is used to prevent CA rotation from causing
|
||||
unbounded CPU usage on servers. It defaults to 50 which is
|
||||
conservative - a 2017 Macbook can process about 100 per second using
|
||||
only ~40% of one CPU core - but sufficient for deployments up to ~1500
|
||||
service instances before the time it takes to rotate is impacted. For
|
||||
larger deployments we recommend increasing this based on the expected
|
||||
number of server instances and server resources, or use
|
||||
`csr_max_concurrent` instead if servers have more than one core.
|
||||
Setting this to zero disables rate limiting. Added in 1.4.1.
|
||||
|
||||
* <a name="connect_proxy_allow_managed_registration"></a><a href="#connect_proxy_allow_managed_registration">`allow_managed_api_registration`</a> [**Deprecated**](/docs/connect/proxies/managed-deprecated.html) Allows managed proxies to be configured with services that are registered via the Agent HTTP API. Enabling this would allow anyone with permission to register a service to define a command to execute for the proxy. By default, this is false to protect against arbitrary process execution.
|
||||
* <a name="ca_csr_max_concurrent"></a><a
|
||||
href="#ca_csr_max_concurrent">`csr_max_concurrent`</a> Sets a limit
|
||||
on how many Certificate Signing Requests will be processed
|
||||
concurrently. Defaults to 0 (disabled). This is useful when you have
|
||||
more than one or two cores available to the server. For example on an
|
||||
8 core server, setting this to 1 will ensure that even during a CA
|
||||
rotation no more than one server core on the leader will be consumed
|
||||
at a time with generating new certificates. Setting this is
|
||||
recommended _instead_ of `csr_max_per_second` where you know there are
|
||||
multiple cores available since it is simpler to reason about limiting
|
||||
CSR resources this way without artificially slowing down rotations.
|
||||
Added in 1.4.1.
|
||||
|
||||
* <a name="connect_proxy_allow_managed_root"></a><a href="#connect_proxy_allow_managed_root">`allow_managed_root`</a> [**Deprecated**](/docs/connect/proxies/managed-deprecated.html) Allows Consul to start managed proxies if Consul is running as root (EUID of the process is zero). We recommend running Consul as a non-root user. By default, this is false to protect inadvertently running external processes as root.
|
||||
|
||||
* <a name="connect_proxy_defaults"></a><a href="#connect_proxy_defaults">`proxy_defaults`</a> [**Deprecated**](/docs/connect/proxies/managed-deprecated.html) This object configures the default proxy settings for service definitions with [managed proxies](/docs/connect/proxies/managed-deprecated.html) (now deprecated). It accepts the fields `exec_mode`, `daemon_command`, and `config`. These are used as default values for the respective fields in the service definition.
|
||||
* <a name="connect_proxy"></a><a href="#connect_proxy">`proxy`</a>
|
||||
[**Deprecated**](/docs/connect/proxies/managed-deprecated.html) This
|
||||
object allows setting options for the Connect proxies. The following
|
||||
sub-keys are available:
|
||||
* <a name="connect_proxy_allow_managed_registration"></a><a
|
||||
href="#connect_proxy_allow_managed_registration">`allow_managed_api_registration`</a>
|
||||
[**Deprecated**](/docs/connect/proxies/managed-deprecated.html)
|
||||
Allows managed proxies to be configured with services that are
|
||||
registered via the Agent HTTP API. Enabling this would allow anyone
|
||||
with permission to register a service to define a command to execute
|
||||
for the proxy. By default, this is false to protect against
|
||||
arbitrary process execution.
|
||||
* <a name="connect_proxy_allow_managed_root"></a><a
|
||||
href="#connect_proxy_allow_managed_root">`allow_managed_root`</a>
|
||||
[**Deprecated**](/docs/connect/proxies/managed-deprecated.html)
|
||||
Allows Consul to start managed proxies if Consul is running as root
|
||||
(EUID of the process is zero). We recommend running Consul as a
|
||||
non-root user. By default, this is false to protect inadvertently
|
||||
running external processes as root.
|
||||
* <a name="connect_proxy_defaults"></a><a
|
||||
href="#connect_proxy_defaults">`proxy_defaults`</a>
|
||||
[**Deprecated**](/docs/connect/proxies/managed-deprecated.html) This
|
||||
object configures the default proxy settings for service definitions
|
||||
with [managed proxies](/docs/connect/proxies/managed-deprecated.html)
|
||||
(now deprecated). It accepts the fields `exec_mode`, `daemon_command`,
|
||||
and `config`. These are used as default values for the respective
|
||||
fields in the service definition.
|
||||
|
||||
* <a name="datacenter"></a><a href="#datacenter">`datacenter`</a> Equivalent to the
|
||||
[`-datacenter` command-line flag](#_datacenter).
|
||||
|
|
|
@ -79,9 +79,21 @@ So a scaling factor of `5` (i.e. `raft_multiplier: 5`) updates the following val
|
|||
|
||||
~> **NOTE** Wide networks with more latency will perform better with larger values of `raft_multiplier`.
|
||||
|
||||
The trade off is between leader stability and time to recover from an actual leader failure. A short multiplier minimizes failure detection and election time but may be triggered frequently in high latency situations. This can cause constant leadership churn and associated unavailability. A high multiplier reduces the chances that spurious failures will cause leadership churn but it does this at the expense of taking longer to detect real failures and thus takes longer to restore cluster availability.
|
||||
The trade off is between leader stability and time to recover from an actual
|
||||
leader failure. A short multiplier minimizes failure detection and election time
|
||||
but may be triggered frequently in high latency situations. This can cause
|
||||
constant leadership churn and associated unavailability. A high multiplier
|
||||
reduces the chances that spurious failures will cause leadership churn but it
|
||||
does this at the expense of taking longer to detect real failures and thus takes
|
||||
longer to restore cluster availability.
|
||||
|
||||
Leadership instability can also be caused by under-provisioned CPU resources and is more likely in environments where CPU cycles are shared with other workloads. In order for a server to remain the leader, it must send frequent heartbeat messages to all other servers every few hundred milliseconds. If some number of these are missing or late due to the leader not having sufficient CPU to send them on time, the other servers will detect it as failed and hold a new election.
|
||||
Leadership instability can also be caused by under-provisioned CPU resources and
|
||||
is more likely in environments where CPU cycles are shared with other workloads.
|
||||
In order for a server to remain the leader, it must send frequent heartbeat
|
||||
messages to all other servers every few hundred milliseconds. If some number of
|
||||
these are missing or late due to the leader not having sufficient CPU to send
|
||||
them on time, the other servers will detect it as failed and hold a new
|
||||
election.
|
||||
|
||||
It's best to benchmark with a realistic workload when choosing a production server for Consul.
|
||||
Here are some general recommendations:
|
||||
|
@ -159,3 +171,45 @@ To prevent any CPU spikes from a misconfigured client, RPC requests to the serve
|
|||
~> **NOTE** Rate limiting is configured on the client agent only.
|
||||
|
||||
In addition, two [performance indicators](/docs/agent/telemetry.html) — `consul.runtime.alloc_bytes` and `consul.runtime.heap_objects` — can help diagnose if the current sizing is not adequately meeting the load.
|
||||
|
||||
## Connect Certificate Signing CPU Limits
|
||||
|
||||
If you enable [Connect](/docs/connect/index.html), the leader server will need
|
||||
to perform public key signing operations for every service instance in the
|
||||
cluster. Typically these operations are fast on modern hardware, however when
|
||||
the CA is changed or it's key rotated, the leader will face an influx of
|
||||
requests for new certificates for every service instance running.
|
||||
|
||||
While the client agents distribute these randomly over 30 seconds to avoid an
|
||||
immediate thundering herd, they don't have enough information to tune that
|
||||
period based on the number of certificates in use in the cluster so picking
|
||||
longer smearing results in artificially slow rotations for small clusters.
|
||||
|
||||
Smearing requests over 30s is sufficient to bring RPC load to a reasonable level
|
||||
in all but the very largest clusters, but the extra CPU load from cryptographic
|
||||
operations could impact the server's normal work. To limit that, Consul since
|
||||
1.4.1 exposes two ways to limit the impact Certificate signing has on the leader
|
||||
[`csr_max_per_second`](/docs/agent/options.html#ca_csr_max_per_second) and
|
||||
[`csr_max_concurrent`](/docs/agent/options.html#ca_csr_max_concurrent).
|
||||
|
||||
By default we set a limit of 50 per second which is reasonable on modest
|
||||
hardware but may be too low and impact rotation times if more than 1500 service
|
||||
instances are using Connect in the cluster. `csr_max_per_second` is likely best
|
||||
if you have fewer than four cores available since a whole core being used by
|
||||
signing is likely to impact the server stability if it's all or a large portion
|
||||
of the cores available. The downside is that you need to capacity plan: how many
|
||||
service instances will need Connect certificates? What CSR rate can your server
|
||||
tolerate without impacting stability? How fast do you want CA rotations to
|
||||
process?
|
||||
|
||||
For larger production deployments, we generally recommend multiple CPU cores for
|
||||
servers to handle the normal workload. With four or more cores available, it's
|
||||
simpler to limit signing CPU impact with `csr_max_concurrent` rather than tune
|
||||
the rate limit. This effectively sets how many CPU cores can be monopolized by
|
||||
certificate signing work (although it doesn't pin that work to specific cores).
|
||||
In this case `csr_max_per_second` should be disabled (set to `0`).
|
||||
|
||||
For example if you have an 8 core server, setting `csr_max_concurrent` to `1`
|
||||
would allow you to process CSRs as fast as a single core can (which is likely
|
||||
sufficient for the very large clusters), without consuming all available
|
||||
CPU cores and impacting normal server work or stability.
|
||||
|
|
Loading…
Reference in New Issue