connect: agent leaf cert caching improvements (#5091)

* Add State storage and LastResult argument into Cache so that cache.Types can safely store additional data that is eventually expired.

* New Leaf cache type working and basic tests passing. TODO: more extensive testing for the Root change jitter across blocking requests, test that concurrent fetches for different leaves interact nicely with the rootsWatcher.

* Add multi-client and delayed rotation tests.

* Fix typos and clean up error handling in roots watch

* Add comment about how the FetchResult can be used and change ca leaf state to use a non-pointer state.

* Plumb test override of root CA jitter through TestAgent so that tests are deterministic again!

* Fix failing config test
Paul Banks 2019-01-10 12:46:11 +00:00 committed by GitHub
parent 2dfc9ae989
commit 0638e09b6e
12 changed files with 1036 additions and 394 deletions


@ -3419,8 +3419,10 @@ func (a *Agent) registerCache() {
})
a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
RPC: a,
Cache: a.cache,
RPC: a,
Cache: a.cache,
Datacenter: a.config.Datacenter,
TestOverrideCAChangeInitialDelay: a.config.ConnectTestCALeafRootChangeSpread,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,


@ -1,13 +1,15 @@
package cachetype
import (
"crypto/sha256"
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
@ -16,29 +18,242 @@ import (
// Recommended name for registration.
const ConnectCALeafName = "connect-ca-leaf"
// caChangeInitialSpreadDefault is the jitter we apply after noticing the CA
// changed before requesting a new cert. Since we don't know how many services
// are in the cluster we can't be too smart about setting this so it's a
// tradeoff between not making root rotations take unnecessarily long on small
// clusters and not hammering the servers too hard on large ones. Note that
// servers will soon have CSR rate limiting that will limit the impact on big
// clusters, but a small spread in the initial requests still seems like a good
// idea and limits how many clients will hit the rate limit.
const caChangeInitialSpreadDefault = 20 * time.Second
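// Consumption sketch (illustrative only; mirrors the Fetch logic further
// below): when a root change is observed, renewal is forced after a random
// delay inside this spread, unless a test override pins the delay.
//
//	delay := lib.RandomStagger(caChangeInitialSpreadDefault) // uniform in [0, 20s)
//	if c.TestOverrideCAChangeInitialDelay > 0 {
//		delay = c.TestOverrideCAChangeInitialDelay
//	}
//	state.forceExpireAfter = time.Now().Add(delay)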
// ConnectCALeaf supports fetching and generating Connect leaf
// certificates.
type ConnectCALeaf struct {
caIndex uint64 // Current index for CA roots
issuedCertsLock sync.RWMutex
issuedCerts map[string]*structs.IssuedCert
// rootWatchMu protects access to the rootWatchSubscribers map and
// rootWatchCancel
rootWatchMu sync.Mutex
// rootWatchSubscribers is a set of chans, one for each currently in-flight
// Fetch. These chans have root updates delivered from the root watcher.
rootWatchSubscribers map[chan struct{}]struct{}
// rootWatchCancel is a func to call to stop the background root watch if any.
// You must hold rootWatchMu to read (e.g. call) or write the value.
rootWatchCancel func()
RPC RPC // RPC client for remote requests
Cache *cache.Cache // Cache that has CA root certs via ConnectCARoot
// testRootWatchStart/StopCount are testing helpers that allow tests to
// observe the reference counting behavior that governs the shared root watch.
// It's not exactly pretty to expose internals like this, but seems cleaner
// than constructing elaborate and brittle test cases that we can infer
// correct behavior from, and simpler than trying to probe runtime goroutine
// traces to infer correct behavior that way. They must be accessed
// atomically.
testRootWatchStartCount uint32
testRootWatchStopCount uint32
RPC RPC // RPC client for remote requests
Cache *cache.Cache // Cache that has CA root certs via ConnectCARoot
Datacenter string // This agent's datacenter
// TestOverrideCAChangeInitialDelay allows overriding the random jitter after a
// root change with a fixed delay. So far this is only done in tests. If it's
// zero, the caChangeInitialSpreadDefault maximum jitter will be used, but if
// set, it overrides and provides a fixed delay. To essentially disable the
// delay in tests they can set it to 1 nanosecond. We may separately allow
// configuring the jitter limit by users later but this is different and for
// tests only since we need to set a deterministic time delay in order to test
// the behaviour here fully and deterministically.
TestOverrideCAChangeInitialDelay time.Duration
}
// issuedKey returns the issuedCerts cache key for a given service and token. We
// use a hash rather than concatenating strings to provide resilience against
// user input containing our separator - both service name and token ID can be
// freely manipulated by user so may contain any delimiter we choose. It also
// has the benefit of not leaking the ACL token to a new place in memory it
// might get accidentally dumped etc.
func issuedKey(service, token string) string {
hash := sha256.New()
hash.Write([]byte(service))
hash.Write([]byte(token))
return fmt.Sprintf("%x", hash.Sum(nil))
// fetchState is some additional metadata we store with each cert in the cache
// to track things like expiry and coordinate paced root rotations. It's
// important this doesn't contain any pointer types since we rely on the struct
// being copied to avoid modifying the actual state in the cache entry during
// Fetch. Pointers themselves are OK, but if we pointed to another struct and
// then called a method on it or modified it, that would directly mutate the
// cache entry and cause problems. We'd need to deep-clone in that case in
// Fetch below.
type fetchState struct {
// authorityKeyID is the key ID of the CA root that signed the current cert.
// This is just to save parsing the whole cert every time we have to check if
// the root changed.
authorityKeyID string
// forceExpireAfter is used to coordinate renewing certs after a CA rotation
// in a staggered way so that we don't overwhelm the servers.
forceExpireAfter time.Time
}
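// Copy-semantics sketch (condensed from the Fetch code below, illustrative
// only): the state is read out of opts.LastResult as a value, mutated locally,
// and only published back via result.State, so partial changes never leak into
// the cached entry unless they are explicitly returned.
//
//	state, _ := opts.LastResult.State.(fetchState) // copies the struct
//	state.forceExpireAfter = time.Now().Add(delay) // modifies the local copy only
//	result.State = state                           // published via the FetchResult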
// fetchStart is called on each fetch that is about to block and wait for
// changes to the leaf. It subscribes a chan to receive updates from the shared
// root watcher and triggers root watcher if it's not already running.
func (c *ConnectCALeaf) fetchStart(rootUpdateCh chan struct{}) {
c.rootWatchMu.Lock()
defer c.rootWatchMu.Unlock()
// Lazy allocation
if c.rootWatchSubscribers == nil {
c.rootWatchSubscribers = make(map[chan struct{}]struct{})
}
// Make sure a root watcher is running. We don't only do this on first request
// to be more tolerant of errors that could cause the root watcher to fail and
// exit.
if c.rootWatchCancel == nil {
ctx, cancel := context.WithCancel(context.Background())
c.rootWatchCancel = cancel
go c.rootWatcher(ctx)
}
c.rootWatchSubscribers[rootUpdateCh] = struct{}{}
}
// fetchDone is called when a blocking call exits to unsubscribe from root
// updates and possibly stop the shared root watcher if it's no longer needed.
// Note that typically root CA is still being watched by clients directly and
// probably by the ProxyConfigManager so it will stay hot in cache for a while;
// we are just not monitoring it for updates any more.
func (c *ConnectCALeaf) fetchDone(rootUpdateCh chan struct{}) {
c.rootWatchMu.Lock()
defer c.rootWatchMu.Unlock()
delete(c.rootWatchSubscribers, rootUpdateCh)
if len(c.rootWatchSubscribers) == 0 && c.rootWatchCancel != nil {
// This was the last request. Stop the root watcher.
c.rootWatchCancel()
}
}
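// Subscription lifecycle sketch (mirrors the blocking path in Fetch below,
// illustrative only): each blocking call registers a 1-buffered chan for its
// lifetime; the last unsubscriber stops the shared watcher.
//
//	rootUpdateCh := make(chan struct{}, 1)
//	c.fetchStart(rootUpdateCh)
//	defer c.fetchDone(rootUpdateCh)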
// rootWatcher is the shared rootWatcher that runs in a background goroutine
// while needed by one or more inflight Fetch calls.
func (c *ConnectCALeaf) rootWatcher(ctx context.Context) {
atomic.AddUint32(&c.testRootWatchStartCount, 1)
defer atomic.AddUint32(&c.testRootWatchStopCount, 1)
ch := make(chan cache.UpdateEvent, 1)
err := c.Cache.Notify(ctx, ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: c.Datacenter,
}, "roots", ch)
notifyChange := func() {
c.rootWatchMu.Lock()
defer c.rootWatchMu.Unlock()
for ch := range c.rootWatchSubscribers {
select {
case ch <- struct{}{}:
default:
// Don't block - chans are 1-buffered so act as an edge trigger and
// reload CA state directly from cache so they never "miss" updates.
}
}
}
if err != nil {
// Trigger all inflight watchers. We don't pass the error, but they will
// reload from cache and observe the same error and return it to the caller,
// or if it's transient, will continue and the next Fetch will get us back
// into the right state. Seems better than busy loop-retrying here given
// that almost any error we would see here would also be returned from the
// cache get that this will trigger.
notifyChange()
return
}
var oldRoots *structs.IndexedCARoots
// Wait for updates to roots or all requests to stop
for {
select {
case <-ctx.Done():
return
case e := <-ch:
// Root response changed in some way. Note this might be the initial
// fetch.
if e.Err != nil {
// See above rationale about the error propagation
notifyChange()
continue
}
roots, ok := e.Result.(*structs.IndexedCARoots)
if !ok {
// See above rationale about the error propagation
notifyChange()
continue
}
// Check that the active root is actually different from the last CA
// config; there are many reasons the config might have changed without
// actually updating the CA root that is signing certs in the cluster.
// The Fetch calls will also validate this since on the first update here we
// don't know whether it changed or not, but there is no point waking up all
// Fetch calls to check this if we know none of them will need to act on
// this update.
if oldRoots != nil && oldRoots.ActiveRootID == roots.ActiveRootID {
continue
}
// Distribute the update to all inflight requests - they will decide
// whether or not they need to act on it.
notifyChange()
oldRoots = roots
}
}
}
// calculateSoftExpiry encapsulates our logic for when to renew a cert based on
// its age. It returns a pair of times min, max which makes it easier to test
// the logic without non-deterministic jitter to account for. The caller should
// choose a time randomly in between these.
//
// We want to balance a few factors here:
// - renew too early and it increases the aggregate CSR rate in the cluster
// - renew too late and it risks disruption to the service if a transient
// error prevents the renewal
// - we want a broad amount of jitter so if there is an outage, we don't end
// up with all services in sync and causing a thundering herd every
// renewal period. Broader is better for smoothing requests but pushes
// both earlier and later tradeoffs above.
//
// Somewhat arbitrarily the current strategy looks like this:
//
// 0 60% 90%
// Issued [------------------------------|===============|!!!!!] Expires
// 72h TTL: 0 ~43h ~65h
// 1h TTL: 0 36m 54m
//
// Where |===| is the soft renewal period where we jitter for the first attempt
// and |!!!| is the danger zone where we just try immediately.
//
// In the happy path (no outages) the average renewal occurs half way through
// the soft renewal region or at 75% of the cert lifetime which is ~54 hours for
// a 72 hour cert, or 45 mins for a 1 hour cert.
//
// If we are already in the softRenewal period, we randomly pick a time between
// now and the start of the danger zone.
//
// We pass in now to make testing easier.
func calculateSoftExpiry(now time.Time, cert *structs.IssuedCert) (min time.Time, max time.Time) {
certLifetime := cert.ValidBefore.Sub(cert.ValidAfter)
if certLifetime < 10*time.Minute {
// Shouldn't happen as we limit to 1 hour shortest elsewhere but just be
// defensive against strange times or bugs.
return now, now
}
// Find the 60% mark in diagram above
softRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.6))
hardRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.9))
if now.After(hardRenewTime) {
// In the hard renew period, or already expired. Renew now!
return now, now
}
if now.After(softRenewTime) {
// Already in the soft renew period, make now the lower bound for jitter
softRenewTime = now
}
return softRenewTime, hardRenewTime
}
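// Usage sketch (mirrors the Fetch code below; the numbers are illustrative):
// pick a concrete renewal instant uniformly inside the soft window.
//
//	now := time.Now()
//	minExp, maxExp := calculateSoftExpiry(now, existingCert)
//	renewAt := minExp.Add(lib.RandomStagger(maxExp.Sub(minExp)))
//
// For a 72h cert issued just now, renewAt lands between ~43.2h and ~64.8h after
// issuance, i.e. at ~75% of the lifetime on average as described above.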
func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
@ -51,92 +266,165 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
"Internal cache failure: request wrong type: %T", req)
}
// This channel watches our overall timeout. The other goroutines
// launched in this function should end all around the same time so
// they clean themselves up.
// Do we already have a cert in the cache?
var existing *structs.IssuedCert
// Really important this is not a pointer type since otherwise we would set it
// to point to the actual fetchState in the cache entry below and then would
// be directly modifying that in the cache entry even when we might later
// return an error and not update index etc. By being a value, we force a copy
var state fetchState
if opts.LastResult != nil {
existing, ok = opts.LastResult.Value.(*structs.IssuedCert)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last value wrong type: %T", req)
}
state, ok = opts.LastResult.State.(fetchState)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last state wrong type: %T", req)
}
} else {
state = fetchState{}
}
// Handle brand new request first as it's simplest.
if existing == nil {
return c.generateNewLeaf(reqReal, &state)
}
// We have a certificate in cache already. Check it's still valid.
now := time.Now()
minExpire, maxExpire := calculateSoftExpiry(now, existing)
expiresAt := minExpire.Add(lib.RandomStagger(maxExpire.Sub(minExpire)))
// Check if we have been force-expired by a root update that jittered beyond
// the timeout of the query it was running.
if !state.forceExpireAfter.IsZero() && state.forceExpireAfter.Before(expiresAt) {
expiresAt = state.forceExpireAfter
}
if expiresAt == now || expiresAt.Before(now) {
// Already expired, just make a new one right away
return c.generateNewLeaf(reqReal, &state)
}
// We are about to block and wait for a change or timeout.
// Make a chan we can be notified of changes to CA roots on. It must be
// buffered so we don't miss broadcasts from rootsWatch. It is an edge trigger
// so a single buffer element is sufficient regardless of whether we consume
// the updates fast enough since as soon as we see an element in it, we will
// reload latest CA from cache.
rootUpdateCh := make(chan struct{}, 1)
// Subscribe our chan to get root update notification.
c.fetchStart(rootUpdateCh)
defer c.fetchDone(rootUpdateCh)
// Set up result to mirror the current value in case we time out. This allows
// us to update the state even if we don't generate a new cert.
result.Value = existing
result.Index = existing.ModifyIndex
result.State = state
// Set up the timeout chan outside the loop so we don't keep bumping the timeout
// later if we loop around.
timeoutCh := time.After(opts.Timeout)
// Kick off the goroutine that waits for new CA roots. The channel buffer
// is so that the goroutine doesn't block forever if we return for other
// reasons.
newRootCACh := make(chan error, 1)
go c.waitNewRootCA(reqReal.Datacenter, newRootCACh, opts.Timeout)
// Setup initial expiry chan. We may change this if root update occurs in the
// loop below.
expiresCh := time.After(expiresAt.Sub(now))
// Generate a cache key to lookup/store the cert. We MUST generate a new cert
// per token used to ensure revocation by ACL token is robust.
issuedKey := issuedKey(reqReal.Service, reqReal.Token)
// Current cert is valid so just wait until it expires or we time out.
for {
select {
case <-timeoutCh:
// We timed out the request with same cert.
return result, nil
// Get our prior cert (if we had one) and use that to determine our
// expiration time. If no cert exists, we expire immediately since we
// need to generate.
c.issuedCertsLock.RLock()
lastCert := c.issuedCerts[issuedKey]
c.issuedCertsLock.RUnlock()
case <-expiresCh:
// Cert expired or was force-expired by a root change.
return c.generateNewLeaf(reqReal, &state)
var leafExpiryCh <-chan time.Time
if lastCert != nil {
// Determine how long we wait until triggering. If we've already
// expired, we trigger immediately.
if expiryDur := lastCert.ValidBefore.Sub(time.Now()); expiryDur > 0 {
leafExpiryCh = time.After(expiryDur - 1*time.Hour)
// TODO(mitchellh): 1 hour buffer is hardcoded above
// We should not depend on the cache package de-duplicating requests for
// the same service/token (which is all we care about keying our local
// issued cert cache on) since it might later make sense to partition
// clients for other reasons too. So if the request has a 0 MinIndex, and
// the cached cert is still valid, then the client is expecting an
// immediate response and hasn't already seen the cached cert, return it
// now.
if opts.MinIndex == 0 {
result.Value = lastCert
result.Index = lastCert.ModifyIndex
return result, nil
case <-rootUpdateCh:
// A root cache change occurred, reload roots from cache.
roots, err := c.rootsFromCache()
if err != nil {
return result, err
}
// Handle _possibly_ changed roots. We still need to verify the new active
// root is not the same as the one our current cert was signed by since we
// can be notified spuriously if we are the first request and the
// rootsWatcher didn't yet know about the CA we were signed by.
if activeRootHasKey(roots, state.authorityKeyID) {
// Current active CA is the same one that signed our current cert so
// keep waiting for a change.
continue
}
// CA root changed. We add some jitter here to avoid a thundering herd.
// See docs on the caChangeInitialSpreadDefault const.
delay := lib.RandomStagger(caChangeInitialSpreadDefault)
if c.TestOverrideCAChangeInitialDelay > 0 {
delay = c.TestOverrideCAChangeInitialDelay
}
// Force the cert to be expired after the jitter - the delay above might
// be longer than we have left on our timeout. We set forceExpireAfter in
// the cache state so the next request will notice we still need to renew
// and do it at the right time. This is cleared once a new cert is
// returned by generateNewLeaf.
state.forceExpireAfter = time.Now().Add(delay)
// If the delay time is within the current timeout, we want to renew the cert
// as soon as it's up. We change the expire time and chan so that when we
// loop back around, we'll wait at most delay until generating a new cert.
if state.forceExpireAfter.Before(expiresAt) {
expiresAt = state.forceExpireAfter
expiresCh = time.After(delay)
}
continue
}
}
}
if leafExpiryCh == nil {
// If the channel is still nil then it means we need to generate
// a cert no matter what: we either don't have an existing one or
// it is expired.
leafExpiryCh = time.After(0)
}
// Block on the events that wake us up.
select {
case <-timeoutCh:
// On a timeout, we just return the empty result and no error.
// It isn't an error to timeout, it's just the limit of time the
// caching system wants us to block for. By returning an empty result
// the caching system will ignore.
return result, nil
case err := <-newRootCACh:
// A new root CA triggers us to refresh the leaf certificate.
// If there was an error while getting the root CA then we return.
// Otherwise, we leave the select statement and move to generation.
if err != nil {
return result, err
func activeRootHasKey(roots *structs.IndexedCARoots, currentSigningKeyID string) bool {
for _, ca := range roots.Roots {
if ca.Active {
if ca.SigningKeyID == currentSigningKeyID {
return true
}
// Found the active CA but it has changed
return false
}
case <-leafExpiryCh:
// The existing leaf certificate is expiring soon, so we generate a
// new cert with a healthy overlapping validity period (determined
// by the above channel).
}
// Shouldn't be possible since at least one root should be active.
return false
}
// Need to lookup RootCAs response to discover trust domain. First just lookup
// with no blocking info - this should be a cache hit most of the time.
func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) {
rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: reqReal.Datacenter,
Datacenter: c.Datacenter,
})
if err != nil {
return result, err
return nil, err
}
roots, ok := rawRoots.(*structs.IndexedCARoots)
if !ok {
return result, errors.New("invalid RootCA response type")
return nil, errors.New("invalid RootCA response type")
}
return roots, nil
}
// generateNewLeaf does the actual work of creating a new private key,
// generating a CSR and getting it signed by the servers.
func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchState) (cache.FetchResult, error) {
var result cache.FetchResult
// Need to lookup RootCAs response to discover trust domain. This should be a
// cache hit.
roots, err := c.rootsFromCache()
if err != nil {
return result, err
}
if roots.TrustDomain == "" {
return result, errors.New("cluster has no CA bootstrapped yet")
@ -145,9 +433,9 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
// Build the service ID
serviceID := &connect.SpiffeIDService{
Host: roots.TrustDomain,
Datacenter: reqReal.Datacenter,
Datacenter: req.Datacenter,
Namespace: "default",
Service: reqReal.Service,
Service: req.Service,
}
// Create a new private key
@ -165,8 +453,8 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
// Request signing
var reply structs.IssuedCert
args := structs.CASignRequest{
WriteRequest: structs.WriteRequest{Token: reqReal.Token},
Datacenter: reqReal.Datacenter,
WriteRequest: structs.WriteRequest{Token: req.Token},
Datacenter: req.Datacenter,
CSR: csr,
}
if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil {
@ -174,80 +462,22 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
}
reply.PrivateKeyPEM = pkPEM
// Lock the issued certs map so we can insert it. We only insert if
// we didn't happen to get a newer one. This should never happen since
// the Cache should ensure only one Fetch per service, but we sanity
// check just in case.
c.issuedCertsLock.Lock()
defer c.issuedCertsLock.Unlock()
lastCert = c.issuedCerts[issuedKey]
if lastCert == nil || lastCert.ModifyIndex < reply.ModifyIndex {
if c.issuedCerts == nil {
c.issuedCerts = make(map[string]*structs.IssuedCert)
}
// Reset the forcedExpiry in the state
state.forceExpireAfter = time.Time{}
c.issuedCerts[issuedKey] = &reply
lastCert = &reply
}
result.Value = lastCert
result.Index = lastCert.ModifyIndex
return result, nil
}
// waitNewRootCA blocks until a new root CA is available or the timeout is
// reached (on timeout ErrTimeout is returned on the channel).
func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error,
timeout time.Duration) {
// We always want to block on at least an initial value. If this isn't
minIndex := atomic.LoadUint64(&c.caIndex)
if minIndex == 0 {
minIndex = 1
}
// Fetch some new roots. This will block until our MinQueryIndex is
// matched or the timeout is reached.
rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: datacenter,
QueryOptions: structs.QueryOptions{
MinQueryIndex: minIndex,
MaxQueryTime: timeout,
},
})
cert, err := connect.ParseCert(reply.CertPEM)
if err != nil {
ch <- err
return
return result, err
}
// Set the CA key ID so we can easily tell when an active root has changed.
state.authorityKeyID = connect.HexString(cert.AuthorityKeyId)
roots, ok := rawRoots.(*structs.IndexedCARoots)
if !ok {
// This should never happen but we don't want to even risk a panic
ch <- fmt.Errorf(
"internal error: CA root cache returned bad type: %T", rawRoots)
return
}
// We do a loop here because there can be multiple waitNewRootCA calls
// happening simultaneously. Each Fetch kicks off one call. These are
// multiplexed through Cache.Get which should ensure we only ever
// actually make a single RPC call. However, there is a race to set
// the caIndex field so do a basic CAS loop here.
for {
// We only set our index if its newer than what is previously set.
old := atomic.LoadUint64(&c.caIndex)
if old == roots.Index || old > roots.Index {
break
}
// Set the new index atomically. If the caIndex value changed
// in the meantime, retry.
if atomic.CompareAndSwapUint64(&c.caIndex, old, roots.Index) {
break
}
}
// Trigger the channel since we updated.
ch <- nil
result.Value = &reply
// Store value not pointer so we don't accidentally mutate the cache entry
// state in Fetch.
result.State = *state
result.Index = reply.ModifyIndex
return result, nil
}
func (c *ConnectCALeaf) SupportsBlocking() bool {


@ -6,12 +6,139 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestCalculateSoftExpire(t *testing.T) {
tests := []struct {
name string
now string
issued string
lifetime time.Duration
wantMin string
wantMax string
}{
{
name: "72h just issued",
now: "2018-01-01 00:00:01",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Should jitter between 60% and 90% of the lifetime which is 43.2/64.8
// hours after issuance
wantMin: "2018-01-02 19:12:00",
wantMax: "2018-01-03 16:48:00",
},
{
name: "72h in renew range",
// This time should be inside the renewal range.
now: "2018-01-02 20:00:20",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Min should be the "now" time
wantMin: "2018-01-02 20:00:20",
wantMax: "2018-01-03 16:48:00",
},
{
name: "72h in hard renew",
// This time should be inside the hard renew period.
now: "2018-01-03 18:00:00",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-03 18:00:00",
wantMax: "2018-01-03 18:00:00",
},
{
name: "72h expired",
// This time is after expiry
now: "2018-01-05 00:00:00",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-05 00:00:00",
wantMax: "2018-01-05 00:00:00",
},
{
name: "1h just issued",
now: "2018-01-01 00:00:01",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Should jitter between 60% and 90% of the lifetime which is 36/54 minutes
// after issuance
wantMin: "2018-01-01 00:36:00",
wantMax: "2018-01-01 00:54:00",
},
{
name: "1h in renew range",
// This time should be inside the renewal range.
now: "2018-01-01 00:40:00",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Min should be the "now" time
wantMin: "2018-01-01 00:40:00",
wantMax: "2018-01-01 00:54:00",
},
{
name: "1h in hard renew",
// This time should be inside the hard renew period.
now: "2018-01-01 00:55:00",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-01 00:55:00",
wantMax: "2018-01-01 00:55:00",
},
{
name: "1h expired",
// This time is after expiry
now: "2018-01-01 01:01:01",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-01 01:01:01",
wantMax: "2018-01-01 01:01:01",
},
{
name: "too short lifetime",
// This time is after expiry
now: "2018-01-01 01:01:01",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Minute,
// Min and max should both be the "now" time
wantMin: "2018-01-01 01:01:01",
wantMax: "2018-01-01 01:01:01",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
require := require.New(t)
now, err := time.Parse("2006-01-02 15:04:05", tc.now)
require.NoError(err)
issued, err := time.Parse("2006-01-02 15:04:05", tc.issued)
require.NoError(err)
wantMin, err := time.Parse("2006-01-02 15:04:05", tc.wantMin)
require.NoError(err)
wantMax, err := time.Parse("2006-01-02 15:04:05", tc.wantMax)
require.NoError(err)
min, max := calculateSoftExpiry(now, &structs.IssuedCert{
ValidAfter: issued,
ValidBefore: issued.Add(tc.lifetime),
})
require.Equal(wantMin, min)
require.Equal(wantMax, max)
})
}
}
// Test that after an initial signing, new CA roots (new ID) will
// trigger a blocking query to execute.
func TestConnectCALeaf_changingRoots(t *testing.T) {
@ -23,10 +150,16 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
caRoot := connect.TestCA(t, nil)
caRoot.Active = true
rootsCh <- structs.IndexedCARoots{
ActiveRootID: "1",
ActiveRootID: caRoot.ID,
TrustDomain: "fake-trust-domain.consul",
QueryMeta: structs.QueryMeta{Index: 1},
Roots: []*structs.CARoot{
caRoot,
},
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign to return signed cert
@ -35,7 +168,10 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply.ValidBefore = time.Now().Add(12 * time.Hour)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
reply.ValidBefore = time.Now().Add(11 * time.Hour)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
resp = reply
@ -51,10 +187,11 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 1,
}, result)
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
require.Equal(uint64(1), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
// Second fetch should block with set index
@ -66,20 +203,28 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}
// Let's send in new roots, which should trigger the sign req
// Let's send in new roots, which should trigger the sign req. We need to take
// care to set the new root as active
caRoot2 := connect.TestCA(t, nil)
caRoot2.Active = true
caRoot.Active = false
rootsCh <- structs.IndexedCARoots{
ActiveRootID: "2",
ActiveRootID: caRoot2.ID,
TrustDomain: "fake-trust-domain.consul",
QueryMeta: structs.QueryMeta{Index: 2},
Roots: []*structs.CARoot{
caRoot2,
caRoot,
},
QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
}
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 2,
}, result)
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
// 3 since the second CA "update" used up 2
require.Equal(uint64(3), v.Index)
}
// Third fetch should block
@ -91,6 +236,331 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
}
}
// Tests that if the root change jitter is longer than the time left on the
// timeout, we return normally but then still renew the cert on a subsequent
// call.
func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) {
t.Parallel()
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
// Override the root-change delay so we will timeout first. We can't set it to
// a crazy high value otherwise we'll have to wait that long in the test to
// see if it actually happens on subsequent calls. We instead reduce the
// timeout in FetchOptions to be much shorter than this.
typ.TestOverrideCAChangeInitialDelay = 100 * time.Millisecond
caRoot := connect.TestCA(t, nil)
caRoot.Active = true
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot,
},
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign to return signed cert
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
reply.ValidBefore = time.Now().Add(11 * time.Hour)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
resp = reply
})
// We'll reuse the fetch options and request. Timeout must be much shorter
// than the initial root delay. 20ms means that if we deliver the root change
// during the first blocking call, we should need to block fully for 5 more
// calls before the cert is renewed. We pick a timeout that is not an exact
// multiple of the 100ms delay above to reduce the chance that timing works
// out in a way that makes it hard to tell a timeout from an early return due
// to a cert renewal.
opts := cache.FetchOptions{MinIndex: 0, Timeout: 35 * time.Millisecond}
req := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web"}
// First fetch should return immediately
fetchCh := TestFetchCh(t, typ, opts, req)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
require.Equal(uint64(1), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
// Let's send in new roots, which should eventually trigger the sign req. We
// need to take care to set the new root as active
caRoot2 := connect.TestCA(t, nil)
caRoot2.Active = true
caRoot.Active = false
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot2.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot2,
caRoot,
},
QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
}
earliestRootDelivery := time.Now()
// Some number of fetches (2,3,4 likely) should timeout after 20ms and after
// 100ms has elapsed total we should see the new cert. Since this is all very
// timing dependent, we don't hard code exact numbers here and instead loop
// for plenty of time and do as many calls as it takes and just assert on the
// time taken and that the call either blocks and returns the cached cert, or
// returns the new one.
opts.MinIndex = 1
var shouldExpireAfter time.Time
i := 1
rootsDelivered := false
for !rootsDelivered {
start := time.Now()
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case result := <-fetchCh:
v := mustFetchResult(t, result)
timeTaken := time.Since(start)
// There are two options, either it blocked waiting for the delay after
// the rotation or it returned the new CA cert before the timeout was
// done. To be more robust against timing, we take the value as the
// decider for which case it is, and assert timing matches our expected
// bounds rather than vice versa.
if v.Index > uint64(1) {
// Got a new cert
require.Equal(resp, v.Value)
require.Equal(uint64(3), v.Index)
// Should not have been delivered before the delay
require.True(time.Since(earliestRootDelivery) > typ.TestOverrideCAChangeInitialDelay)
// All good. We are done!
rootsDelivered = true
} else {
// Should be the cached cert
require.Equal(resp, v.Value)
require.Equal(uint64(1), v.Index)
// Sanity check we blocked for the whole timeout
require.Truef(timeTaken > opts.Timeout,
"should block for at least %s, returned after %s",
opts.Timeout, timeTaken)
// Sanity check that the forceExpireAfter state was set correctly
shouldExpireAfter = v.State.(fetchState).forceExpireAfter
require.True(shouldExpireAfter.After(time.Now()))
require.True(shouldExpireAfter.Before(time.Now().Add(typ.TestOverrideCAChangeInitialDelay)))
}
// Set the LastResult for subsequent fetches
opts.LastResult = &v
case <-time.After(50 * time.Millisecond):
t.Fatalf("request %d blocked too long", i)
}
i++
// Sanity check that we've not gone way beyond the deadline without a
// new cert. We give some leeway to make it less brittle.
require.Falsef(
time.Now().After(shouldExpireAfter.Add(100*time.Millisecond)),
"waited extra 100ms and delayed CA rotate renew didn't happen")
}
}
// This test runs multiple concurrent callers watching different leaf certs and
// tries to ensure that the background root watch activity behaves correctly.
func TestConnectCALeaf_watchRootsDedupingMultipleCallers(t *testing.T) {
t.Parallel()
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
caRoot := connect.TestCA(t, nil)
caRoot.Active = true
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot,
},
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign to return signed cert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
// Note we will sign certs for same service name each time because
// otherwise we have to re-invent whole CSR endpoint here to be able to
// control things - parse PEM sign with right key etc. It doesn't matter -
// we use the CreateIndex to differentiate the "right" results.
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
reply.ValidBefore = time.Now().Add(11 * time.Hour)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
})
// n is the number of clients we'll run
n := 3
// setupDoneCh/testDoneCh are used for coordinating clients such that each has
// its initial cert delivered and is blocking before the root changes. It's not a
// wait group since we want to be able to timeout the main test goroutine if
// one of the clients gets stuck. Instead it's a buffered chan.
setupDoneCh := make(chan struct{}, n)
testDoneCh := make(chan struct{}, n)
// rootsUpdatedCh is used to coordinate clients so they know when they should
// expect to see the leaf renewed after the root change.
rootsUpdatedCh := make(chan struct{})
// Create a function that models a single client. It should go through the
// steps of getting an initial cert and then watching for changes until root
// updates.
client := func(i int) {
// We'll reuse the fetch options and request
opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Second}
req := &ConnectCALeafRequest{Datacenter: "dc1", Service: fmt.Sprintf("web-%d", i)}
// First fetch should return immediately
fetchCh := TestFetchCh(t, typ, opts, req)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
v := mustFetchResult(t, result)
opts.LastResult = &v
}
// Second fetch should block with set index
opts.MinIndex = 1
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case result := <-fetchCh:
t.Fatalf("should not return: %#v", result)
case <-time.After(100 * time.Millisecond):
}
// We're done with setup and the blocking call is still blocking in
// background.
setupDoneCh <- struct{}{}
// Wait until all others are also done and roots change in case there are
// stragglers delaying the root update.
select {
case <-rootsUpdatedCh:
case <-time.After(200 * time.Millisecond):
t.Fatalf("waited too long for root update")
}
// Now we should see root update within a short period
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
v := mustFetchResult(t, result)
// Index must be different
require.NotEqual(opts.MinIndex, v.Value.(*structs.IssuedCert).CreateIndex)
}
testDoneCh <- struct{}{}
}
// Sanity check the roots watcher is not running yet
assertRootsWatchCounts(t, typ, 0, 0)
for i := 0; i < n; i++ {
go client(i)
}
timeoutCh := time.After(200 * time.Millisecond)
for i := 0; i < n; i++ {
select {
case <-timeoutCh:
t.Fatal("timed out waiting for clients")
case <-setupDoneCh:
}
}
// Should be 3 clients running now, so the roots watcher should have started
// once and not stopped.
assertRootsWatchCounts(t, typ, 1, 0)
// Now we deliver the root update
caRoot2 := connect.TestCA(t, nil)
caRoot2.Active = true
caRoot.Active = false
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot2.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot2,
caRoot,
},
QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
}
// And notify clients
close(rootsUpdatedCh)
timeoutCh = time.After(200 * time.Millisecond)
for i := 0; i < n; i++ {
select {
case <-timeoutCh:
t.Fatalf("timed out waiting for %d of %d clients to renew after root change", n-i, n)
case <-testDoneCh:
}
}
// All active requests have returned the new cert so the rootsWatcher should
// have stopped. This is timing dependent though so retry a few times
retry.RunWith(retry.ThreeTimes(), t, func(r *retry.R) {
assertRootsWatchCounts(r, typ, 1, 1)
})
}
func assertRootsWatchCounts(t require.TestingT, typ *ConnectCALeaf, wantStarts, wantStops int) {
if tt, ok := t.(*testing.T); ok {
tt.Helper()
}
starts := atomic.LoadUint32(&typ.testRootWatchStartCount)
stops := atomic.LoadUint32(&typ.testRootWatchStopCount)
require.Equal(t, wantStarts, int(starts))
require.Equal(t, wantStops, int(stops))
}
func mustFetchResult(t *testing.T, result interface{}) cache.FetchResult {
t.Helper()
switch v := result.(type) {
case error:
require.NoError(t, v)
case cache.FetchResult:
return v
default:
t.Fatalf("unexpected type from fetch %T", v)
}
return cache.FetchResult{}
}
// Test that after an initial signing, an expiringLeaf will trigger a
// blocking query to resign.
func TestConnectCALeaf_expiringLeaf(t *testing.T) {
@ -102,10 +572,16 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
caRoot := connect.TestCA(t, nil)
caRoot.Active = true
rootsCh <- structs.IndexedCARoots{
ActiveRootID: "1",
ActiveRootID: caRoot.ID,
TrustDomain: "fake-trust-domain.consul",
QueryMeta: structs.QueryMeta{Index: 1},
Roots: []*structs.CARoot{
caRoot,
},
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign to
@ -117,11 +593,17 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
// This sets the validity to 0 on the first call, and
// 12 hours+ on subsequent calls. This means that our first
// cert expires immediately.
reply.ValidBefore = time.Now().Add((12 * time.Hour) *
time.Duration(reply.CreateIndex-1))
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
if reply.CreateIndex == 1 {
// First call returns expired cert to prime cache with an expired one.
reply.ValidAfter = time.Now().Add(-13 * time.Hour)
reply.ValidBefore = time.Now().Add(-1 * time.Hour)
} else {
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
reply.ValidBefore = time.Now().Add(11 * time.Hour)
}
resp = reply
})
@ -136,10 +618,15 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 1,
}, result)
switch v := result.(type) {
case error:
require.NoError(v)
case cache.FetchResult:
require.Equal(resp, v.Value)
require.Equal(uint64(1), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
}
// Second fetch should return immediately despite there being
@ -149,10 +636,15 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 2,
}, result)
switch v := result.(type) {
case error:
require.NoError(v)
case cache.FetchResult:
require.Equal(resp, v.Value)
require.Equal(uint64(2), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
}
// Third fetch should block since the cert is not expiring and
@ -166,188 +658,6 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
}
}
// Test that once one client (e.g. the proxycfg.Manager) has fetched a cert,
// that subsequent clients get it returned immediately and don't block until it
// expires or their request times out. Note that typically Fetches at this level
// are de-duped by the cache higher up, but if the two clients are using
// different ACL tokens for example (common) that may not be the case, and we
// should still deliver correct blocking semantics to both.
//
// Additionally, we want to make sure that clients with different tokens
// generate distinct certs since we might later want to revoke all certs fetched
// with a given token but can't if a client using that token was served a cert
// generated under a different token (say the agent token).
func TestConnectCALeaf_multipleClientsDifferentTokens(t *testing.T) {
t.Parallel()
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
rootsCh <- structs.IndexedCARoots{
ActiveRootID: "1",
TrustDomain: "fake-trust-domain.consul",
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign to
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
reply.ValidBefore = time.Now().Add(12 * time.Hour)
resp = reply
})
// We'll reuse the fetch options and request
opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute}
reqA := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "A-token"}
reqB := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "B-token"}
// First fetch (Client A, MinIndex = 0) should return immediately
fetchCh := TestFetchCh(t, typ, opts, reqA)
var certA *structs.IssuedCert
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 1,
}, result)
certA = result.(cache.FetchResult).Value.(*structs.IssuedCert)
}
// Second fetch (Client B, MinIndex = 0) should return immediately
fetchCh = TestFetchCh(t, typ, opts, reqB)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 2,
}, result)
// Different tokens should result in different certs. Note that we don't
// actually generate and sign real certs in this test with our mock RPC but
// this is enough to be sure we actually generated a different Private Key
// for each one and aren't just different due to index values.
require.NotEqual(certA.PrivateKeyPEM,
result.(cache.FetchResult).Value.(*structs.IssuedCert).PrivateKeyPEM)
}
// Third fetch (Client A, MinIndex = > 0) should block
opts.MinIndex = 2
fetchCh = TestFetchCh(t, typ, opts, reqA)
select {
case result := <-fetchCh:
t.Fatalf("should not return: %#v", result)
case <-time.After(100 * time.Millisecond):
}
// Fourth fetch (Client B, MinIndex = > 0) should block
fetchCh = TestFetchCh(t, typ, opts, reqB)
select {
case result := <-fetchCh:
t.Fatalf("should not return: %#v", result)
case <-time.After(100 * time.Millisecond):
}
}
// Test that once one client (e.g. the proxycfg.Manager) has fetched a cert,
// that subsequent clients get it returned immediately and don't block until it
// expires or their request times out. Note that typically Fetches at this level
// are de-duped by the cache higher up, the test above explicitly tests the case
// where two clients with different tokens request the same cert. However two
// clients sharing a token _may_ share the certificate, but the cachetype should
// not implicitly depend on the cache mechanism de-duping these clients.
//
// Generally we _shouldn't_ rely on implementation details in the cache package
// about partitioning to behave correctly as that is likely to lead to subtle
// errors later when the implementation there changes, so this test ensures that
// even if the cache for some reason decides to not share an existing cache
// entry with a second client despite using the same token, that we don't block
// its initial request assuming that it has already received the in-memory,
// still-valid cert.
func TestConnectCALeaf_multipleClientsSameToken(t *testing.T) {
t.Parallel()
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
rootsCh <- structs.IndexedCARoots{
ActiveRootID: "1",
TrustDomain: "fake-trust-domain.consul",
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign to
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
reply.ValidBefore = time.Now().Add(12 * time.Hour)
resp = reply
})
// We'll reuse the fetch options and request
opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute}
reqA := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "shared-token"}
reqB := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web", Token: "shared-token"}
// First fetch (Client A, MinIndex = 0) should return immediately
fetchCh := TestFetchCh(t, typ, opts, reqA)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 1,
}, result)
}
// Second fetch (Client B, MinIndex = 0) should return immediately
fetchCh = TestFetchCh(t, typ, opts, reqB)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
require.Equal(cache.FetchResult{
Value: resp,
Index: 1, // Same result as last fetch
}, result)
}
// Third fetch (Client A, MinIndex = > 0) should block
opts.MinIndex = 1
fetchCh = TestFetchCh(t, typ, opts, reqA)
select {
case result := <-fetchCh:
t.Fatalf("should not return: %#v", result)
case <-time.After(100 * time.Millisecond):
}
// Fourth fetch (Client B, MinIndex = > 0) should block
fetchCh = TestFetchCh(t, typ, opts, reqB)
select {
case result := <-fetchCh:
t.Fatalf("should not return: %#v", result)
case <-time.After(100 * time.Millisecond):
}
}
// testCALeafType returns a *ConnectCALeaf that is pre-configured to
// use the given RPC implementation for "ConnectCA.Sign" operations.
func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.IndexedCARoots) {
@ -368,7 +678,16 @@ func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.Indexed
})
// Create the leaf type
return &ConnectCALeaf{RPC: rpc, Cache: c}, rootsCh
return &ConnectCALeaf{
RPC: rpc,
Cache: c,
Datacenter: "dc1",
// Override the root-change spread so we don't have to wait up to 20 seconds
// to see root changes work. Can be changed back for specific tests that
// need to test this. Note it's not 0 since that would use the default, but it
// is effectively the same.
TestOverrideCAChangeInitialDelay: 1 * time.Microsecond,
}, rootsCh
}
// testGatedRootsRPC will send each subsequent value on the channel as the


@ -454,6 +454,13 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
fOpts.MinIndex = entry.Index
fOpts.Timeout = tEntry.Opts.RefreshTimeout
}
if entry.Valid {
fOpts.LastResult = &FetchResult{
Value: entry.Value,
State: entry.State,
Index: entry.Index,
}
}
// Start building the new entry by blocking on the fetch.
result, err := tEntry.Type.Fetch(fOpts, r)
@ -476,6 +483,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
if result.Value != nil {
// A new value was given, so we create a brand new entry.
newEntry.Value = result.Value
newEntry.State = result.State
newEntry.Index = result.Index
newEntry.FetchedAt = time.Now()
if newEntry.Index < 1 {


@ -379,9 +379,17 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
c := TestCache(t)
c.RegisterType("t", typ, nil)
stateCh := make(chan int, 1)
// Configure the type
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Times(1)
typ.Static(FetchResult{Value: nil}, nil)
typ.Static(FetchResult{Value: 42, State: 31, Index: 1}, nil).Times(1)
// Return different State, it should be ignored
typ.Static(FetchResult{Value: nil, State: 32}, nil).Run(func(args mock.Arguments) {
// We should get back the original state
opts := args.Get(0).(FetchOptions)
require.NotNil(opts.LastResult)
stateCh <- opts.LastResult.State.(int)
})
// Get, should fetch
req := TestRequest(t, RequestInfo{Key: "hello"})
@ -398,6 +406,29 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
require.Equal(42, result)
require.False(meta.Hit)
// State delivered to second call should be the result from first call.
select {
case state := <-stateCh:
require.Equal(31, state)
case <-time.After(20 * time.Millisecond):
t.Fatal("timed out")
}
// Next request should get the first returned state too since last Fetch
// returned nil result.
req = TestRequest(t, RequestInfo{
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
result, meta, err = c.Get("t", req)
require.NoError(err)
require.Equal(42, result)
require.False(meta.Hit)
select {
case state := <-stateCh:
require.Equal(31, state)
case <-time.After(20 * time.Millisecond):
t.Fatal("timed out")
}
// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)


@ -12,6 +12,13 @@ import (
type cacheEntry struct {
// Fields pertaining to the actual value
Value interface{}
// State can be used to store info needed by the cache type but that should
// not be part of the result the client gets. For example the Connect Leaf
// type needs to store additional data about when it last attempted a renewal
// that is not part of the actual IssuedCert struct it returns. It's opaque to
// the Cache but allows types to store additional data that is coupled to the
// cache entry's lifetime and will be aged out by TTL etc.
State interface{}
Error error
Index uint64

agent/cache/type.go

@ -8,20 +8,25 @@ import (
type Type interface {
// Fetch fetches a single unique item.
//
// The FetchOptions contain the index and timeouts for blocking queries.
// The MinIndex value on the Request itself should NOT be used
// as the blocking index since a request may be reused multiple times
// as part of Refresh behavior.
// The FetchOptions contain the index and timeouts for blocking queries. The
// MinIndex value on the Request itself should NOT be used as the blocking
// index since a request may be reused multiple times as part of Refresh
// behavior.
//
// The return value is a FetchResult which contains information about
// the fetch. If an error is given, the FetchResult is ignored. The
// cache does not support backends that return partial values.
// The return value is a FetchResult which contains information about the
// fetch. If an error is given, the FetchResult is ignored. The cache does not
// support backends that return partial values. Optional State can be added to
// the FetchResult which will be stored with the cache entry and provided to
// the next Fetch call but will not be returned to clients. This allows types
// to add additional bookkeeping data per cache entry that will still be aged
// out along with the entry's TTL.
//
// On timeout, FetchResult can behave one of two ways. First, it can
// return the last known value. This is the default behavior of blocking
// RPC calls in Consul so this allows cache types to be implemented with
// no extra logic. Second, FetchResult can return an unset value and index.
// In this case, the cache will reuse the last value automatically.
// On timeout, FetchResult can behave one of two ways. First, it can return
// the last known value. This is the default behavior of blocking RPC calls in
// Consul so this allows cache types to be implemented with no extra logic.
// Second, FetchResult can return an unset value and index. In this case, the
// cache will reuse the last value automatically. If an unset Value is
// returned, the State field will also be ignored currently.
Fetch(FetchOptions, Request) (FetchResult, error)
// SupportsBlocking should return true if the type supports blocking queries.
@ -41,6 +46,23 @@ type FetchOptions struct {
// Timeout is the maximum time for the query. This must be implemented
// in the Fetch itself.
Timeout time.Duration
// LastResult is the result from the last successful Fetch and represents the
// value currently stored in the cache at the time Fetch is invoked. It will
// be nil on first call where there is no current cache value. There may have
// been other Fetch attempts that resulted in an error in the meantime. These
// are not explicitly represented currently. We could add that if needed; this
// was just simpler for now.
//
// The FetchResult is read-only! It is constructed per Fetch call so modifying
// the struct directly (e.g. changing its Index or Value field) will have no
// effect, however the Value and State fields may be pointers to the actual
// values stored in the cache entry. It is thread-unsafe to modify the Value
// or State via pointers since readers may be concurrently inspecting those
// values under the entry lock (although we guarantee only one Fetch call per
// entry) and modifying them even if the index doesn't change or the Fetch
// eventually errors will likely break logical invariants in the cache too!
LastResult *FetchResult
}
// FetchResult is the result of a Type Fetch operation and contains the
@ -49,6 +71,12 @@ type FetchResult struct {
// Value is the result of the fetch.
Value interface{}
// State is opaque data stored in the cache but not returned to clients. It
// can be used by Types to maintain any bookkeeping they need between fetches
// (using FetchOptions.LastResult) in a way that gets automatically cleaned up
// by TTL expiry etc.
State interface{}
// Index is the corresponding index value for this data.
Index uint64
}
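// Round-trip sketch (hypothetical Type, not part of this change): a Type can
// thread bookkeeping between calls via State without exposing it to clients.
//
//	func (t *exampleType) Fetch(opts FetchOptions, req Request) (FetchResult, error) {
//		var lastRenewal time.Time
//		if opts.LastResult != nil {
//			if s, ok := opts.LastResult.State.(time.Time); ok {
//				lastRenewal = s // bookkeeping carried over from the previous Fetch
//			}
//		}
//		value, index, err := t.fetchRemote(req, lastRenewal) // hypothetical helper
//		if err != nil {
//			return FetchResult{}, err
//		}
//		return FetchResult{Value: value, Index: index, State: time.Now()}, nil
//	}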


@ -531,7 +531,7 @@ type RuntimeConfig struct {
// ConnectReplicationToken is the ACL token used for replicating intentions.
ConnectReplicationToken string
// ConnectTestDisableManagedProxies is not exposed to public config but us
// ConnectTestDisableManagedProxies is not exposed to public config but is
// used by TestAgent to prevent self-executing the test binary in the
// background if a managed proxy is created for a test. The only place we
// actually want to test processes really being spun up and managed is in
@ -541,6 +541,13 @@ type RuntimeConfig struct {
// processes up.
ConnectTestDisableManagedProxies bool
// ConnectTestCALeafRootChangeSpread is used to control how long the CA leaf
// cache with spread CSRs over when a root change occurs. For now we don't
// expose this in public config intentionally but could later with a rename.
// We only set this from during tests to effectively make CA rotation tests
// deterministic again.
ConnectTestCALeafRootChangeSpread time.Duration
// DNSAddrs contains the list of TCP and UDP addresses the DNS server will
// bind to. If the DNS endpoint is disabled (ports.dns <= 0) the list is
// empty.


@ -4981,6 +4981,7 @@ func TestSanitize(t *testing.T) {
"ConnectProxyDefaultScriptCommand": [],
"ConnectSidecarMaxPort": 0,
"ConnectSidecarMinPort": 0,
"ConnectTestCALeafRootChangeSpread": "0s",
"ConnectReplicationToken": "hidden",
"ConnectTestDisableManagedProxies": false,
"ConsulCoordinateUpdateBatchSize": 0,


@ -41,6 +41,7 @@ func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot {
// Create the private key we'll use for this CA cert.
signer, keyPEM := testPrivateKey(t)
result.SigningKey = keyPEM
result.SigningKeyID = HexString(testKeyID(t, signer.Public()))
// The serial number for the cert
sn, err := testSerialNumber()
@ -83,6 +84,9 @@ func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot {
if err != nil {
t.Fatalf("error generating CA ID fingerprint: %s", err)
}
result.SerialNumber = uint64(sn.Int64())
result.NotBefore = template.NotBefore.UTC()
result.NotAfter = template.NotAfter.UTC()
// If there is a prior CA to cross-sign with, then we need to create that
// and set it as the signing cert.
@ -269,12 +273,12 @@ func testPrivateKey(t testing.T) (crypto.Signer, string) {
return pk, buf.String()
}
// testSerialNumber generates a serial number suitable for a certificate.
// For testing, this just sets it to a random number.
//
// This function is taken directly from the Vault implementation.
// testSerialNumber generates a serial number suitable for a certificate. For
// testing, this just sets it to a random number, but one that can fit in a
// uint64 since we use that in our datastructures and assume cert serials will
// fit in that for now.
func testSerialNumber() (*big.Int, error) {
return rand.Int(rand.Reader, (&big.Int{}).Exp(big.NewInt(2), big.NewInt(159), nil))
return rand.Int(rand.Reader, (&big.Int{}).Exp(big.NewInt(2), big.NewInt(63), nil))
}
// testUUID generates a UUID for testing.


@ -59,8 +59,9 @@ type CARoot struct {
// SerialNumber is the x509 serial number of the certificate.
SerialNumber uint64
// SigningKeyID is the ID of the public key that corresponds to the
// private key used to sign the certificate.
// SigningKeyID is the ID of the public key that corresponds to the private
// key used to sign the certificate. It is the HexString format of the raw
// AuthorityKeyID bytes.
SigningKeyID string
// ExternalTrustDomain is the trust domain this root was generated under. It


@ -379,6 +379,10 @@ func TestConfig(sources ...config.Source) *config.RuntimeConfig {
// Disable connect proxy execution since it causes all kinds of problems with
// self-executing tests etc.
cfg.ConnectTestDisableManagedProxies = true
// Effectively disables the delay after root rotation before requesting CSRs
// to make tests deterministic. 0 results in the default jitter being applied
// but a tiny delay is effectively the same.
cfg.ConnectTestCALeafRootChangeSpread = 1 * time.Nanosecond
return &cfg
}