agent: remove agent cache dependency from service mesh leaf certificate management (#17075)

* agent: remove agent cache dependency from service mesh leaf certificate management

This extracts the leaf cert management from within the agent cache.

This code was produced by the following process:

1. All tests in agent/cache, agent/cache-types, agent/auto-config,
   agent/consul/servercert were run at each stage.

    - The tests in agent matching .*Leaf were run at each stage.

    - The tests in agent/leafcert were run at each stage after they
      existed.

2. The former leaf cert Fetch implementation was extracted into a new
   package behind a "fake RPC" endpoint to make it look almost like all
   other cache type internals.

3. The old cache type was shimmed to use the fake RPC endpoint and
   generally cleaned up.

4. I selectively duplicated all of Get/Notify/NotifyCallback/Prepopulate
   from the agent/cache.Cache implementation over into the new package.
   This was renamed as leafcert.Manager.

    - Code that was irrelevant to the leaf cert type was deleted
      (inlining blocking=true, refresh=false)

5. Everything that used the leaf cert cache type (including proxycfg
   stuff) was shifted to use the leafcert.Manager instead.

6. agent/cache-types tests were moved and gently replumbed to execute
   as-is against a leafcert.Manager.

7. Inspired by some of the locking changes from derek's branch I split
   the fat lock into N+1 locks.

8. The waiter chan struct{} was eventually replaced with a
   singleflight.Group around cache updates, which was likely the biggest
   net structural change.

9. The awkward two layers or logic produced as a byproduct of marrying
   the agent cache management code with the leaf cert type code was
   slowly coalesced and flattened to remove confusion.

10. The .*Leaf tests from the agent package were copied and made to work
    directly against a leafcert.Manager to increase direct coverage.

I have done a best effort attempt to port the previous leaf-cert cache
type's tests over in spirit, as well as to take the e2e-ish tests in the
agent package with Leaf in the test name and copy those into the
agent/leafcert package to get more direct coverage, rather than coverage
tangled up in the agent logic.

There is no net-new test coverage, just coverage that was pushed around
from elsewhere.
This commit is contained in:
R.B. Boyer 2023-06-13 10:54:45 -05:00 committed by GitHub
parent 0a1efe73f3
commit 72f991d8d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 3563 additions and 2124 deletions

3
.changelog/17075.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
agent: remove agent cache dependency from service mesh leaf certificate management
```

View File

@ -49,6 +49,7 @@ import (
grpcDNS "github.com/hashicorp/consul/agent/grpc-external/services/dns"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
@ -123,6 +124,7 @@ var configSourceToName = map[configSource]string{
ConfigSourceLocal: "local",
ConfigSourceRemote: "remote",
}
var configSourceFromName = map[string]configSource{
"local": ConfigSourceLocal,
"remote": ConfigSourceRemote,
@ -247,6 +249,9 @@ type Agent struct {
// cache is the in-memory cache for data the Agent requests.
cache *cache.Cache
// leafCertManager issues and caches leaf certs as needed.
leafCertManager *leafcert.Manager
// checkReapAfter maps the check ID to a timeout after which we should
// reap its associated service
checkReapAfter map[structs.CheckID]time.Duration
@ -428,6 +433,12 @@ type Agent struct {
// - create the AutoConfig object for future use in fully
// resolving the configuration
func New(bd BaseDeps) (*Agent, error) {
if bd.LeafCertManager == nil {
return nil, errors.New("LeafCertManager is required")
}
if bd.NetRPC == nil {
return nil, errors.New("NetRPC is required")
}
a := Agent{
checkReapAfter: make(map[structs.CheckID]time.Duration),
checkMonitors: make(map[structs.CheckID]*checks.CheckMonitor),
@ -454,6 +465,7 @@ func New(bd BaseDeps) (*Agent, error) {
tlsConfigurator: bd.TLSConfigurator,
config: bd.RuntimeConfig,
cache: bd.Cache,
leafCertManager: bd.LeafCertManager,
routineManager: routine.NewManager(bd.Logger),
scadaProvider: bd.HCP.Provider,
}
@ -497,6 +509,9 @@ func New(bd BaseDeps) (*Agent, error) {
},
}
// TODO(rb): remove this once NetRPC is properly available in BaseDeps without an Agent
bd.NetRPC.SetNetRPC(&a)
// We used to do this in the Start method. However it doesn't need to go
// there any longer. Originally it did because we passed the agent
// delegate to some of the cache registrations. Now we just
@ -674,7 +689,7 @@ func (a *Agent) Start(ctx context.Context) error {
Datacenter: a.config.Datacenter,
ACLsEnabled: a.config.ACLsEnabled,
},
Cache: a.cache,
LeafCertManager: a.leafCertManager,
GetStore: func() servercert.Store { return server.FSM().State() },
TLSConfigurator: a.tlsConfigurator,
}
@ -4354,13 +4369,6 @@ func (a *Agent) registerCache() {
a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{RPC: a})
a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
RPC: a,
Cache: a.cache,
Datacenter: a.config.Datacenter,
TestOverrideCAChangeInitialDelay: a.config.ConnectTestCALeafRootChangeSpread,
})
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a})
a.cache.RegisterType(cachetype.IntentionUpstreamsName, &cachetype.IntentionUpstreams{RPC: a})
@ -4521,7 +4529,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache),
IntentionUpstreamsDestination: proxycfgglue.CacheIntentionUpstreamsDestination(a.cache),
InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache),
LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache),
LeafCertificate: proxycfgglue.LocalLeafCerts(a.leafCertManager),
PeeredUpstreams: proxycfgglue.CachePeeredUpstreams(a.cache),
PeeringList: proxycfgglue.CachePeeringList(a.cache),
PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache),

View File

@ -24,6 +24,7 @@ import (
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/debug"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
token_store "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
@ -1569,7 +1570,7 @@ func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *htt
// TODO(peering): expose way to get kind=mesh-gateway type cert with appropriate ACLs
args := cachetype.ConnectCALeafRequest{
args := leafcert.ConnectCALeafRequest{
Service: serviceName, // Need name not ID
}
var qOpts structs.QueryOptions
@ -1598,17 +1599,13 @@ func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *htt
return nil, nil
}
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args)
reply, m, err := s.agent.leafCertManager.Get(req.Context(), &args)
if err != nil {
return nil, err
}
defer setCacheMeta(resp, &m)
reply, ok := raw.(*structs.IssuedCert)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
setIndex(resp, reply.ModifyIndex)
return reply, nil

View File

@ -6912,14 +6912,27 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
require.Equal(t, issued, issued2)
}
replyCh := make(chan *httptest.ResponseRecorder, 1)
go func(index string) {
resp := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/leaf/test?index="+index, nil)
a.srv.h.ServeHTTP(resp, req)
replyCh <- resp
}(index)
// Set a new CA
ca2 := connect.TestCAConfigSet(t, a, nil)
// Issue a blocking query to ensure that the cert gets updated appropriately
t.Run("test blocking queries update leaf cert", func(t *testing.T) {
resp := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/leaf/test?index="+index, nil)
a.srv.h.ServeHTTP(resp, req)
var resp *httptest.ResponseRecorder
select {
case resp = <-replyCh:
case <-time.After(500 * time.Millisecond):
t.Fatal("blocking query did not wake up during rotation")
}
dec := json.NewDecoder(resp.Body)
issued2 := &structs.IssuedCert{}
require.NoError(t, dec.Decode(issued2))

View File

@ -52,6 +52,7 @@ import (
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
@ -329,8 +330,15 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
HTTPMaxHeaderBytes: tt.maxHeaderBytes,
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})
cfg := config.RuntimeConfig{BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC)}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)
@ -5444,8 +5452,15 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})
cfg := config.RuntimeConfig{BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC)}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)
@ -6030,8 +6045,15 @@ func TestAgent_startListeners(t *testing.T) {
HTTPAddrs: []net.Addr{},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})
bd, err := initEnterpriseBaseDeps(bd, &config.RuntimeConfig{})
require.NoError(t, err)
@ -6161,8 +6183,15 @@ func TestAgent_startListeners_scada(t *testing.T) {
},
RuntimeConfig: &config.RuntimeConfig{},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})
cfg := config.RuntimeConfig{BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC)}
bd, err := initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)
@ -6214,7 +6243,13 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
},
RuntimeConfig: &config.RuntimeConfig{},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})
agent, err := New(bd)
require.NoError(t, err)

View File

@ -21,6 +21,7 @@ import (
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
@ -566,9 +567,8 @@ func TestGoRoutineManagement(t *testing.T) {
})
leafReq := ac.leafCertRequest()
mcfg.cache.On("Notify",
mcfg.leafCerts.On("Notify",
mock.Anything,
cachetype.ConnectCALeafName,
&leafReq,
leafWatchID,
mock.Anything,
@ -717,10 +717,9 @@ func startedAutoConfig(t *testing.T, autoEncrypt bool) testAutoConfig {
mock.Anything,
).Return(nil).Once()
mcfg.cache.On("Notify",
mcfg.leafCerts.On("Notify",
mock.Anything,
cachetype.ConnectCALeafName,
&cachetype.ConnectCALeafRequest{
&leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Agent: "autoconf",
Token: originalToken,
@ -875,10 +874,9 @@ func TestTokenUpdate(t *testing.T) {
})
leafCtx, leafCancel := context.WithCancel(context.Background())
testAC.mcfg.cache.On("Notify",
testAC.mcfg.leafCerts.On("Notify",
mock.Anything,
cachetype.ConnectCALeafName,
&cachetype.ConnectCALeafRequest{
&leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Agent: "autoconf",
Token: newToken,
@ -975,14 +973,14 @@ func TestCertUpdate(t *testing.T) {
NotAfter: secondCert.ValidBefore,
}).Once()
req := cachetype.ConnectCALeafRequest{
req := leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Agent: "autoconf",
Token: testAC.originalToken,
DNSSAN: defaultDNSSANs,
IPSAN: defaultIPSANs,
}
require.True(t, testAC.mcfg.cache.sendNotification(context.Background(), req.CacheInfo().Key, cache.UpdateEvent{
require.True(t, testAC.mcfg.leafCerts.sendNotification(context.Background(), req.Key(), cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: secondCert,
Meta: cache.ResultMeta{
@ -1102,14 +1100,14 @@ func TestFallback(t *testing.T) {
// now that all the mocks are set up we can trigger the whole thing by sending the second expired cert
// as a cache update event.
req := cachetype.ConnectCALeafRequest{
req := leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Agent: "autoconf",
Token: testAC.originalToken,
DNSSAN: defaultDNSSANs,
IPSAN: defaultIPSANs,
}
require.True(t, testAC.mcfg.cache.sendNotification(context.Background(), req.CacheInfo().Key, cache.UpdateEvent{
require.True(t, testAC.mcfg.leafCerts.sendNotification(context.Background(), req.Key(), cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: secondCert,
Meta: cache.ResultMeta{

View File

@ -20,6 +20,7 @@ import (
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
@ -347,10 +348,9 @@ func TestAutoEncrypt_TokenUpdate(t *testing.T) {
})
leafCtx, leafCancel := context.WithCancel(context.Background())
testAC.mcfg.cache.On("Notify",
testAC.mcfg.leafCerts.On("Notify",
mock.Anything,
cachetype.ConnectCALeafName,
&cachetype.ConnectCALeafRequest{
&leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Agent: "autoconf",
Token: newToken,
@ -430,14 +430,14 @@ func TestAutoEncrypt_CertUpdate(t *testing.T) {
NotAfter: secondCert.ValidBefore,
}).Once()
req := cachetype.ConnectCALeafRequest{
req := leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Agent: "autoconf",
Token: testAC.originalToken,
DNSSAN: defaultDNSSANs,
IPSAN: defaultIPSANs,
}
require.True(t, testAC.mcfg.cache.sendNotification(context.Background(), req.CacheInfo().Key, cache.UpdateEvent{
require.True(t, testAC.mcfg.leafCerts.sendNotification(context.Background(), req.Key(), cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: secondCert,
Meta: cache.ResultMeta{
@ -538,14 +538,14 @@ func TestAutoEncrypt_Fallback(t *testing.T) {
// now that all the mocks are set up we can trigger the whole thing by sending the second expired cert
// as a cache update event.
req := cachetype.ConnectCALeafRequest{
req := leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Agent: "autoconf",
Token: testAC.originalToken,
DNSSAN: defaultDNSSANs,
IPSAN: defaultIPSANs,
}
require.True(t, testAC.mcfg.cache.sendNotification(context.Background(), req.CacheInfo().Key, cache.UpdateEvent{
require.True(t, testAC.mcfg.leafCerts.sendNotification(context.Background(), req.Key(), cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: secondCert,
Meta: cache.ResultMeta{

View File

@ -13,7 +13,9 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib/retry"
)
@ -33,6 +35,19 @@ type Cache interface {
Prepopulate(t string, result cache.FetchResult, dc string, peerName string, token string, key string) error
}
// LeafCertManager is an interface to represent the methods of the
// agent/leafcert.Manager struct that we care about
type LeafCertManager interface {
Prepopulate(
ctx context.Context,
key string,
index uint64,
value *structs.IssuedCert,
authorityKeyID string,
) error
Notify(ctx context.Context, req *leafcert.ConnectCALeafRequest, correlationID string, ch chan<- cache.UpdateEvent) error
}
// ServerProvider is an interface that can be used to find one server in the local DC known to
// the agent via Gossip
type ServerProvider interface {
@ -92,9 +107,12 @@ type Config struct {
TLSConfigurator TLSConfigurator
// Cache is an object implementing our Cache interface. The Cache
// used at runtime must be able to handle Roots and Leaf Cert watches
// used at runtime must be able to handle Roots watches
Cache Cache
// LeafCertManager is an object implementing our LeafCertManager interface.
LeafCertManager LeafCertManager
// FallbackLeeway is the amount of time after certificate expiration before
// invoking the fallback routine. If not set this will default to 10s.
FallbackLeeway time.Duration

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
@ -112,6 +113,85 @@ type mockWatcher struct {
done <-chan struct{}
}
type mockLeafCerts struct {
mock.Mock
lock sync.Mutex
watchers map[string][]mockWatcher
}
var _ LeafCertManager = (*mockLeafCerts)(nil)
func newMockLeafCerts(t *testing.T) *mockLeafCerts {
m := mockLeafCerts{
watchers: make(map[string][]mockWatcher),
}
m.Test(t)
return &m
}
func (m *mockLeafCerts) Notify(ctx context.Context, req *leafcert.ConnectCALeafRequest, correlationID string, ch chan<- cache.UpdateEvent) error {
ret := m.Called(ctx, req, correlationID, ch)
err := ret.Error(0)
if err == nil {
m.lock.Lock()
key := req.Key()
m.watchers[key] = append(m.watchers[key], mockWatcher{ch: ch, done: ctx.Done()})
m.lock.Unlock()
}
return err
}
func (m *mockLeafCerts) Prepopulate(
ctx context.Context,
key string,
index uint64,
value *structs.IssuedCert,
authorityKeyID string,
) error {
// we cannot know what the private key is prior to it being injected into the cache.
// therefore redact it here and all mock expectations should take that into account
restore := value.PrivateKeyPEM
value.PrivateKeyPEM = "redacted"
ret := m.Called(ctx, key, index, value, authorityKeyID)
if restore != "" {
value.PrivateKeyPEM = restore
}
return ret.Error(0)
}
func (m *mockLeafCerts) sendNotification(ctx context.Context, key string, u cache.UpdateEvent) bool {
m.lock.Lock()
defer m.lock.Unlock()
watchers, ok := m.watchers[key]
if !ok || len(m.watchers) < 1 {
return false
}
var newWatchers []mockWatcher
for _, watcher := range watchers {
select {
case watcher.ch <- u:
newWatchers = append(newWatchers, watcher)
case <-watcher.done:
// do nothing, this watcher will be removed from the list
case <-ctx.Done():
// return doesn't matter here really, the test is being cancelled
return true
}
}
// this removes any already cancelled watches from being sent to
m.watchers[key] = newWatchers
return true
}
type mockCache struct {
mock.Mock
@ -223,6 +303,7 @@ type mockedConfig struct {
directRPC *mockDirectRPC
serverProvider *mockServerProvider
cache *mockCache
leafCerts *mockLeafCerts
tokens *mockTokenStore
tlsCfg *mockTLSConfigurator
enterpriseConfig *mockedEnterpriseConfig
@ -233,6 +314,7 @@ func newMockedConfig(t *testing.T) *mockedConfig {
directRPC := newMockDirectRPC(t)
serverProvider := newMockServerProvider(t)
mcache := newMockCache(t)
mleafs := newMockLeafCerts(t)
tokens := newMockTokenStore(t)
tlsCfg := newMockTLSConfigurator(t)
@ -246,6 +328,7 @@ func newMockedConfig(t *testing.T) *mockedConfig {
if !t.Failed() {
directRPC.AssertExpectations(t)
serverProvider.AssertExpectations(t)
mleafs.AssertExpectations(t)
mcache.AssertExpectations(t)
tokens.AssertExpectations(t)
tlsCfg.AssertExpectations(t)
@ -258,6 +341,7 @@ func newMockedConfig(t *testing.T) *mockedConfig {
DirectRPC: directRPC,
ServerProvider: serverProvider,
Cache: mcache,
LeafCertManager: mleafs,
Tokens: tokens,
TLSConfigurator: tlsCfg,
Logger: testutil.Logger(t),
@ -267,6 +351,7 @@ func newMockedConfig(t *testing.T) *mockedConfig {
directRPC: directRPC,
serverProvider: serverProvider,
cache: mcache,
leafCerts: mleafs,
tokens: tokens,
tlsCfg: tlsCfg,
@ -311,7 +396,7 @@ func (m *mockedConfig) expectInitialTLS(t *testing.T, agentName, datacenter, tok
rootsReq.CacheInfo().Key,
).Return(nil).Once()
leafReq := cachetype.ConnectCALeafRequest{
leafReq := leafcert.ConnectCALeafRequest{
Token: token,
Agent: agentName,
Datacenter: datacenter,
@ -323,24 +408,18 @@ func (m *mockedConfig) expectInitialTLS(t *testing.T, agentName, datacenter, tok
// on up with the request.
copy := *cert
copy.PrivateKeyPEM = "redacted"
leafRes := cache.FetchResult{
Value: &copy,
Index: copy.RaftIndex.ModifyIndex,
State: cachetype.ConnectCALeafSuccess(ca.SigningKeyID),
}
// we should prepopulate the cache with the agents cert
m.cache.On("Prepopulate",
cachetype.ConnectCALeafName,
leafRes,
datacenter,
"",
token,
m.leafCerts.On("Prepopulate",
mock.Anything,
leafReq.Key(),
copy.RaftIndex.ModifyIndex,
&copy,
ca.SigningKeyID,
).Return(nil).Once()
// when prepopulating the cert in the cache we grab the token so
// we should expec that here
// we should expect that here
m.tokens.On("AgentToken").Return(token).Once()
}

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbautoconf"
"github.com/hashicorp/consul/proto/private/pbconnect"
@ -106,12 +107,14 @@ func (ac *AutoConfig) populateCertificateCache(certs *structs.SignedResponse) er
leafReq := ac.leafCertRequest()
// prepolutate leaf cache
certRes := cache.FetchResult{
Value: &certs.IssuedCert,
Index: certs.IssuedCert.RaftIndex.ModifyIndex,
State: cachetype.ConnectCALeafSuccess(connect.EncodeSigningKeyID(cert.AuthorityKeyId)),
}
if err := ac.acConfig.Cache.Prepopulate(cachetype.ConnectCALeafName, certRes, leafReq.Datacenter, structs.DefaultPeerKeyword, leafReq.Token, leafReq.Key()); err != nil {
err = ac.acConfig.LeafCertManager.Prepopulate(
context.Background(),
leafReq.Key(),
certs.IssuedCert.RaftIndex.ModifyIndex,
&certs.IssuedCert,
connect.EncodeSigningKeyID(cert.AuthorityKeyId),
)
if err != nil {
return err
}
@ -129,7 +132,7 @@ func (ac *AutoConfig) setupCertificateCacheWatches(ctx context.Context) (context
}
leafReq := ac.leafCertRequest()
err = ac.acConfig.Cache.Notify(notificationCtx, cachetype.ConnectCALeafName, &leafReq, leafWatchID, ac.cacheUpdates)
err = ac.acConfig.LeafCertManager.Notify(notificationCtx, &leafReq, leafWatchID, ac.cacheUpdates)
if err != nil {
cancel()
return nil, err
@ -194,8 +197,8 @@ func (ac *AutoConfig) caRootsRequest() structs.DCSpecificRequest {
return structs.DCSpecificRequest{Datacenter: ac.config.Datacenter}
}
func (ac *AutoConfig) leafCertRequest() cachetype.ConnectCALeafRequest {
return cachetype.ConnectCALeafRequest{
func (ac *AutoConfig) leafCertRequest() leafcert.ConnectCALeafRequest {
return leafcert.ConnectCALeafRequest{
Datacenter: ac.config.Datacenter,
Agent: ac.config.NodeName,
DNSSAN: ac.getDNSSANs(),

View File

@ -1,774 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package cachetype
import (
"context"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/mitchellh/hashstructure"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
)
// Recommended name for registration.
const ConnectCALeafName = "connect-ca-leaf"
// caChangeJitterWindow is the time over which we spread each round of retries
// when attempting to get a new certificate following a root rotation. It's
// selected to be a trade-off between not making rotation unnecessarily slow on
// a tiny cluster while not hammering the servers on a huge cluster
// unnecessarily hard. Servers rate limit to protect themselves from the
// expensive crypto work, but in practice have 10k+ RPCs all in the same second
// will cause a major disruption even on large servers due to downloading the
// payloads, parsing msgpack etc. Instead we pick a window that for now is fixed
// but later might be either user configurable (not nice since it would become
// another hard-to-tune value) or set dynamically by the server based on it's
// knowledge of how many certs need to be rotated. Currently the server doesn't
// know that so we pick something that is reasonable. We err on the side of
// being slower that we need in trivial cases but gentler for large deployments.
// 30s means that even with a cluster of 10k service instances, the server only
// has to cope with ~333 RPCs a second which shouldn't be too bad if it's rate
// limiting the actual expensive crypto work.
//
// The actual backoff strategy when we are rate limited is to have each cert
// only retry once with each window of this size, at a point in the window
// selected at random. This performs much better than exponential backoff in
// terms of getting things rotated quickly with more predictable load and so
// fewer rate limited requests. See the full simulation this is based on at
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md for
// more detail.
const caChangeJitterWindow = 30 * time.Second
// ConnectCALeaf supports fetching and generating Connect leaf
// certificates.
type ConnectCALeaf struct {
RegisterOptionsBlockingNoRefresh
caIndex uint64 // Current index for CA roots
// rootWatchMu protects access to the rootWatchSubscribers map and
// rootWatchCancel
rootWatchMu sync.Mutex
// rootWatchSubscribers is a set of chans, one for each currently in-flight
// Fetch. These chans have root updates delivered from the root watcher.
rootWatchSubscribers map[chan struct{}]struct{}
// rootWatchCancel is a func to call to stop the background root watch if any.
// You must hold inflightMu to read (e.g. call) or write the value.
rootWatchCancel func()
// testRootWatchStart/StopCount are testing helpers that allow tests to
// observe the reference counting behavior that governs the shared root watch.
// It's not exactly pretty to expose internals like this, but seems cleaner
// than constructing elaborate and brittle test cases that we can infer
// correct behavior from, and simpler than trying to probe runtime goroutine
// traces to infer correct behavior that way. They must be accessed
// atomically.
testRootWatchStartCount uint32
testRootWatchStopCount uint32
RPC RPC // RPC client for remote requests
Cache *cache.Cache // Cache that has CA root certs via ConnectCARoot
Datacenter string // This agent's datacenter
// TestOverrideCAChangeInitialDelay allows overriding the random jitter after a
// root change with a fixed delay. So far ths is only done in tests. If it's
// zero the caChangeInitialSpreadDefault maximum jitter will be used but if
// set, it overrides and provides a fixed delay. To essentially disable the
// delay in tests they can set it to 1 nanosecond. We may separately allow
// configuring the jitter limit by users later but this is different and for
// tests only since we need to set a deterministic time delay in order to test
// the behavior here fully and determinstically.
TestOverrideCAChangeInitialDelay time.Duration
}
// fetchState is some additional metadata we store with each cert in the cache
// to track things like expiry and coordinate paces root rotations. It's
// important this doesn't contain any pointer types since we rely on the struct
// being copied to avoid modifying the actual state in the cache entry during
// Fetch. Pointers themselves are OK, but if we point to another struct that we
// call a method or modify in some way that would directly mutate the cache and
// cause problems. We'd need to deep-clone in that case in Fetch below.
// time.Time technically contains a pointer to the Location but we ignore that
// since all times we get from our wall clock should point to the same Location
// anyway.
type fetchState struct {
// authorityKeyId is the ID of the CA key (whether root or intermediate) that signed
// the current cert. This is just to save parsing the whole cert everytime
// we have to check if the root changed.
authorityKeyID string
// forceExpireAfter is used to coordinate renewing certs after a CA rotation
// in a staggered way so that we don't overwhelm the servers.
forceExpireAfter time.Time
// activeRootRotationStart is set when the root has changed and we need to get
// a new cert but haven't got one yet. forceExpireAfter will be set to the
// next scheduled time we should try our CSR, but this is needed to calculate
// the retry windows if we are rate limited when we try. See comment on
// caChangeJitterWindow above for more.
activeRootRotationStart time.Time
// consecutiveRateLimitErrs stores how many rate limit errors we've hit. We
// use this to choose a new window for the next retry. See comment on
// caChangeJitterWindow above for more.
consecutiveRateLimitErrs int
}
func ConnectCALeafSuccess(authorityKeyID string) interface{} {
return fetchState{
authorityKeyID: authorityKeyID,
forceExpireAfter: time.Time{},
consecutiveRateLimitErrs: 0,
activeRootRotationStart: time.Time{},
}
}
// fetchStart is called on each fetch that is about to block and wait for
// changes to the leaf. It subscribes a chan to receive updates from the shared
// root watcher and triggers root watcher if it's not already running.
func (c *ConnectCALeaf) fetchStart(rootUpdateCh chan struct{}) {
c.rootWatchMu.Lock()
defer c.rootWatchMu.Unlock()
// Lazy allocation
if c.rootWatchSubscribers == nil {
c.rootWatchSubscribers = make(map[chan struct{}]struct{})
}
// Make sure a root watcher is running. We don't only do this on first request
// to be more tolerant of errors that could cause the root watcher to fail and
// exit.
if c.rootWatchCancel == nil {
ctx, cancel := context.WithCancel(context.Background())
c.rootWatchCancel = cancel
go c.rootWatcher(ctx)
}
c.rootWatchSubscribers[rootUpdateCh] = struct{}{}
}
// fetchDone is called when a blocking call exits to unsubscribe from root
// updates and possibly stop the shared root watcher if it's no longer needed.
// Note that typically root CA is still being watched by clients directly and
// probably by the ProxyConfigManager so it will stay hot in cache for a while,
// we are just not monitoring it for updates any more.
func (c *ConnectCALeaf) fetchDone(rootUpdateCh chan struct{}) {
c.rootWatchMu.Lock()
defer c.rootWatchMu.Unlock()
delete(c.rootWatchSubscribers, rootUpdateCh)
if len(c.rootWatchSubscribers) == 0 && c.rootWatchCancel != nil {
// This was the last request. Stop the root watcher.
c.rootWatchCancel()
c.rootWatchCancel = nil
}
}
// rootWatcher is the shared rootWatcher that runs in a background goroutine
// while needed by one or more inflight Fetch calls.
func (c *ConnectCALeaf) rootWatcher(ctx context.Context) {
atomic.AddUint32(&c.testRootWatchStartCount, 1)
defer atomic.AddUint32(&c.testRootWatchStopCount, 1)
ch := make(chan cache.UpdateEvent, 1)
err := c.Cache.Notify(ctx, ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: c.Datacenter,
}, "roots", ch)
notifyChange := func() {
c.rootWatchMu.Lock()
defer c.rootWatchMu.Unlock()
for ch := range c.rootWatchSubscribers {
select {
case ch <- struct{}{}:
default:
// Don't block - chans are 1-buffered so act as an edge trigger and
// reload CA state directly from cache so they never "miss" updates.
}
}
}
if err != nil {
// Trigger all inflight watchers. We don't pass the error, but they will
// reload from cache and observe the same error and return it to the caller,
// or if it's transient, will continue and the next Fetch will get us back
// into the right state. Seems better than busy loop-retrying here given
// that almost any error we would see here would also be returned from the
// cache get this will trigger.
notifyChange()
return
}
var oldRoots *structs.IndexedCARoots
// Wait for updates to roots or all requests to stop
for {
select {
case <-ctx.Done():
return
case e := <-ch:
// Root response changed in some way. Note this might be the initial
// fetch.
if e.Err != nil {
// See above rationale about the error propagation
notifyChange()
continue
}
roots, ok := e.Result.(*structs.IndexedCARoots)
if !ok {
// See above rationale about the error propagation
notifyChange()
continue
}
// Check that the active root is actually different from the last CA
// config there are many reasons the config might have changed without
// actually updating the CA root that is signing certs in the cluster.
// The Fetch calls will also validate this since the first call here we
// don't know if it changed or not, but there is no point waking up all
// Fetch calls to check this if we know none of them will need to act on
// this update.
if oldRoots != nil && oldRoots.ActiveRootID == roots.ActiveRootID {
continue
}
// Distribute the update to all inflight requests - they will decide
// whether or not they need to act on it.
notifyChange()
oldRoots = roots
}
}
}
// calculateSoftExpiry encapsulates our logic for when to renew a cert based on
// it's age. It returns a pair of times min, max which makes it easier to test
// the logic without non-deterministic jitter to account for. The caller should
// choose a time randomly in between these.
//
// We want to balance a few factors here:
// - renew too early and it increases the aggregate CSR rate in the cluster
// - renew too late and it risks disruption to the service if a transient
// error prevents the renewal
// - we want a broad amount of jitter so if there is an outage, we don't end
// up with all services in sync and causing a thundering herd every
// renewal period. Broader is better for smoothing requests but pushes
// both earlier and later tradeoffs above.
//
// Somewhat arbitrarily the current strategy looks like this:
//
// 0 60% 90%
// Issued [------------------------------|===============|!!!!!] Expires
// 72h TTL: 0 ~43h ~65h
// 1h TTL: 0 36m 54m
//
// Where |===| is the soft renewal period where we jitter for the first attempt
// and |!!!| is the danger zone where we just try immediately.
//
// In the happy path (no outages) the average renewal occurs half way through
// the soft renewal region or at 75% of the cert lifetime which is ~54 hours for
// a 72 hour cert, or 45 mins for a 1 hour cert.
//
// If we are already in the softRenewal period, we randomly pick a time between
// now and the start of the danger zone.
//
// We pass in now to make testing easier.
func calculateSoftExpiry(now time.Time, cert *structs.IssuedCert) (min time.Time, max time.Time) {
certLifetime := cert.ValidBefore.Sub(cert.ValidAfter)
if certLifetime < 10*time.Minute {
// Shouldn't happen as we limit to 1 hour shortest elsewhere but just be
// defensive against strange times or bugs.
return now, now
}
// Find the 60% mark in diagram above
softRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.6))
hardRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.9))
if now.After(hardRenewTime) {
// In the hard renew period, or already expired. Renew now!
return now, now
}
if now.After(softRenewTime) {
// Already in the soft renew period, make now the lower bound for jitter
softRenewTime = now
}
return softRenewTime, hardRenewTime
}
func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// Get the correct type
reqReal, ok := req.(*ConnectCALeafRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup
// Do we already have a cert in the cache?
var existing *structs.IssuedCert
// Really important this is not a pointer type since otherwise we would set it
// to point to the actual fetchState in the cache entry below and then would
// be directly modifying that in the cache entry even when we might later
// return an error and not update index etc. By being a value, we force a copy
var state fetchState
if opts.LastResult != nil {
existing, ok = opts.LastResult.Value.(*structs.IssuedCert)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last value wrong type: %T", opts.LastResult.Value)
}
if opts.LastResult.State != nil {
state, ok = opts.LastResult.State.(fetchState)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: last state wrong type: %T", opts.LastResult.State)
}
}
}
// Handle brand new request first as it's simplest.
if existing == nil {
return c.generateNewLeaf(reqReal, result)
}
// Setup result to mirror the current value for if we timeout or hit a rate
// limit. This allows us to update the state (e.g. for backoff or retry
// coordination on root change) even if we don't get a new cert.
result.Value = existing
result.Index = existing.ModifyIndex
result.State = state
// Since state is not a pointer, we can't just set it once in result and then
// continue to update it later since we will be updating only our copy.
// Instead we have a helper function that is used to make sure the state is
// updated in the result when we return.
lastResultWithNewState := func() cache.FetchResult {
return cache.FetchResult{
Value: existing,
Index: existing.ModifyIndex,
State: state,
}
}
// Beyond this point we need to only return lastResultWithNewState() not just
// result since otherwise we might "loose" state updates we expect not to.
// We have a certificate in cache already. Check it's still valid.
now := time.Now()
minExpire, maxExpire := calculateSoftExpiry(now, existing)
expiresAt := minExpire.Add(lib.RandomStagger(maxExpire.Sub(minExpire)))
// Check if we have been force-expired by a root update that jittered beyond
// the timeout of the query it was running.
if !state.forceExpireAfter.IsZero() && state.forceExpireAfter.Before(expiresAt) {
expiresAt = state.forceExpireAfter
}
if expiresAt.Equal(now) || expiresAt.Before(now) {
// Already expired, just make a new one right away
return c.generateNewLeaf(reqReal, lastResultWithNewState())
}
// If we called Fetch() with MustRevalidate then this call came from a non-blocking query.
// Any prior CA rotations should've already expired the cert.
// All we need to do is check whether the current CA is the one that signed the leaf. If not, generate a new leaf.
// This is not a perfect solution (as a CA rotation update can be missed) but it should take care of instances like
// see https://github.com/hashicorp/consul/issues/10871, https://github.com/hashicorp/consul/issues/9862
// This seems to me like a hack, so maybe we can revisit the caching/ fetching logic in this case
if req.CacheInfo().MustRevalidate {
roots, err := c.rootsFromCache()
if err != nil {
return lastResultWithNewState(), err
}
if activeRootHasKey(roots, state.authorityKeyID) {
return lastResultWithNewState(), nil
}
// if we reach here then the current leaf was not signed by the same CAs, just regen
return c.generateNewLeaf(reqReal, lastResultWithNewState())
}
// We are about to block and wait for a change or timeout.
// Make a chan we can be notified of changes to CA roots on. It must be
// buffered so we don't miss broadcasts from rootsWatch. It is an edge trigger
// so a single buffer element is sufficient regardless of whether we consume
// the updates fast enough since as soon as we see an element in it, we will
// reload latest CA from cache.
rootUpdateCh := make(chan struct{}, 1)
// The roots may have changed in between blocking calls. We need to verify
// that the existing cert was signed by the current root. If it was we still
// want to do the whole jitter thing. We could code that again here but it's
// identical to the select case below so we just trigger our own update chan
// and let the logic below handle checking if the CA actually changed in the
// common case where it didn't it is a no-op anyway.
rootUpdateCh <- struct{}{}
// Subscribe our chan to get root update notification.
c.fetchStart(rootUpdateCh)
defer c.fetchDone(rootUpdateCh)
// Setup the timeout chan outside the loop so we don't keep bumping the timeout
// later if we loop around.
timeoutCh := time.After(opts.Timeout)
// Setup initial expiry chan. We may change this if root update occurs in the
// loop below.
expiresCh := time.After(expiresAt.Sub(now))
// Current cert is valid so just wait until it expires or we time out.
for {
select {
case <-timeoutCh:
// We timed out the request with same cert.
return lastResultWithNewState(), nil
case <-expiresCh:
// Cert expired or was force-expired by a root change.
return c.generateNewLeaf(reqReal, lastResultWithNewState())
case <-rootUpdateCh:
// A root cache change occurred, reload roots from cache.
roots, err := c.rootsFromCache()
if err != nil {
return lastResultWithNewState(), err
}
// Handle _possibly_ changed roots. We still need to verify the new active
// root is not the same as the one our current cert was signed by since we
// can be notified spuriously if we are the first request since the
// rootsWatcher didn't know about the CA we were signed by. We also rely
// on this on every request to do the initial check that the current roots
// are the same ones the current cert was signed by.
if activeRootHasKey(roots, state.authorityKeyID) {
// Current active CA is the same one that signed our current cert so
// keep waiting for a change.
continue
}
state.activeRootRotationStart = time.Now()
// CA root changed. We add some jitter here to avoid a thundering herd.
// See docs on caChangeJitterWindow const.
delay := lib.RandomStagger(caChangeJitterWindow)
if c.TestOverrideCAChangeInitialDelay > 0 {
delay = c.TestOverrideCAChangeInitialDelay
}
// Force the cert to be expired after the jitter - the delay above might
// be longer than we have left on our timeout. We set forceExpireAfter in
// the cache state so the next request will notice we still need to renew
// and do it at the right time. This is cleared once a new cert is
// returned by generateNewLeaf.
state.forceExpireAfter = state.activeRootRotationStart.Add(delay)
// If the delay time is within the current timeout, we want to renew the
// as soon as it's up. We change the expire time and chan so that when we
// loop back around, we'll wait at most delay until generating a new cert.
if state.forceExpireAfter.Before(expiresAt) {
expiresAt = state.forceExpireAfter
expiresCh = time.After(delay)
}
continue
}
}
}
func activeRootHasKey(roots *structs.IndexedCARoots, currentSigningKeyID string) bool {
for _, ca := range roots.Roots {
if ca.Active {
return ca.SigningKeyID == currentSigningKeyID
}
}
// Shouldn't be possible since at least one root should be active.
return false
}
func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) {
// Background is fine here because this isn't a blocking query as no index is set.
// Therefore this will just either be a cache hit or return once the non-blocking query returns.
rawRoots, _, err := c.Cache.Get(context.Background(), ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: c.Datacenter,
})
if err != nil {
return nil, err
}
roots, ok := rawRoots.(*structs.IndexedCARoots)
if !ok {
return nil, errors.New("invalid RootCA response type")
}
return roots, nil
}
// generateNewLeaf does the actual work of creating a new private key,
// generating a CSR and getting it signed by the servers. result argument
// represents the last result currently in cache if any along with its state.
func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest,
result cache.FetchResult) (cache.FetchResult, error) {
var state fetchState
if result.State != nil {
var ok bool
state, ok = result.State.(fetchState)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: result state wrong type: %T", result.State)
}
}
// Need to lookup RootCAs response to discover trust domain. This should be a
// cache hit.
roots, err := c.rootsFromCache()
if err != nil {
return result, err
}
if roots.TrustDomain == "" {
return result, errors.New("cluster has no CA bootstrapped yet")
}
// Build the cert uri
var id connect.CertURI
var dnsNames []string
var ipAddresses []net.IP
switch {
case req.Service != "":
id = &connect.SpiffeIDService{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
Partition: req.TargetPartition(),
Namespace: req.TargetNamespace(),
Service: req.Service,
}
dnsNames = append(dnsNames, req.DNSSAN...)
case req.Agent != "":
id = &connect.SpiffeIDAgent{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
Partition: req.TargetPartition(),
Agent: req.Agent,
}
dnsNames = append([]string{"localhost"}, req.DNSSAN...)
ipAddresses = append([]net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")}, req.IPSAN...)
case req.Kind == structs.ServiceKindMeshGateway:
id = &connect.SpiffeIDMeshGateway{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
Partition: req.TargetPartition(),
}
dnsNames = append(dnsNames, req.DNSSAN...)
case req.Kind != "":
return result, fmt.Errorf("unsupported kind: %s", req.Kind)
case req.Server:
if req.Datacenter == "" {
return result, errors.New("datacenter name must be specified")
}
id = &connect.SpiffeIDServer{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
}
dnsNames = append(dnsNames, connect.PeeringServerSAN(req.Datacenter, roots.TrustDomain))
default:
return result, errors.New("URI must be either service, agent, server, or kind")
}
// Create a new private key
// TODO: for now we always generate EC keys on clients regardless of the key
// type being used by the active CA. This is fine and allowed in TLS1.2 and
// signing EC CSRs with an RSA key is supported by all current CA providers so
// it's OK. IFF we ever need to support a CA provider that refuses to sign a
// CSR with a different signature algorithm, or if we have compatibility
// issues with external PKI systems that require EC certs be signed with ECDSA
// from the CA (this was required in TLS1.1 but not in 1.2) then we can
// instead intelligently pick the key type we generate here based on the key
// type of the active signing CA. We already have that loaded since we need
// the trust domain.
pk, pkPEM, err := connect.GeneratePrivateKey()
if err != nil {
return result, err
}
// Create a CSR.
csr, err := connect.CreateCSR(id, pk, dnsNames, ipAddresses)
if err != nil {
return result, err
}
// Request signing
var reply structs.IssuedCert
args := structs.CASignRequest{
WriteRequest: structs.WriteRequest{Token: req.Token},
Datacenter: req.Datacenter,
CSR: csr,
}
if err := c.RPC.RPC(context.Background(), "ConnectCA.Sign", &args, &reply); err != nil {
if err.Error() == consul.ErrRateLimited.Error() {
if result.Value == nil {
// This was a first fetch - we have no good value in cache. In this case
// we just return the error to the caller rather than rely on surprising
// semi-blocking until the rate limit is appeased or we timeout
// behavior. It's likely the caller isn't expecting this to block since
// it's an initial fetch. This also massively simplifies this edge case.
return result, err
}
if state.activeRootRotationStart.IsZero() {
// We hit a rate limit error by chance - for example a cert expired
// before the root rotation was observed (not triggered by rotation) but
// while server is working through high load from a recent rotation.
// Just pretend there is a rotation and the retry logic here will start
// jittering and retrying in the same way from now.
state.activeRootRotationStart = time.Now()
}
// Increment the errors in the state
state.consecutiveRateLimitErrs++
delay := lib.RandomStagger(caChangeJitterWindow)
if c.TestOverrideCAChangeInitialDelay > 0 {
delay = c.TestOverrideCAChangeInitialDelay
}
// Find the start of the next window we can retry in. See comment on
// caChangeJitterWindow for details of why we use this strategy.
windowStart := state.activeRootRotationStart.Add(
time.Duration(state.consecutiveRateLimitErrs) * delay)
// Pick a random time in that window
state.forceExpireAfter = windowStart.Add(delay)
// Return a result with the existing cert but the new state - the cache
// will see this as no change. Note that we always have an existing result
// here due to the nil value check above.
result.State = state
return result, nil
}
return result, err
}
reply.PrivateKeyPEM = pkPEM
// Reset rotation state
state.forceExpireAfter = time.Time{}
state.consecutiveRateLimitErrs = 0
state.activeRootRotationStart = time.Time{}
cert, err := connect.ParseCert(reply.CertPEM)
if err != nil {
return result, err
}
// Set the CA key ID so we can easily tell when a active root has changed.
state.authorityKeyID = connect.EncodeSigningKeyID(cert.AuthorityKeyId)
result.Value = &reply
// Store value not pointer so we don't accidentally mutate the cache entry
// state in Fetch.
result.State = state
result.Index = reply.ModifyIndex
return result, nil
}
// ConnectCALeafRequest is the cache.Request implementation for the
// ConnectCALeaf cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded
// directly to any Consul servers.
type ConnectCALeafRequest struct {
Token string
Datacenter string
DNSSAN []string
IPSAN []net.IP
MinQueryIndex uint64
MaxQueryTime time.Duration
acl.EnterpriseMeta
MustRevalidate bool
// The following flags indicate the entity we are requesting a cert for.
// Only one of these must be specified.
Service string // Given a Service name, not ID, the request is for a SpiffeIDService.
Agent string // Given an Agent name, not ID, the request is for a SpiffeIDAgent.
Kind structs.ServiceKind // Given "mesh-gateway", the request is for a SpiffeIDMeshGateway. No other kinds supported.
Server bool // If true, the request is for a SpiffeIDServer.
}
func (r *ConnectCALeafRequest) Key() string {
r.EnterpriseMeta.Normalize()
switch {
case r.Agent != "":
v, err := hashstructure.Hash([]interface{}{
r.Agent,
r.PartitionOrDefault(),
}, nil)
if err == nil {
return fmt.Sprintf("agent:%d", v)
}
case r.Kind == structs.ServiceKindMeshGateway:
v, err := hashstructure.Hash([]interface{}{
r.PartitionOrDefault(),
r.DNSSAN,
r.IPSAN,
}, nil)
if err == nil {
return fmt.Sprintf("kind:%d", v)
}
case r.Kind != "":
// this is not valid
case r.Server:
v, err := hashstructure.Hash([]interface{}{
"server",
r.Datacenter,
}, nil)
if err == nil {
return fmt.Sprintf("server:%d", v)
}
default:
v, err := hashstructure.Hash([]interface{}{
r.Service,
r.EnterpriseMeta,
r.DNSSAN,
r.IPSAN,
}, nil)
if err == nil {
return fmt.Sprintf("service:%d", v)
}
}
// If there is an error, we don't set the key. A blank key forces
// no cache for this request so the request is forwarded directly
// to the server.
return ""
}
func (req *ConnectCALeafRequest) TargetPartition() string {
return req.PartitionOrDefault()
}
func (r *ConnectCALeafRequest) CacheInfo() cache.RequestInfo {
return cache.RequestInfo{
Token: r.Token,
Key: r.Key(),
Datacenter: r.Datacenter,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MustRevalidate: r.MustRevalidate,
}
}

View File

@ -1,11 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
//go:build !consulent
// +build !consulent
package cachetype
func (req *ConnectCALeafRequest) TargetNamespace() string {
return "default"
}

File diff suppressed because it is too large Load Diff

View File

@ -1,9 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
//go:build !race
// +build !race
package cachetype
const testingRace = false

View File

@ -1,9 +0,0 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
//go:build race
// +build race
package cachetype
const testingRace = true

View File

@ -8,22 +8,23 @@ import (
"fmt"
"time"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
)
// Correlation ID for leaf cert watches.
const leafWatchID = "leaf"
// Cache is an interface to represent the necessary methods of the agent/cache.Cache.
// LeafCertManager is an interface to represent the necessary methods of the agent/leafcert.Manager.
// It is used to request and renew the server leaf certificate.
type Cache interface {
Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error
type LeafCertManager interface {
Notify(ctx context.Context, req *leafcert.ConnectCALeafRequest, correlationID string, ch chan<- cache.UpdateEvent) error
}
// TLSConfigurator is an interface to represent the necessary methods of the tlsutil.Configurator.
@ -52,7 +53,7 @@ type Config struct {
type Deps struct {
Config Config
Logger hclog.Logger
Cache Cache
LeafCertManager LeafCertManager
GetStore func() Store
TLSConfigurator TLSConfigurator
waiter retry.Waiter
@ -67,9 +68,8 @@ type CertManager struct {
// config contains agent configuration necessary for the cert manager to operate.
config Config
// cache provides an API to issue internal RPC requests and receive notifications
// when there are changes.
cache Cache
// leafCerts grants access to request and renew the server leaf cert.
leafCerts LeafCertManager
// cacheUpdateCh receives notifications of cache update events for resources watched.
cacheUpdateCh chan cache.UpdateEvent
@ -85,10 +85,13 @@ type CertManager struct {
}
func NewCertManager(deps Deps) *CertManager {
if deps.LeafCertManager == nil {
panic("LeafCertManager is required")
}
return &CertManager{
config: deps.Config,
logger: deps.Logger,
cache: deps.Cache,
leafCerts: deps.LeafCertManager,
cacheUpdateCh: make(chan cache.UpdateEvent, 1),
getStore: deps.GetStore,
tlsConfigurator: deps.TLSConfigurator,
@ -156,12 +159,12 @@ func (m *CertManager) watchServerToken(ctx context.Context) {
cancel()
notifyCtx, cancel = context.WithCancel(ctx)
req := cachetype.ConnectCALeafRequest{
req := leafcert.ConnectCALeafRequest{
Datacenter: m.config.Datacenter,
Token: token.Value,
Server: true,
}
if err := m.cache.Notify(notifyCtx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil {
if err := m.leafCerts.Notify(notifyCtx, &req, leafWatchID, m.cacheUpdateCh); err != nil {
return fmt.Errorf("failed to setup leaf cert notifications: %w", err)
}
@ -174,11 +177,11 @@ func (m *CertManager) watchServerToken(ctx context.Context) {
}
func (m *CertManager) watchLeafCert(ctx context.Context) error {
req := cachetype.ConnectCALeafRequest{
req := leafcert.ConnectCALeafRequest{
Datacenter: m.config.Datacenter,
Server: true,
}
if err := m.cache.Notify(ctx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil {
if err := m.leafCerts.Notify(ctx, &req, leafWatchID, m.cacheUpdateCh); err != nil {
return fmt.Errorf("failed to setup leaf cert notifications: %w", err)
}

View File

@ -8,13 +8,15 @@ import (
"testing"
"time"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
type fakeStore struct {
@ -109,7 +111,7 @@ type watchInfo struct {
token string
}
type fakeCache struct {
type fakeLeafCertManager struct {
updateCh chan<- cache.UpdateEvent
// watched is a map of watched correlation IDs to the ACL token of the request.
@ -120,7 +122,7 @@ type fakeCache struct {
syncCh chan struct{}
}
func (c *fakeCache) triggerLeafUpdate() {
func (c *fakeLeafCertManager) triggerLeafUpdate() {
c.updateCh <- cache.UpdateEvent{
CorrelationID: leafWatchID,
Result: &structs.IssuedCert{
@ -131,14 +133,14 @@ func (c *fakeCache) triggerLeafUpdate() {
}
}
func (c *fakeCache) Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error {
c.watched[correlationID] = watchInfo{ctx: ctx, token: r.CacheInfo().Token}
func (c *fakeLeafCertManager) Notify(ctx context.Context, r *leafcert.ConnectCALeafRequest, correlationID string, ch chan<- cache.UpdateEvent) error {
c.watched[correlationID] = watchInfo{ctx: ctx, token: r.Token}
c.updateCh = ch
c.syncCh <- struct{}{}
return nil
}
func (c *fakeCache) timeoutIfNotUpdated(t *testing.T) error {
func (c *fakeLeafCertManager) timeoutIfNotUpdated(t *testing.T) error {
t.Helper()
select {
@ -159,7 +161,7 @@ func testWaiter() retry.Waiter {
func TestCertManager_ACLsDisabled(t *testing.T) {
tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)}
cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
leafCerts := fakeLeafCertManager{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
store := fakeStore{
conf: make(chan *structs.CAConfiguration, 1),
tokenEntry: make(chan *structs.SystemMetadataEntry, 1),
@ -172,7 +174,7 @@ func TestCertManager_ACLsDisabled(t *testing.T) {
ACLsEnabled: false,
},
TLSConfigurator: &tlsConfigurator,
Cache: &cache,
LeafCertManager: &leafCerts,
GetStore: func() Store { return &store },
})
@ -185,11 +187,11 @@ func TestCertManager_ACLsDisabled(t *testing.T) {
require.Empty(t, tlsConfigurator.cert)
require.Empty(t, tlsConfigurator.peeringServerName)
require.Contains(t, cache.watched, leafWatchID)
require.Contains(t, leafCerts.watched, leafWatchID)
})
testutil.RunStep(t, "leaf cert update", func(t *testing.T) {
cache.triggerLeafUpdate()
leafCerts.triggerLeafUpdate()
// Wait for the update to arrive.
require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))
@ -214,7 +216,7 @@ func TestCertManager_ACLsDisabled(t *testing.T) {
func TestCertManager_ACLsEnabled(t *testing.T) {
tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)}
cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
leafCerts := fakeLeafCertManager{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
store := fakeStore{
conf: make(chan *structs.CAConfiguration, 1),
tokenEntry: make(chan *structs.SystemMetadataEntry, 1),
@ -227,7 +229,7 @@ func TestCertManager_ACLsEnabled(t *testing.T) {
ACLsEnabled: true,
},
TLSConfigurator: &tlsConfigurator,
Cache: &cache,
LeafCertManager: &leafCerts,
GetStore: func() Store { return &store },
})
@ -240,7 +242,7 @@ func TestCertManager_ACLsEnabled(t *testing.T) {
require.Empty(t, tlsConfigurator.cert)
require.Empty(t, tlsConfigurator.peeringServerName)
require.Empty(t, cache.watched)
require.Empty(t, leafCerts.watched)
})
var leafCtx context.Context
@ -249,16 +251,16 @@ func TestCertManager_ACLsEnabled(t *testing.T) {
testutil.RunStep(t, "server token update", func(t *testing.T) {
store.setServerToken("first-secret", tokenCanceler)
require.NoError(t, cache.timeoutIfNotUpdated(t))
require.NoError(t, leafCerts.timeoutIfNotUpdated(t))
require.Contains(t, cache.watched, leafWatchID)
require.Equal(t, "first-secret", cache.watched[leafWatchID].token)
require.Contains(t, leafCerts.watched, leafWatchID)
require.Equal(t, "first-secret", leafCerts.watched[leafWatchID].token)
leafCtx = cache.watched[leafWatchID].ctx
leafCtx = leafCerts.watched[leafWatchID].ctx
})
testutil.RunStep(t, "leaf cert update", func(t *testing.T) {
cache.triggerLeafUpdate()
leafCerts.triggerLeafUpdate()
// Wait for the update to arrive.
require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))
@ -276,15 +278,15 @@ func TestCertManager_ACLsEnabled(t *testing.T) {
// Fire the existing WatchSet to simulate a state store update.
tokenCanceler <- struct{}{}
// The leaf watch in the cache should have been reset.
require.NoError(t, cache.timeoutIfNotUpdated(t))
// The leaf watch in the leafCerts should have been reset.
require.NoError(t, leafCerts.timeoutIfNotUpdated(t))
// The original leaf watch context should have been canceled.
require.Error(t, leafCtx.Err())
// A new leaf watch is expected with the new token.
require.Contains(t, cache.watched, leafWatchID)
require.Equal(t, "second-secret", cache.watched[leafWatchID].token)
require.Contains(t, leafCerts.watched, leafWatchID)
require.Equal(t, "second-secret", leafCerts.watched[leafWatchID].token)
})
testutil.RunStep(t, "ca config update", func(t *testing.T) {

View File

@ -0,0 +1,47 @@
package leafcert
import (
"context"
"errors"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
)
// NewCachedRootsReader returns a RootsReader that sources data from the agent cache.
func NewCachedRootsReader(cache *cache.Cache, dc string) RootsReader {
return &agentCacheRootsReader{
cache: cache,
datacenter: dc,
}
}
type agentCacheRootsReader struct {
cache *cache.Cache
datacenter string
}
var _ RootsReader = (*agentCacheRootsReader)(nil)
func (r *agentCacheRootsReader) Get() (*structs.IndexedCARoots, error) {
// Background is fine here because this isn't a blocking query as no index is set.
// Therefore this will just either be a cache hit or return once the non-blocking query returns.
rawRoots, _, err := r.cache.Get(context.Background(), cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: r.datacenter,
})
if err != nil {
return nil, err
}
roots, ok := rawRoots.(*structs.IndexedCARoots)
if !ok {
return nil, errors.New("invalid RootCA response type")
}
return roots, nil
}
func (r *agentCacheRootsReader) Notify(ctx context.Context, correlationID string, ch chan<- cache.UpdateEvent) error {
return r.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: r.datacenter,
}, correlationID, ch)
}

133
agent/leafcert/cert.go Normal file
View File

@ -0,0 +1,133 @@
package leafcert
import (
"sync"
"time"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/ttlcache"
)
// certData tracks all of the metadata about a leaf cert.
type certData struct {
// lock locks access to all fields
lock sync.Mutex
// index is the last raft index associated with an update of the 'value' field
index uint64
// value is the last updated cert contents or nil if not populated initially
value *structs.IssuedCert
// state is metadata related to cert generation
state fetchState
// fetchedAt was the time when 'value' was last updated
fetchedAt time.Time
// refreshing indicates if there is an active request attempting to refresh
// the current leaf cert contents.
refreshing bool
// lastFetchErr is the last error encountered when attempting to populate
// the 'value' field.
lastFetchErr error
// expiry contains information about the expiration of this
// cert. This is a pointer as its shared as a value in the
// ExpiryHeap as well.
expiry *ttlcache.Entry
// refreshRateLimiter limits the rate at which the cert can be regenerated
refreshRateLimiter *rate.Limiter
}
func (c *certData) MarkRefreshing(v bool) {
c.lock.Lock()
defer c.lock.Unlock()
c.refreshing = v
}
func (c *certData) GetValueAndState() (*structs.IssuedCert, fetchState) {
c.lock.Lock()
defer c.lock.Unlock()
return c.value, c.state
}
func (c *certData) GetError() error {
c.lock.Lock()
defer c.lock.Unlock()
return c.lastFetchErr
}
// NOTE: this function only has one goroutine in it per key at all times
func (c *certData) Update(
newCert *structs.IssuedCert,
newState fetchState,
err error,
) {
c.lock.Lock()
defer c.lock.Unlock()
// Importantly, always reset the Error. Having both Error and a Value that
// are non-nil is allowed in the cache entry but it indicates that the Error
// is _newer_ than the last good value. So if the err is nil then we need to
// reset to replace any _older_ errors and avoid them bubbling up. If the
// error is non-nil then we need to set it anyway and used to do it in the
// code below. See https://github.com/hashicorp/consul/issues/4480.
c.lastFetchErr = err
c.state = newState
if newCert != nil {
c.index = newCert.ModifyIndex
c.value = newCert
c.fetchedAt = time.Now()
}
if c.index < 1 {
// Less than one is invalid unless there was an error and in this case
// there wasn't since a value was returned. If a badly behaved RPC
// returns 0 when it has no data, we might get into a busy loop here. We
// set this to minimum of 1 which is safe because no valid user data can
// ever be written at raft index 1 due to the bootstrap process for
// raft. This insure that any subsequent background refresh request will
// always block, but allows the initial request to return immediately
// even if there is no data.
c.index = 1
}
}
// fetchState is some additional metadata we store with each cert in the cache
// to track things like expiry and coordinate paces root rotations. It's
// important this doesn't contain any pointer types since we rely on the struct
// being copied to avoid modifying the actual state in the cache entry during
// Fetch. Pointers themselves are OK, but if we point to another struct that we
// call a method or modify in some way that would directly mutate the cache and
// cause problems. We'd need to deep-clone in that case in Fetch below.
// time.Time technically contains a pointer to the Location but we ignore that
// since all times we get from our wall clock should point to the same Location
// anyway.
type fetchState struct {
// authorityKeyId is the ID of the CA key (whether root or intermediate) that signed
// the current cert. This is just to save parsing the whole cert everytime
// we have to check if the root changed.
authorityKeyID string
// forceExpireAfter is used to coordinate renewing certs after a CA rotation
// in a staggered way so that we don't overwhelm the servers.
forceExpireAfter time.Time
// activeRootRotationStart is set when the root has changed and we need to get
// a new cert but haven't got one yet. forceExpireAfter will be set to the
// next scheduled time we should try our CSR, but this is needed to calculate
// the retry windows if we are rate limited when we try. See comment on
// const caChangeJitterWindow above for more.
activeRootRotationStart time.Time
// consecutiveRateLimitErrs stores how many rate limit errors we've hit. We
// use this to choose a new window for the next retry. See comment on
// const caChangeJitterWindow above for more.
consecutiveRateLimitErrs int
}

362
agent/leafcert/generate.go Normal file
View File

@ -0,0 +1,362 @@
package leafcert
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
)
// caChangeJitterWindow is the time over which we spread each round of retries
// when attempting to get a new certificate following a root rotation. It's
// selected to be a trade-off between not making rotation unnecessarily slow on
// a tiny cluster while not hammering the servers on a huge cluster
// unnecessarily hard. Servers rate limit to protect themselves from the
// expensive crypto work, but in practice have 10k+ RPCs all in the same second
// will cause a major disruption even on large servers due to downloading the
// payloads, parsing msgpack etc. Instead we pick a window that for now is fixed
// but later might be either user configurable (not nice since it would become
// another hard-to-tune value) or set dynamically by the server based on it's
// knowledge of how many certs need to be rotated. Currently the server doesn't
// know that so we pick something that is reasonable. We err on the side of
// being slower that we need in trivial cases but gentler for large deployments.
// 30s means that even with a cluster of 10k service instances, the server only
// has to cope with ~333 RPCs a second which shouldn't be too bad if it's rate
// limiting the actual expensive crypto work.
//
// The actual backoff strategy when we are rate limited is to have each cert
// only retry once with each window of this size, at a point in the window
// selected at random. This performs much better than exponential backoff in
// terms of getting things rotated quickly with more predictable load and so
// fewer rate limited requests. See the full simulation this is based on at
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md for
// more detail.
const caChangeJitterWindow = 30 * time.Second
// NOTE: this function only has one goroutine in it per key at all times
func (m *Manager) attemptLeafRefresh(
req *ConnectCALeafRequest,
existing *structs.IssuedCert,
state fetchState,
) (*structs.IssuedCert, fetchState, error) {
if req.MaxQueryTime <= 0 {
req.MaxQueryTime = DefaultQueryTimeout
}
// Handle brand new request first as it's simplest.
if existing == nil {
return m.generateNewLeaf(req, state, true)
}
// We have a certificate in cache already. Check it's still valid.
now := time.Now()
minExpire, maxExpire := calculateSoftExpiry(now, existing)
expiresAt := minExpire.Add(lib.RandomStagger(maxExpire.Sub(minExpire)))
// Check if we have been force-expired by a root update that jittered beyond
// the timeout of the query it was running.
if !state.forceExpireAfter.IsZero() && state.forceExpireAfter.Before(expiresAt) {
expiresAt = state.forceExpireAfter
}
if expiresAt.Equal(now) || expiresAt.Before(now) {
// Already expired, just make a new one right away
return m.generateNewLeaf(req, state, false)
}
// If we called Get() with MustRevalidate then this call came from a non-blocking query.
// Any prior CA rotations should've already expired the cert.
// All we need to do is check whether the current CA is the one that signed the leaf. If not, generate a new leaf.
// This is not a perfect solution (as a CA rotation update can be missed) but it should take care of instances like
// see https://github.com/hashicorp/consul/issues/10871, https://github.com/hashicorp/consul/issues/9862
// This seems to me like a hack, so maybe we can revisit the caching/ fetching logic in this case
if req.MustRevalidate {
roots, err := m.rootsReader.Get()
if err != nil {
return nil, state, err
} else if roots == nil {
return nil, state, errors.New("no CA roots")
}
if activeRootHasKey(roots, state.authorityKeyID) {
return nil, state, nil
}
// if we reach here then the current leaf was not signed by the same CAs, just regen
return m.generateNewLeaf(req, state, false)
}
// We are about to block and wait for a change or timeout.
// Make a chan we can be notified of changes to CA roots on. It must be
// buffered so we don't miss broadcasts from rootsWatch. It is an edge trigger
// so a single buffer element is sufficient regardless of whether we consume
// the updates fast enough since as soon as we see an element in it, we will
// reload latest CA from cache.
rootUpdateCh := make(chan struct{}, 1)
// The roots may have changed in between blocking calls. We need to verify
// that the existing cert was signed by the current root. If it was we still
// want to do the whole jitter thing. We could code that again here but it's
// identical to the select case below so we just trigger our own update chan
// and let the logic below handle checking if the CA actually changed in the
// common case where it didn't it is a no-op anyway.
rootUpdateCh <- struct{}{}
// Subscribe our chan to get root update notification.
m.rootWatcher.Subscribe(rootUpdateCh)
defer m.rootWatcher.Unsubscribe(rootUpdateCh)
// Setup the timeout chan outside the loop so we don't keep bumping the timeout
// later if we loop around.
timeoutTimer := time.NewTimer(req.MaxQueryTime)
defer timeoutTimer.Stop()
// Setup initial expiry chan. We may change this if root update occurs in the
// loop below.
expiresTimer := time.NewTimer(expiresAt.Sub(now))
defer func() {
// Resolve the timer reference at defer time, so we use the latest one each time.
expiresTimer.Stop()
}()
// Current cert is valid so just wait until it expires or we time out.
for {
select {
case <-timeoutTimer.C:
// We timed out the request with same cert.
return nil, state, nil
case <-expiresTimer.C:
// Cert expired or was force-expired by a root change.
return m.generateNewLeaf(req, state, false)
case <-rootUpdateCh:
// A root cache change occurred, reload roots from cache.
roots, err := m.rootsReader.Get()
if err != nil {
return nil, state, err
} else if roots == nil {
return nil, state, errors.New("no CA roots")
}
// Handle _possibly_ changed roots. We still need to verify the new active
// root is not the same as the one our current cert was signed by since we
// can be notified spuriously if we are the first request since the
// rootsWatcher didn't know about the CA we were signed by. We also rely
// on this on every request to do the initial check that the current roots
// are the same ones the current cert was signed by.
if activeRootHasKey(roots, state.authorityKeyID) {
// Current active CA is the same one that signed our current cert so
// keep waiting for a change.
continue
}
state.activeRootRotationStart = time.Now()
// CA root changed. We add some jitter here to avoid a thundering herd.
// See docs on caChangeJitterWindow const.
delay := m.getJitteredCAChangeDelay()
// Force the cert to be expired after the jitter - the delay above might
// be longer than we have left on our timeout. We set forceExpireAfter in
// the cache state so the next request will notice we still need to renew
// and do it at the right time. This is cleared once a new cert is
// returned by generateNewLeaf.
state.forceExpireAfter = state.activeRootRotationStart.Add(delay)
// If the delay time is within the current timeout, we want to renew the
// as soon as it's up. We change the expire time and chan so that when we
// loop back around, we'll wait at most delay until generating a new cert.
if state.forceExpireAfter.Before(expiresAt) {
expiresAt = state.forceExpireAfter
// Stop the former one and create a new one.
expiresTimer.Stop()
expiresTimer = time.NewTimer(delay)
}
continue
}
}
}
func (m *Manager) getJitteredCAChangeDelay() time.Duration {
if m.config.TestOverrideCAChangeInitialDelay > 0 {
return m.config.TestOverrideCAChangeInitialDelay
}
// CA root changed. We add some jitter here to avoid a thundering herd.
// See docs on caChangeJitterWindow const.
return lib.RandomStagger(caChangeJitterWindow)
}
func activeRootHasKey(roots *structs.IndexedCARoots, currentSigningKeyID string) bool {
for _, ca := range roots.Roots {
if ca.Active {
return ca.SigningKeyID == currentSigningKeyID
}
}
// Shouldn't be possible since at least one root should be active.
return false
}
// generateNewLeaf does the actual work of creating a new private key,
// generating a CSR and getting it signed by the servers.
//
// NOTE: do not hold the lock while doing the RPC/blocking stuff
func (m *Manager) generateNewLeaf(
req *ConnectCALeafRequest,
newState fetchState,
firstTime bool,
) (*structs.IssuedCert, fetchState, error) {
// Need to lookup RootCAs response to discover trust domain. This should be a
// cache hit.
roots, err := m.rootsReader.Get()
if err != nil {
return nil, newState, err
} else if roots == nil {
return nil, newState, errors.New("no CA roots")
}
if roots.TrustDomain == "" {
return nil, newState, errors.New("cluster has no CA bootstrapped yet")
}
// Build the cert uri
var id connect.CertURI
var dnsNames []string
var ipAddresses []net.IP
switch {
case req.Service != "":
id = &connect.SpiffeIDService{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
Partition: req.TargetPartition(),
Namespace: req.TargetNamespace(),
Service: req.Service,
}
dnsNames = append(dnsNames, req.DNSSAN...)
case req.Agent != "":
id = &connect.SpiffeIDAgent{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
Partition: req.TargetPartition(),
Agent: req.Agent,
}
dnsNames = append([]string{"localhost"}, req.DNSSAN...)
ipAddresses = append([]net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")}, req.IPSAN...)
case req.Kind == structs.ServiceKindMeshGateway:
id = &connect.SpiffeIDMeshGateway{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
Partition: req.TargetPartition(),
}
dnsNames = append(dnsNames, req.DNSSAN...)
case req.Kind != "":
return nil, newState, fmt.Errorf("unsupported kind: %s", req.Kind)
case req.Server:
if req.Datacenter == "" {
return nil, newState, errors.New("datacenter name must be specified")
}
id = &connect.SpiffeIDServer{
Host: roots.TrustDomain,
Datacenter: req.Datacenter,
}
dnsNames = append(dnsNames, connect.PeeringServerSAN(req.Datacenter, roots.TrustDomain))
default:
return nil, newState, errors.New("URI must be either service, agent, server, or kind")
}
// Create a new private key
// TODO: for now we always generate EC keys on clients regardless of the key
// type being used by the active CA. This is fine and allowed in TLS1.2 and
// signing EC CSRs with an RSA key is supported by all current CA providers so
// it's OK. IFF we ever need to support a CA provider that refuses to sign a
// CSR with a different signature algorithm, or if we have compatibility
// issues with external PKI systems that require EC certs be signed with ECDSA
// from the CA (this was required in TLS1.1 but not in 1.2) then we can
// instead intelligently pick the key type we generate here based on the key
// type of the active signing CA. We already have that loaded since we need
// the trust domain.
pk, pkPEM, err := connect.GeneratePrivateKey()
if err != nil {
return nil, newState, err
}
// Create a CSR.
csr, err := connect.CreateCSR(id, pk, dnsNames, ipAddresses)
if err != nil {
return nil, newState, err
}
// Request signing
args := structs.CASignRequest{
WriteRequest: structs.WriteRequest{Token: req.Token},
Datacenter: req.Datacenter,
CSR: csr,
}
reply, err := m.certSigner.SignCert(context.Background(), &args)
if err != nil {
if err.Error() == consul.ErrRateLimited.Error() {
if firstTime {
// This was a first fetch - we have no good value in cache. In this case
// we just return the error to the caller rather than rely on surprising
// semi-blocking until the rate limit is appeased or we timeout
// behavior. It's likely the caller isn't expecting this to block since
// it's an initial fetch. This also massively simplifies this edge case.
return nil, newState, err
}
if newState.activeRootRotationStart.IsZero() {
// We hit a rate limit error by chance - for example a cert expired
// before the root rotation was observed (not triggered by rotation) but
// while server is working through high load from a recent rotation.
// Just pretend there is a rotation and the retry logic here will start
// jittering and retrying in the same way from now.
newState.activeRootRotationStart = time.Now()
}
// Increment the errors in the state
newState.consecutiveRateLimitErrs++
delay := m.getJitteredCAChangeDelay()
// Find the start of the next window we can retry in. See comment on
// caChangeJitterWindow for details of why we use this strategy.
windowStart := newState.activeRootRotationStart.Add(
time.Duration(newState.consecutiveRateLimitErrs) * delay)
// Pick a random time in that window
newState.forceExpireAfter = windowStart.Add(delay)
// Return a result with the existing cert but the new state - the cache
// will see this as no change. Note that we always have an existing result
// here due to the nil value check above.
return nil, newState, nil
}
return nil, newState, err
}
reply.PrivateKeyPEM = pkPEM
// Reset rotation state
newState.forceExpireAfter = time.Time{}
newState.consecutiveRateLimitErrs = 0
newState.activeRootRotationStart = time.Time{}
cert, err := connect.ParseCert(reply.CertPEM)
if err != nil {
return nil, newState, err
}
// Set the CA key ID so we can easily tell when a active root has changed.
newState.authorityKeyID = connect.EncodeSigningKeyID(cert.AuthorityKeyId)
return reply, newState, nil
}

556
agent/leafcert/leafcert.go Normal file
View File

@ -0,0 +1,556 @@
package leafcert
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"golang.org/x/sync/singleflight"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/ttlcache"
)
const (
DefaultLastGetTTL = 72 * time.Hour // reasonable default is days
// DefaultLeafCertRefreshRate is the default rate at which certs can be refreshed.
// This defaults to not being limited
DefaultLeafCertRefreshRate = rate.Inf
// DefaultLeafCertRefreshMaxBurst is the number of cache entry fetches that can
// occur in a burst.
DefaultLeafCertRefreshMaxBurst = 2
DefaultLeafCertRefreshBackoffMin = 3 // 3 attempts before backing off
DefaultLeafCertRefreshMaxWait = 1 * time.Minute // maximum backoff wait time
DefaultQueryTimeout = 10 * time.Minute
)
type Config struct {
// LastGetTTL is the time that the certs returned by this type remain in
// the cache after the last get operation. If a cert isn't accessed within
// this duration, the certs is purged and background refreshing will cease.
LastGetTTL time.Duration
// LeafCertRefreshMaxBurst max burst size of RateLimit for a single cache entry
LeafCertRefreshMaxBurst int
// LeafCertRefreshRate represents the max calls/sec for a single cache entry
LeafCertRefreshRate rate.Limit
// LeafCertRefreshBackoffMin is the number of attempts to wait before
// backing off.
//
// Mostly configurable just for testing.
LeafCertRefreshBackoffMin uint
// LeafCertRefreshMaxWait is the maximum backoff wait time.
//
// Mostly configurable just for testing.
LeafCertRefreshMaxWait time.Duration
// TestOverrideCAChangeInitialDelay allows overriding the random jitter
// after a root change with a fixed delay. So far ths is only done in
// tests. If it's zero the caChangeInitialSpreadDefault maximum jitter will
// be used but if set, it overrides and provides a fixed delay. To
// essentially disable the delay in tests they can set it to 1 nanosecond.
// We may separately allow configuring the jitter limit by users later but
// this is different and for tests only since we need to set a
// deterministic time delay in order to test the behavior here fully and
// determinstically.
TestOverrideCAChangeInitialDelay time.Duration
}
func (c Config) withDefaults() Config {
if c.LastGetTTL <= 0 {
c.LastGetTTL = DefaultLastGetTTL
}
if c.LeafCertRefreshRate == 0.0 {
c.LeafCertRefreshRate = DefaultLeafCertRefreshRate
}
if c.LeafCertRefreshMaxBurst == 0 {
c.LeafCertRefreshMaxBurst = DefaultLeafCertRefreshMaxBurst
}
if c.LeafCertRefreshBackoffMin == 0 {
c.LeafCertRefreshBackoffMin = DefaultLeafCertRefreshBackoffMin
}
if c.LeafCertRefreshMaxWait == 0 {
c.LeafCertRefreshMaxWait = DefaultLeafCertRefreshMaxWait
}
return c
}
type Deps struct {
Config Config
Logger hclog.Logger
// RootsReader is an interface to access connect CA roots.
RootsReader RootsReader
// CertSigner is an interface to remotely sign certificates.
CertSigner CertSigner
}
type RootsReader interface {
Get() (*structs.IndexedCARoots, error)
Notify(ctx context.Context, correlationID string, ch chan<- cache.UpdateEvent) error
}
type CertSigner interface {
SignCert(ctx context.Context, args *structs.CASignRequest) (*structs.IssuedCert, error)
}
func NewManager(deps Deps) *Manager {
deps.Config = deps.Config.withDefaults()
if deps.Logger == nil {
deps.Logger = hclog.NewNullLogger()
}
if deps.RootsReader == nil {
panic("RootsReader is required")
}
if deps.CertSigner == nil {
panic("CertSigner is required")
}
m := &Manager{
config: deps.Config,
logger: deps.Logger,
certSigner: deps.CertSigner,
rootsReader: deps.RootsReader,
//
certs: make(map[string]*certData),
certsExpiryHeap: ttlcache.NewExpiryHeap(),
}
m.ctx, m.ctxCancel = context.WithCancel(context.Background())
m.rootWatcher = &rootWatcher{
ctx: m.ctx,
rootsReader: m.rootsReader,
}
// Start the expiry watcher
go m.runExpiryLoop()
return m
}
type Manager struct {
logger hclog.Logger
// config contains agent configuration necessary for the cert manager to operate.
config Config
// rootsReader is an interface to access connect CA roots.
rootsReader RootsReader
// certSigner is an interface to remotely sign certificates.
certSigner CertSigner
// rootWatcher helps let multiple requests for leaf certs to coordinate
// sharing a single long-lived watch for the root certs. This allows the
// leaf cert requests to notice when the roots rotate and trigger their
// reissuance.
rootWatcher *rootWatcher
// This is the "top-level" internal context. This is used to cancel
// background operations.
ctx context.Context
ctxCancel context.CancelFunc
// lock guards access to certs and certsExpiryHeap
lock sync.RWMutex
certs map[string]*certData
certsExpiryHeap *ttlcache.ExpiryHeap
// certGroup is a singleflight group keyed identically to the certs map.
// When the leaf cert itself needs replacement requests will coalesce
// together through this chokepoint.
certGroup singleflight.Group
}
func (m *Manager) getCertData(key string) *certData {
m.lock.RLock()
cd, ok := m.certs[key]
m.lock.RUnlock()
if ok {
return cd
}
m.lock.Lock()
defer m.lock.Unlock()
cd, ok = m.certs[key]
if !ok {
cd = &certData{
expiry: m.certsExpiryHeap.Add(key, m.config.LastGetTTL),
refreshRateLimiter: rate.NewLimiter(
m.config.LeafCertRefreshRate,
m.config.LeafCertRefreshMaxBurst,
),
}
m.certs[key] = cd
metrics.SetGauge([]string{"leaf-certs", "entries_count"}, float32(len(m.certs)))
}
return cd
}
// Stop stops any background work and frees all resources for the manager.
// Current fetch requests are allowed to continue to completion and callers may
// still access the current leaf cert values so coordination isn't needed with
// callers, however no background activity will continue. It's intended to
// close the manager at agent shutdown so no further requests should be made,
// however concurrent or in-flight ones won't break.
func (m *Manager) Stop() {
if m.ctxCancel != nil {
m.ctxCancel()
m.ctxCancel = nil
}
}
// Get returns the leaf cert for the request. If data satisfying the
// minimum index is present, it is returned immediately. Otherwise,
// this will block until the cert is refreshed or the request timeout is
// reached.
//
// Multiple Get calls for the same logical request will block on a single
// network request.
//
// The timeout specified by the request will be the timeout on the cache
// Get, and does not correspond to the timeout of any background data
// fetching. If the timeout is reached before data satisfying the minimum
// index is retrieved, the last known value (maybe nil) is returned. No
// error is returned on timeout. This matches the behavior of Consul blocking
// queries.
func (m *Manager) Get(ctx context.Context, req *ConnectCALeafRequest) (*structs.IssuedCert, cache.ResultMeta, error) {
// Lightweight copy this object so that manipulating req doesn't race.
dup := *req
req = &dup
// We don't want non-blocking queries to return expired leaf certs
// or leaf certs not valid under the current CA. So always revalidate
// the leaf cert on non-blocking queries (ie when MinQueryIndex == 0)
//
// NOTE: This conditional was formerly only in the API endpoint.
if req.MinQueryIndex == 0 {
req.MustRevalidate = true
}
return m.internalGet(ctx, req)
}
func (m *Manager) internalGet(ctx context.Context, req *ConnectCALeafRequest) (*structs.IssuedCert, cache.ResultMeta, error) {
key := req.Key()
if key == "" {
return nil, cache.ResultMeta{}, fmt.Errorf("a key is required")
}
if req.MaxQueryTime <= 0 {
req.MaxQueryTime = DefaultQueryTimeout
}
timeoutTimer := time.NewTimer(req.MaxQueryTime)
defer timeoutTimer.Stop()
// First time through
first := true
for {
// Get the current value
cd := m.getCertData(key)
cd.lock.Lock()
var (
existing = cd.value
existingIndex = cd.index
refreshing = cd.refreshing
fetchedAt = cd.fetchedAt
lastFetchErr = cd.lastFetchErr
expiry = cd.expiry
)
cd.lock.Unlock()
shouldReplaceCert := certNeedsUpdate(req, existingIndex, existing, refreshing)
if expiry != nil {
// The entry already exists in the TTL heap, touch it to keep it alive since
// this Get is still interested in the value. Note that we used to only do
// this in the `entryValid` block below but that means that a cache entry
// will expire after it's TTL regardless of how many callers are waiting for
// updates in this method in a couple of cases:
//
// 1. If the agent is disconnected from servers for the TTL then the client
// will be in backoff getting errors on each call to Get and since an
// errored cache entry has Valid = false it won't be touching the TTL.
//
// 2. If the value is just not changing then the client's current index
// will be equal to the entry index and entryValid will be false. This
// is a common case!
//
// But regardless of the state of the entry, assuming it's already in the
// TTL heap, we should touch it every time around here since this caller at
// least still cares about the value!
m.lock.Lock()
m.certsExpiryHeap.Update(expiry.Index(), m.config.LastGetTTL)
m.lock.Unlock()
}
if !shouldReplaceCert {
meta := cache.ResultMeta{
Index: existingIndex,
}
if first {
meta.Hit = true
}
// For non-background refresh types, the age is just how long since we
// fetched it last.
if !fetchedAt.IsZero() {
meta.Age = time.Since(fetchedAt)
}
// We purposely do not return an error here since the cache only works with
// fetching values that either have a value or have an error, but not both.
// The Error may be non-nil in the entry in the case that an error has
// occurred _since_ the last good value, but we still want to return the
// good value to clients that are not requesting a specific version. The
// effect of this is that blocking clients will all see an error immediately
// without waiting a whole timeout to see it, but clients that just look up
// cache with an older index than the last valid result will still see the
// result and not the error here. I.e. the error is not "cached" without a
// new fetch attempt occurring, but the last good value can still be fetched
// from cache.
return existing, meta, nil
}
// If this isn't our first time through and our last value has an error, then
// we return the error. This has the behavior that we don't sit in a retry
// loop getting the same error for the entire duration of the timeout.
// Instead, we make one effort to fetch a new value, and if there was an
// error, we return. Note that the invariant is that if both entry.Value AND
// entry.Error are non-nil, the error _must_ be more recent than the Value. In
// other words valid fetches should reset the error. See
// https://github.com/hashicorp/consul/issues/4480.
if !first && lastFetchErr != nil {
return existing, cache.ResultMeta{Index: existingIndex}, lastFetchErr
}
notifyCh := m.triggerCertRefreshInGroup(req, cd)
// No longer our first time through
first = false
select {
case <-ctx.Done():
return nil, cache.ResultMeta{}, ctx.Err()
case <-notifyCh:
// Our fetch returned, retry the get from the cache.
req.MustRevalidate = false
case <-timeoutTimer.C:
// Timeout on the cache read, just return whatever we have.
return existing, cache.ResultMeta{Index: existingIndex}, nil
}
}
}
func certNeedsUpdate(req *ConnectCALeafRequest, index uint64, value *structs.IssuedCert, refreshing bool) bool {
if value == nil {
return true
}
if req.MinQueryIndex > 0 && req.MinQueryIndex >= index {
// MinIndex was given and matches or is higher than current value so we
// ignore the cache and fallthrough to blocking on a new value.
return true
}
// Check if re-validate is requested. If so the first time round the
// loop is not a hit but subsequent ones should be treated normally.
if req.MustRevalidate {
// It is important to note that this block ONLY applies when we are not
// in indefinite refresh mode (where the underlying goroutine will
// continue to re-query for data).
//
// In this mode goroutines have a 1:1 relationship to RPCs that get
// executed, and importantly they DO NOT SLEEP after executing.
//
// This means that a running goroutine for this cache entry extremely
// strongly implies that the RPC has not yet completed, which is why
// this check works for the revalidation-avoidance optimization here.
if refreshing {
// There is an active goroutine performing a blocking query for
// this data, which has not returned.
//
// We can logically deduce that the contents of the cache are
// actually current, and we can simply return this while leaving
// the blocking query alone.
return false
} else {
return true
}
}
return false
}
func (m *Manager) triggerCertRefreshInGroup(req *ConnectCALeafRequest, cd *certData) <-chan singleflight.Result {
// Lightweight copy this object so that manipulating req doesn't race.
dup := *req
req = &dup
if req.MaxQueryTime == 0 {
req.MaxQueryTime = DefaultQueryTimeout
}
// At this point, we know we either don't have a cert at all or the
// cert we have is too old. We need to mint a new one.
//
// We use a singleflight group to coordinate only one request driving
// the async update to the key at once.
//
// NOTE: this anonymous function only has one goroutine in it per key at all times
return m.certGroup.DoChan(req.Key(), func() (any, error) {
cd.lock.Lock()
var (
shouldReplaceCert = certNeedsUpdate(req, cd.index, cd.value, cd.refreshing)
rateLimiter = cd.refreshRateLimiter
lastIndex = cd.index
)
cd.lock.Unlock()
if !shouldReplaceCert {
// This handles the case where a fetch succeeded after checking for
// its existence in Get. This ensures that we don't miss updates
// since we don't hold the lock between the read and then the
// refresh trigger.
return nil, nil
}
if err := rateLimiter.Wait(m.ctx); err != nil {
// NOTE: this can only happen when the entire cache is being
// shutdown and isn't something that can happen normally.
return nil, nil
}
cd.MarkRefreshing(true)
defer cd.MarkRefreshing(false)
req.MinQueryIndex = lastIndex
// Start building the new entry by blocking on the fetch.
m.refreshLeafAndUpdate(req, cd)
return nil, nil
})
}
// testGet is a way for the test code to do a get but from the middle of the
// logic stack, skipping some of the caching logic.
func (m *Manager) testGet(req *ConnectCALeafRequest) (uint64, *structs.IssuedCert, error) {
cd := m.getCertData(req.Key())
m.refreshLeafAndUpdate(req, cd)
cd.lock.Lock()
var (
index = cd.index
cert = cd.value
err = cd.lastFetchErr
)
cd.lock.Unlock()
if err != nil {
return 0, nil, err
}
return index, cert, nil
}
// refreshLeafAndUpdate will try to refresh the leaf and persist the updated
// data back to the in-memory store.
//
// NOTE: this function only has one goroutine in it per key at all times
func (m *Manager) refreshLeafAndUpdate(req *ConnectCALeafRequest, cd *certData) {
existing, state := cd.GetValueAndState()
newCert, updatedState, err := m.attemptLeafRefresh(req, existing, state)
cd.Update(newCert, updatedState, err)
}
// Prepopulate puts a cert in manually. This is useful when the correct initial
// value is known and the cache shouldn't refetch the same thing on startup. It
// is used to set AgentLeafCert when AutoEncrypt.TLS is turned on. The manager
// itself cannot fetch that the first time because it requires a special
// RPCType. Subsequent runs are fine though.
func (m *Manager) Prepopulate(
ctx context.Context,
key string,
index uint64,
value *structs.IssuedCert,
authorityKeyID string,
) error {
if value == nil {
return errors.New("value is required")
}
cd := m.getCertData(key)
cd.lock.Lock()
defer cd.lock.Unlock()
cd.index = index
cd.value = value
cd.state = fetchState{
authorityKeyID: authorityKeyID,
forceExpireAfter: time.Time{},
consecutiveRateLimitErrs: 0,
activeRootRotationStart: time.Time{},
}
return nil
}
// runExpiryLoop is a blocking function that watches the expiration
// heap and invalidates cert entries that have expired.
func (m *Manager) runExpiryLoop() {
for {
m.lock.RLock()
timer := m.certsExpiryHeap.Next()
m.lock.RUnlock()
select {
case <-m.ctx.Done():
timer.Stop()
return
case <-m.certsExpiryHeap.NotifyCh:
timer.Stop()
continue
case <-timer.Wait():
m.lock.Lock()
entry := timer.Entry
// Entry expired! Remove it.
delete(m.certs, entry.Key())
m.certsExpiryHeap.Remove(entry.Index())
// Set some metrics
metrics.IncrCounter([]string{"leaf-certs", "evict_expired"}, 1)
metrics.SetGauge([]string{"leaf-certs", "entries_count"}, float32(len(m.certs)))
m.lock.Unlock()
}
}
}

File diff suppressed because it is too large Load Diff

152
agent/leafcert/roots.go Normal file
View File

@ -0,0 +1,152 @@
package leafcert
import (
"context"
"sync"
"sync/atomic"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// rootWatcher helps let multiple requests for leaf certs to coordinate sharing
// a single long-lived watch for the root certs. This allows the leaf cert
// requests to notice when the roots rotate and trigger their reissuance.
type rootWatcher struct {
// This is the "top-level" internal context. This is used to cancel
// background operations.
ctx context.Context
// rootsReader is an interface to access connect CA roots.
rootsReader RootsReader
// lock protects access to the subscribers map and cancel
lock sync.Mutex
// subscribers is a set of chans, one for each currently in-flight
// Fetch. These chans have root updates delivered from the root watcher.
subscribers map[chan struct{}]struct{}
// cancel is a func to call to stop the background root watch if any.
// You must hold lock to read (e.g. call) or write the value.
cancel func()
// testStart/StopCount are testing helpers that allow tests to
// observe the reference counting behavior that governs the shared root watch.
// It's not exactly pretty to expose internals like this, but seems cleaner
// than constructing elaborate and brittle test cases that we can infer
// correct behavior from, and simpler than trying to probe runtime goroutine
// traces to infer correct behavior that way. They must be accessed
// atomically.
testStartCount uint32
testStopCount uint32
}
// Subscribe is called on each fetch that is about to block and wait for
// changes to the leaf. It subscribes a chan to receive updates from the shared
// root watcher and triggers root watcher if it's not already running.
func (r *rootWatcher) Subscribe(rootUpdateCh chan struct{}) {
r.lock.Lock()
defer r.lock.Unlock()
// Lazy allocation
if r.subscribers == nil {
r.subscribers = make(map[chan struct{}]struct{})
}
// Make sure a root watcher is running. We don't only do this on first request
// to be more tolerant of errors that could cause the root watcher to fail and
// exit.
if r.cancel == nil {
ctx, cancel := context.WithCancel(r.ctx)
r.cancel = cancel
go r.rootWatcher(ctx)
}
r.subscribers[rootUpdateCh] = struct{}{}
}
// Unsubscribe is called when a blocking call exits to unsubscribe from root
// updates and possibly stop the shared root watcher if it's no longer needed.
// Note that typically root CA is still being watched by clients directly and
// probably by the ProxyConfigManager so it will stay hot in cache for a while,
// we are just not monitoring it for updates any more.
func (r *rootWatcher) Unsubscribe(rootUpdateCh chan struct{}) {
r.lock.Lock()
defer r.lock.Unlock()
delete(r.subscribers, rootUpdateCh)
if len(r.subscribers) == 0 && r.cancel != nil {
// This was the last request. Stop the root watcher.
r.cancel()
r.cancel = nil
}
}
func (r *rootWatcher) notifySubscribers() {
r.lock.Lock()
defer r.lock.Unlock()
for ch := range r.subscribers {
select {
case ch <- struct{}{}:
default:
// Don't block - chans are 1-buffered so this default case
// means the subscriber already holds an update signal.
}
}
}
// rootWatcher is the shared rootWatcher that runs in a background goroutine
// while needed by one or more inflight Fetch calls.
func (r *rootWatcher) rootWatcher(ctx context.Context) {
atomic.AddUint32(&r.testStartCount, 1)
defer atomic.AddUint32(&r.testStopCount, 1)
ch := make(chan cache.UpdateEvent, 1)
if err := r.rootsReader.Notify(ctx, "roots", ch); err != nil {
// Trigger all inflight watchers. We don't pass the error, but they will
// reload from cache and observe the same error and return it to the caller,
// or if it's transient, will continue and the next Fetch will get us back
// into the right state. Seems better than busy loop-retrying here given
// that almost any error we would see here would also be returned from the
// cache get this will trigger.
r.notifySubscribers()
return
}
var oldRoots *structs.IndexedCARoots
// Wait for updates to roots or all requests to stop
for {
select {
case <-ctx.Done():
return
case e := <-ch:
// Root response changed in some way. Note this might be the initial
// fetch.
if e.Err != nil {
// See above rationale about the error propagation
r.notifySubscribers()
continue
}
roots, ok := e.Result.(*structs.IndexedCARoots)
if !ok {
// See above rationale about the error propagation
r.notifySubscribers()
continue
}
// Check that the active root is actually different from the last CA
// config there are many reasons the config might have changed without
// actually updating the CA root that is signing certs in the cluster.
// The Fetch calls will also validate this since the first call here we
// don't know if it changed or not, but there is no point waking up all
// Fetch calls to check this if we know none of them will need to act on
// this update.
if oldRoots != nil && oldRoots.ActiveRootID == roots.ActiveRootID {
continue
}
// Distribute the update to all inflight requests - they will decide
// whether or not they need to act on it.
r.notifySubscribers()
oldRoots = roots
}
}
}

View File

@ -0,0 +1,35 @@
package leafcert
import (
"context"
"github.com/hashicorp/consul/agent/structs"
)
// NetRPC is an interface that an NetRPC client must implement. This is a helper
// interface that is implemented by the agent delegate so that Type
// implementations can request NetRPC access.
type NetRPC interface {
RPC(ctx context.Context, method string, args any, reply any) error
}
// NewNetRPCCertSigner returns a CertSigner that uses net-rpc to sign certs.
func NewNetRPCCertSigner(netRPC NetRPC) CertSigner {
return &netRPCCertSigner{netRPC: netRPC}
}
type netRPCCertSigner struct {
// NetRPC is an RPC client for remote cert signing requests.
netRPC NetRPC
}
var _ CertSigner = (*netRPCCertSigner)(nil)
func (s *netRPCCertSigner) SignCert(ctx context.Context, args *structs.CASignRequest) (*structs.IssuedCert, error) {
var reply structs.IssuedCert
err := s.netRPC.RPC(ctx, "ConnectCA.Sign", args, &reply)
if err != nil {
return nil, err
}
return &reply, nil
}

View File

@ -0,0 +1,243 @@
package leafcert
import (
"bytes"
"context"
"crypto/rand"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
)
// testSigner implements NetRPC and handles leaf signing operations
type testSigner struct {
caLock sync.Mutex
ca *structs.CARoot
prevRoots []*structs.CARoot // remember prior ones
IDGenerator *atomic.Uint64
RootsReader *testRootsReader
signCallLock sync.Mutex
signCallErrors []error
signCallErrorCount uint64
signCallCapture []*structs.CASignRequest
}
var _ CertSigner = (*testSigner)(nil)
var ReplyWithExpiredCert = errors.New("reply with expired cert")
func newTestSigner(t *testing.T, idGenerator *atomic.Uint64, rootsReader *testRootsReader) *testSigner {
if idGenerator == nil {
idGenerator = &atomic.Uint64{}
}
if rootsReader == nil {
rootsReader = newTestRootsReader(t)
}
s := &testSigner{
IDGenerator: idGenerator,
RootsReader: rootsReader,
}
return s
}
func (s *testSigner) SetSignCallErrors(errs ...error) {
s.signCallLock.Lock()
defer s.signCallLock.Unlock()
s.signCallErrors = append(s.signCallErrors, errs...)
}
func (s *testSigner) GetSignCallErrorCount() uint64 {
s.signCallLock.Lock()
defer s.signCallLock.Unlock()
return s.signCallErrorCount
}
func (s *testSigner) UpdateCA(t *testing.T, ca *structs.CARoot) *structs.CARoot {
if ca == nil {
ca = connect.TestCA(t, nil)
}
roots := &structs.IndexedCARoots{
ActiveRootID: ca.ID,
TrustDomain: connect.TestTrustDomain,
Roots: []*structs.CARoot{ca},
QueryMeta: structs.QueryMeta{Index: s.nextIndex()},
}
// Update the signer first.
s.caLock.Lock()
{
s.ca = ca
roots.Roots = append(roots.Roots, s.prevRoots...)
// Remember for the next rotation.
dup := ca.Clone()
dup.Active = false
s.prevRoots = append(s.prevRoots, dup)
}
s.caLock.Unlock()
// Then trigger an event when updating the roots.
s.RootsReader.Set(roots)
return ca
}
func (s *testSigner) nextIndex() uint64 {
return s.IDGenerator.Add(1)
}
func (s *testSigner) getCA() *structs.CARoot {
s.caLock.Lock()
defer s.caLock.Unlock()
return s.ca
}
func (s *testSigner) GetCapture(idx int) *structs.CASignRequest {
s.signCallLock.Lock()
defer s.signCallLock.Unlock()
if len(s.signCallCapture) > idx {
return s.signCallCapture[idx]
}
return nil
}
func (s *testSigner) SignCert(ctx context.Context, req *structs.CASignRequest) (*structs.IssuedCert, error) {
useExpiredCert := false
s.signCallLock.Lock()
s.signCallCapture = append(s.signCallCapture, req)
if len(s.signCallErrors) > 0 {
err := s.signCallErrors[0]
s.signCallErrors = s.signCallErrors[1:]
if err == ReplyWithExpiredCert {
useExpiredCert = true
} else if err != nil {
s.signCallErrorCount++
s.signCallLock.Unlock()
return nil, err
}
}
s.signCallLock.Unlock()
// parts of this were inlined from CAManager and the connect ca provider
ca := s.getCA()
if ca == nil {
return nil, fmt.Errorf("must call UpdateCA at least once")
}
csr, err := connect.ParseCSR(req.CSR)
if err != nil {
return nil, fmt.Errorf("error parsing CSR: %w", err)
}
connect.HackSANExtensionForCSR(csr)
spiffeID, err := connect.ParseCertURI(csr.URIs[0])
if err != nil {
return nil, fmt.Errorf("error parsing CSR URI: %w", err)
}
serviceID, isService := spiffeID.(*connect.SpiffeIDService)
if !isService {
return nil, fmt.Errorf("unexpected spiffeID type %T", spiffeID)
}
signer, err := connect.ParseSigner(ca.SigningKey)
if err != nil {
return nil, fmt.Errorf("error parsing CA signing key: %w", err)
}
keyId, err := connect.KeyId(signer.Public())
if err != nil {
return nil, fmt.Errorf("error forming CA key id from public key: %w", err)
}
subjectKeyID, err := connect.KeyId(csr.PublicKey)
if err != nil {
return nil, fmt.Errorf("error forming subject key id from public key: %w", err)
}
caCert, err := connect.ParseCert(ca.RootCert)
if err != nil {
return nil, fmt.Errorf("error parsing CA root cert pem: %w", err)
}
const expiration = 10 * time.Minute
now := time.Now()
template := x509.Certificate{
SerialNumber: big.NewInt(int64(s.nextIndex())),
URIs: csr.URIs,
Signature: csr.Signature,
// We use the correct signature algorithm for the CA key we are signing with
// regardless of the algorithm used to sign the CSR signature above since
// the leaf might use a different key type.
SignatureAlgorithm: connect.SigAlgoForKey(signer),
PublicKeyAlgorithm: csr.PublicKeyAlgorithm,
PublicKey: csr.PublicKey,
BasicConstraintsValid: true,
KeyUsage: x509.KeyUsageDataEncipherment |
x509.KeyUsageKeyAgreement |
x509.KeyUsageDigitalSignature |
x509.KeyUsageKeyEncipherment,
ExtKeyUsage: []x509.ExtKeyUsage{
x509.ExtKeyUsageClientAuth,
x509.ExtKeyUsageServerAuth,
},
NotAfter: now.Add(expiration),
NotBefore: now,
AuthorityKeyId: keyId,
SubjectKeyId: subjectKeyID,
DNSNames: csr.DNSNames,
IPAddresses: csr.IPAddresses,
}
if useExpiredCert {
template.NotBefore = time.Now().Add(-13 * time.Hour)
template.NotAfter = time.Now().Add(-1 * time.Hour)
}
// Create the certificate, PEM encode it and return that value.
var buf bytes.Buffer
bs, err := x509.CreateCertificate(
rand.Reader, &template, caCert, csr.PublicKey, signer)
if err != nil {
return nil, fmt.Errorf("error creating cert pem from CSR: %w", err)
}
err = pem.Encode(&buf, &pem.Block{Type: "CERTIFICATE", Bytes: bs})
if err != nil {
return nil, fmt.Errorf("error encoding cert pem into text: %w", err)
}
leafPEM := buf.String()
leafCert, err := connect.ParseCert(leafPEM)
if err != nil {
return nil, fmt.Errorf("error parsing cert from generated leaf pem: %w", err)
}
index := s.nextIndex()
return &structs.IssuedCert{
SerialNumber: connect.EncodeSerialNumber(leafCert.SerialNumber),
CertPEM: leafPEM,
Service: serviceID.Service,
ServiceURI: leafCert.URIs[0].String(),
ValidAfter: leafCert.NotBefore,
ValidBefore: leafCert.NotAfter,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
}, nil
}

103
agent/leafcert/structs.go Normal file
View File

@ -0,0 +1,103 @@
package leafcert
import (
"fmt"
"net"
"time"
"github.com/mitchellh/hashstructure"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
// ConnectCALeafRequest is the cache.Request implementation for the
// ConnectCALeaf cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded
// directly to any Consul servers.
type ConnectCALeafRequest struct {
Token string
Datacenter string
DNSSAN []string
IPSAN []net.IP
MinQueryIndex uint64
MaxQueryTime time.Duration
acl.EnterpriseMeta
MustRevalidate bool
// The following flags indicate the entity we are requesting a cert for.
// Only one of these must be specified.
Service string // Given a Service name, not ID, the request is for a SpiffeIDService.
Agent string // Given an Agent name, not ID, the request is for a SpiffeIDAgent.
Kind structs.ServiceKind // Given "mesh-gateway", the request is for a SpiffeIDMeshGateway. No other kinds supported.
Server bool // If true, the request is for a SpiffeIDServer.
}
func (r *ConnectCALeafRequest) Key() string {
r.EnterpriseMeta.Normalize()
switch {
case r.Agent != "":
v, err := hashstructure.Hash([]any{
r.Agent,
r.PartitionOrDefault(),
}, nil)
if err == nil {
return fmt.Sprintf("agent:%d", v)
}
case r.Kind == structs.ServiceKindMeshGateway:
v, err := hashstructure.Hash([]any{
r.PartitionOrDefault(),
r.DNSSAN,
r.IPSAN,
}, nil)
if err == nil {
return fmt.Sprintf("kind:%d", v)
}
case r.Kind != "":
// this is not valid
case r.Server:
v, err := hashstructure.Hash([]any{
"server",
r.Datacenter,
}, nil)
if err == nil {
return fmt.Sprintf("server:%d", v)
}
default:
v, err := hashstructure.Hash([]any{
r.Service,
r.EnterpriseMeta,
r.DNSSAN,
r.IPSAN,
}, nil)
if err == nil {
return fmt.Sprintf("service:%d", v)
}
}
// If there is an error, we don't set the key. A blank key forces
// no cache for this request so the request is forwarded directly
// to the server.
return ""
}
func (req *ConnectCALeafRequest) TargetNamespace() string {
return req.NamespaceOrDefault()
}
func (req *ConnectCALeafRequest) TargetPartition() string {
return req.PartitionOrDefault()
}
func (r *ConnectCALeafRequest) CacheInfo() cache.RequestInfo {
return cache.RequestInfo{
Token: r.Token,
Key: r.Key(),
Datacenter: r.Datacenter,
MinIndex: r.MinQueryIndex,
Timeout: r.MaxQueryTime,
MustRevalidate: r.MustRevalidate,
}
}

View File

@ -0,0 +1,79 @@
package leafcert
import (
"net"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestConnectCALeafRequest_Key(t *testing.T) {
key := func(r ConnectCALeafRequest) string {
return r.Key()
}
t.Run("service", func(t *testing.T) {
t.Run("name", func(t *testing.T) {
r1 := key(ConnectCALeafRequest{Service: "web"})
r2 := key(ConnectCALeafRequest{Service: "api"})
require.True(t, strings.HasPrefix(r1, "service:"), "Key %s does not start with service:", r1)
require.True(t, strings.HasPrefix(r2, "service:"), "Key %s does not start with service:", r2)
require.NotEqual(t, r1, r2, "Cache keys for different services should not be equal")
})
t.Run("dns-san", func(t *testing.T) {
r3 := key(ConnectCALeafRequest{Service: "foo", DNSSAN: []string{"a.com"}})
r4 := key(ConnectCALeafRequest{Service: "foo", DNSSAN: []string{"b.com"}})
require.NotEqual(t, r3, r4, "Cache keys for different DNSSAN should not be equal")
})
t.Run("ip-san", func(t *testing.T) {
r5 := key(ConnectCALeafRequest{Service: "foo", IPSAN: []net.IP{net.ParseIP("192.168.4.139")}})
r6 := key(ConnectCALeafRequest{Service: "foo", IPSAN: []net.IP{net.ParseIP("192.168.4.140")}})
require.NotEqual(t, r5, r6, "Cache keys for different IPSAN should not be equal")
})
})
t.Run("agent", func(t *testing.T) {
t.Run("name", func(t *testing.T) {
r1 := key(ConnectCALeafRequest{Agent: "abc"})
require.True(t, strings.HasPrefix(r1, "agent:"), "Key %s does not start with agent:", r1)
})
t.Run("dns-san ignored", func(t *testing.T) {
r3 := key(ConnectCALeafRequest{Agent: "foo", DNSSAN: []string{"a.com"}})
r4 := key(ConnectCALeafRequest{Agent: "foo", DNSSAN: []string{"b.com"}})
require.Equal(t, r3, r4, "DNSSAN is ignored for agent type")
})
t.Run("ip-san ignored", func(t *testing.T) {
r5 := key(ConnectCALeafRequest{Agent: "foo", IPSAN: []net.IP{net.ParseIP("192.168.4.139")}})
r6 := key(ConnectCALeafRequest{Agent: "foo", IPSAN: []net.IP{net.ParseIP("192.168.4.140")}})
require.Equal(t, r5, r6, "IPSAN is ignored for agent type")
})
})
t.Run("kind", func(t *testing.T) {
t.Run("invalid", func(t *testing.T) {
r1 := key(ConnectCALeafRequest{Kind: "terminating-gateway"})
require.Empty(t, r1)
})
t.Run("mesh-gateway", func(t *testing.T) {
t.Run("normal", func(t *testing.T) {
r1 := key(ConnectCALeafRequest{Kind: "mesh-gateway"})
require.True(t, strings.HasPrefix(r1, "kind:"), "Key %s does not start with kind:", r1)
})
t.Run("dns-san", func(t *testing.T) {
r3 := key(ConnectCALeafRequest{Kind: "mesh-gateway", DNSSAN: []string{"a.com"}})
r4 := key(ConnectCALeafRequest{Kind: "mesh-gateway", DNSSAN: []string{"b.com"}})
require.NotEqual(t, r3, r4, "Cache keys for different DNSSAN should not be equal")
})
t.Run("ip-san", func(t *testing.T) {
r5 := key(ConnectCALeafRequest{Kind: "mesh-gateway", IPSAN: []net.IP{net.ParseIP("192.168.4.139")}})
r6 := key(ConnectCALeafRequest{Kind: "mesh-gateway", IPSAN: []net.IP{net.ParseIP("192.168.4.140")}})
require.NotEqual(t, r5, r6, "Cache keys for different IPSAN should not be equal")
})
})
})
t.Run("server", func(t *testing.T) {
r1 := key(ConnectCALeafRequest{
Server: true,
Datacenter: "us-east",
})
require.True(t, strings.HasPrefix(r1, "server:"), "Key %s does not start with server:", r1)
})
}

63
agent/leafcert/util.go Normal file
View File

@ -0,0 +1,63 @@
package leafcert
import (
"time"
"github.com/hashicorp/consul/agent/structs"
)
// calculateSoftExpiry encapsulates our logic for when to renew a cert based on
// it's age. It returns a pair of times min, max which makes it easier to test
// the logic without non-deterministic jitter to account for. The caller should
// choose a time randomly in between these.
//
// We want to balance a few factors here:
// - renew too early and it increases the aggregate CSR rate in the cluster
// - renew too late and it risks disruption to the service if a transient
// error prevents the renewal
// - we want a broad amount of jitter so if there is an outage, we don't end
// up with all services in sync and causing a thundering herd every
// renewal period. Broader is better for smoothing requests but pushes
// both earlier and later tradeoffs above.
//
// Somewhat arbitrarily the current strategy looks like this:
//
// 0 60% 90%
// Issued [------------------------------|===============|!!!!!] Expires
// 72h TTL: 0 ~43h ~65h
// 1h TTL: 0 36m 54m
//
// Where |===| is the soft renewal period where we jitter for the first attempt
// and |!!!| is the danger zone where we just try immediately.
//
// In the happy path (no outages) the average renewal occurs half way through
// the soft renewal region or at 75% of the cert lifetime which is ~54 hours for
// a 72 hour cert, or 45 mins for a 1 hour cert.
//
// If we are already in the softRenewal period, we randomly pick a time between
// now and the start of the danger zone.
//
// We pass in now to make testing easier.
func calculateSoftExpiry(now time.Time, cert *structs.IssuedCert) (min time.Time, max time.Time) {
certLifetime := cert.ValidBefore.Sub(cert.ValidAfter)
if certLifetime < 10*time.Minute {
// Shouldn't happen as we limit to 1 hour shortest elsewhere but just be
// defensive against strange times or bugs.
return now, now
}
// Find the 60% mark in diagram above
softRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.6))
hardRenewTime := cert.ValidAfter.Add(time.Duration(float64(certLifetime) * 0.9))
if now.After(hardRenewTime) {
// In the hard renew period, or already expired. Renew now!
return now, now
}
if now.After(softRenewTime) {
// Already in the soft renew period, make now the lower bound for jitter
softRenewTime = now
}
return softRenewTime, hardRenewTime
}

133
agent/leafcert/util_test.go Normal file
View File

@ -0,0 +1,133 @@
package leafcert
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func TestCalculateSoftExpire(t *testing.T) {
tests := []struct {
name string
now string
issued string
lifetime time.Duration
wantMin string
wantMax string
}{
{
name: "72h just issued",
now: "2018-01-01 00:00:01",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Should jitter between 60% and 90% of the lifetime which is 43.2/64.8
// hours after issued
wantMin: "2018-01-02 19:12:00",
wantMax: "2018-01-03 16:48:00",
},
{
name: "72h in renew range",
// This time should be inside the renewal range.
now: "2018-01-02 20:00:20",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Min should be the "now" time
wantMin: "2018-01-02 20:00:20",
wantMax: "2018-01-03 16:48:00",
},
{
name: "72h in hard renew",
// This time should be inside the renewal range.
now: "2018-01-03 18:00:00",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-03 18:00:00",
wantMax: "2018-01-03 18:00:00",
},
{
name: "72h expired",
// This time is after expiry
now: "2018-01-05 00:00:00",
issued: "2018-01-01 00:00:00",
lifetime: 72 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-05 00:00:00",
wantMax: "2018-01-05 00:00:00",
},
{
name: "1h just issued",
now: "2018-01-01 00:00:01",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Should jitter between 60% and 90% of the lifetime which is 36/54 mins
// hours after issued
wantMin: "2018-01-01 00:36:00",
wantMax: "2018-01-01 00:54:00",
},
{
name: "1h in renew range",
// This time should be inside the renewal range.
now: "2018-01-01 00:40:00",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Min should be the "now" time
wantMin: "2018-01-01 00:40:00",
wantMax: "2018-01-01 00:54:00",
},
{
name: "1h in hard renew",
// This time should be inside the renewal range.
now: "2018-01-01 00:55:00",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-01 00:55:00",
wantMax: "2018-01-01 00:55:00",
},
{
name: "1h expired",
// This time is after expiry
now: "2018-01-01 01:01:01",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Hour,
// Min and max should both be the "now" time
wantMin: "2018-01-01 01:01:01",
wantMax: "2018-01-01 01:01:01",
},
{
name: "too short lifetime",
// This time is after expiry
now: "2018-01-01 01:01:01",
issued: "2018-01-01 00:00:00",
lifetime: 1 * time.Minute,
// Min and max should both be the "now" time
wantMin: "2018-01-01 01:01:01",
wantMax: "2018-01-01 01:01:01",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
now, err := time.Parse("2006-01-02 15:04:05", tc.now)
require.NoError(t, err)
issued, err := time.Parse("2006-01-02 15:04:05", tc.issued)
require.NoError(t, err)
wantMin, err := time.Parse("2006-01-02 15:04:05", tc.wantMin)
require.NoError(t, err)
wantMax, err := time.Parse("2006-01-02 15:04:05", tc.wantMax)
require.NoError(t, err)
min, max := calculateSoftExpiry(now, &structs.IssuedCert{
ValidAfter: issued,
ValidBefore: issued.Add(tc.lifetime),
})
require.Equal(t, wantMin, min)
require.Equal(t, wantMax, max)
})
}
}

160
agent/leafcert/watch.go Normal file
View File

@ -0,0 +1,160 @@
package leafcert
import (
"context"
"fmt"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/lib"
)
// Notify registers a desire to be updated about changes to a cache result.
//
// It is a helper that abstracts code from performing their own "blocking" query
// logic against a cache key to watch for changes and to maintain the key in
// cache actively. It will continue to perform blocking Get requests until the
// context is canceled.
//
// The passed context must be canceled or timeout in order to free resources
// and stop maintaining the value in cache. Typically request-scoped resources
// do this but if a long-lived context like context.Background is used, then the
// caller must arrange for it to be canceled when the watch is no longer
// needed.
//
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
// fast enough it will block the notification loop. When the chan is later
// drained, watching resumes correctly. If the pause is longer than the
// cachetype's TTL, the result might be removed from the local cache. Even in
// this case though when the chan is drained again, the new Get will re-fetch
// the entry from servers and resume notification behavior transparently.
//
// The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and will be returned in all UpdateEvents generated by
// result of watching the specified request so the caller can set this to any
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (m *Manager) Notify(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
ch chan<- cache.UpdateEvent,
) error {
return m.NotifyCallback(ctx, req, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
select {
case ch <- event:
case <-ctx.Done():
}
})
}
// NotifyCallback allows you to receive notifications about changes to a cache
// result in the same way as Notify, but accepts a callback function instead of
// a channel.
func (m *Manager) NotifyCallback(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
cb cache.Callback,
) error {
if req.Key() == "" {
return fmt.Errorf("a key is required")
}
// Lightweight copy this object so that manipulating req doesn't race.
dup := *req
req = &dup
if req.MaxQueryTime <= 0 {
req.MaxQueryTime = DefaultQueryTimeout
}
go m.notifyBlockingQuery(ctx, req, correlationID, cb)
return nil
}
func (m *Manager) notifyBlockingQuery(
ctx context.Context,
req *ConnectCALeafRequest,
correlationID string,
cb cache.Callback,
) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
failures := uint(0)
for {
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Blocking request
req.MinQueryIndex = index
newValue, meta, err := m.internalGet(ctx, req)
// Check context hasn't been canceled
if ctx.Err() != nil {
return
}
// Check the index of the value returned in the cache entry to be sure it
// changed
if index == 0 || index < meta.Index {
cb(ctx, cache.UpdateEvent{
CorrelationID: correlationID,
Result: newValue,
Meta: meta,
Err: err,
})
// Update index for next request
index = meta.Index
}
var wait time.Duration
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
wait = backOffWait(m.config, failures)
m.logger.
With("error", err).
With("index", index).
Warn("handling error in Manager.Notify")
}
if wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
// Sanity check we always request blocking on second pass
if err == nil && index < 1 {
index = 1
}
}
}
func backOffWait(cfg Config, failures uint) time.Duration {
if failures > cfg.LeafCertRefreshBackoffMin {
shift := failures - cfg.LeafCertRefreshBackoffMin
waitTime := cfg.LeafCertRefreshMaxWait
if shift < 31 {
waitTime = (1 << shift) * time.Second
}
if waitTime > cfg.LeafCertRefreshMaxWait {
waitTime = cfg.LeafCertRefreshMaxWait
}
return waitTime + lib.RandomStagger(waitTime)
}
return 0
}

View File

@ -10,8 +10,6 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
@ -23,6 +21,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/private/pbpeering"
)
// ServerDataSourceDeps contains the dependencies needed for sourcing data from
@ -81,17 +80,6 @@ func CacheServiceGateways(c *cache.Cache) proxycfg.GatewayServices {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.ServiceGatewaysName}
}
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
// sourcing data from the agent cache.
//
// Note: there isn't a server-local equivalent of this data source because
// "agentless" proxies obtain certificates via SDS served by consul-dataplane.
// If SDS is not supported on consul-dataplane, data is sourced from the server agent cache
// even for "agentless" proxies.
func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
}
// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by
// sourcing data from the agent cache.
//

View File

@ -0,0 +1,25 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package proxycfgglue
import (
"context"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/proxycfg"
)
// LocalLeafCerts satisfies the proxycfg.LeafCertificate interface by sourcing data from
// the given leafcert.Manager.
func LocalLeafCerts(m *leafcert.Manager) proxycfg.LeafCertificate {
return &localLeafCerts{m}
}
type localLeafCerts struct {
leafCertManager *leafcert.Manager
}
func (c *localLeafCerts) Notify(ctx context.Context, req *leafcert.ConnectCALeafRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
return c.leafCertManager.NotifyCallback(ctx, req, correlationID, dispatchCacheUpdate(ch))
}

View File

@ -7,7 +7,7 @@ import (
"context"
"fmt"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
@ -489,7 +489,7 @@ func (h *handlerAPIGateway) watchIngressLeafCert(ctx context.Context, snap *Conf
snap.APIGateway.LeafCertWatchCancel()
}
ctx, cancel := context.WithCancel(ctx)
err := h.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
err := h.dataSources.LeafCertificate.Notify(ctx, &leafcert.ConnectCALeafRequest{
Datacenter: h.source.Datacenter,
Token: h.token,
Service: h.service,

View File

@ -11,13 +11,15 @@ import (
"path"
"strings"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/mitchellh/mapstructure"
)
type handlerConnectProxy struct {
@ -69,7 +71,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
}
// Watch the leaf cert
err = s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
err = s.dataSources.LeafCertificate.Notify(ctx, &leafcert.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Service: s.proxyCfg.DestinationServiceName,

View File

@ -8,6 +8,7 @@ import (
"errors"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
)
@ -212,7 +213,7 @@ type InternalServiceDump interface {
// LeafCertificate is the interface used to consume updates about a service's
// leaf certificate.
type LeafCertificate interface {
Notify(ctx context.Context, req *cachetype.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error
Notify(ctx context.Context, req *leafcert.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error
}
// PeeredUpstreams is the interface used to consume updates about upstreams

View File

@ -7,7 +7,7 @@ import (
"context"
"fmt"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
@ -222,7 +222,7 @@ func (s *handlerIngressGateway) watchIngressLeafCert(ctx context.Context, snap *
snap.IngressGateway.LeafCertWatchCancel()
}
ctx, cancel := context.WithCancel(ctx)
err := s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
err := s.dataSources.LeafCertificate.Notify(ctx, &leafcert.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Service: s.service,

View File

@ -10,10 +10,10 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
@ -130,7 +130,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{Token: "my-token"},
}
leafReq := &cachetype.ConnectCALeafRequest{
leafReq := &leafcert.ConnectCALeafRequest{
Datacenter: "dc1",
Token: "my-token",
Service: "web",
@ -358,7 +358,7 @@ func testManager_BasicLifecycle(
t *testing.T,
dataSources *TestDataSources,
rootsReq *structs.DCSpecificRequest,
leafReq *cachetype.ConnectCALeafRequest,
leafReq *leafcert.ConnectCALeafRequest,
roots *structs.IndexedCARoots,
webProxy *structs.NodeService,
expectSnap *ConfigSnapshot,

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/maps"
@ -392,7 +393,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
if hasExports && snap.MeshGateway.LeafCertWatchCancel == nil {
// no watch and we need one
ctx, cancel := context.WithCancel(ctx)
err := s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
err := s.dataSources.LeafCertificate.Notify(ctx, &leafcert.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Kind: structs.ServiceKindMeshGateway,

View File

@ -10,15 +10,15 @@ import (
"testing"
"time"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
apimod "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/private/pbpeering"
@ -139,7 +139,7 @@ func recordWatches(sc *stateConfig) *watchRecorder {
IntentionUpstreams: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr},
IntentionUpstreamsDestination: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr},
InternalServiceDump: typedWatchRecorder[*structs.ServiceDumpRequest]{wr},
LeafCertificate: typedWatchRecorder[*cachetype.ConnectCALeafRequest]{wr},
LeafCertificate: typedWatchRecorder[*leafcert.ConnectCALeafRequest]{wr},
PeeringList: typedWatchRecorder[*cachetype.PeeringListRequest]{wr},
PeeredUpstreams: typedWatchRecorder[*structs.PartitionSpecificRequest]{wr},
PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr},
@ -224,7 +224,7 @@ func genVerifyTrustBundleReadWatch(peer string) verifyWatchRequest {
func genVerifyLeafWatchWithDNSSANs(expectedService string, expectedDatacenter string, expectedDNSSANs []string) verifyWatchRequest {
return func(t testing.TB, request any) {
reqReal, ok := request.(*cachetype.ConnectCALeafRequest)
reqReal, ok := request.(*leafcert.ConnectCALeafRequest)
reqReal.Token = aclToken
require.True(t, ok)
require.Equal(t, aclToken, reqReal.Token)

View File

@ -8,7 +8,7 @@ import (
"fmt"
"strings"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
)
@ -172,7 +172,7 @@ func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u UpdateEv
// This cert is used to terminate mTLS connections on the service's behalf
if _, ok := snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok {
ctx, cancel := context.WithCancel(ctx)
err := s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
err := s.dataSources.LeafCertificate.Notify(ctx, &leafcert.ConnectCALeafRequest{
Datacenter: s.source.Datacenter,
Token: s.token,
Service: svc.Service.Name,

View File

@ -21,6 +21,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/private/pbpeering"
@ -749,7 +750,7 @@ func testConfigSnapshotFixture(
IntentionUpstreams: &noopDataSource[*structs.ServiceSpecificRequest]{},
IntentionUpstreamsDestination: &noopDataSource[*structs.ServiceSpecificRequest]{},
InternalServiceDump: &noopDataSource[*structs.ServiceDumpRequest]{},
LeafCertificate: &noopDataSource[*cachetype.ConnectCALeafRequest]{},
LeafCertificate: &noopDataSource[*leafcert.ConnectCALeafRequest]{},
PeeringList: &noopDataSource[*cachetype.PeeringListRequest]{},
PeeredUpstreams: &noopDataSource[*structs.PartitionSpecificRequest]{},
PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{},
@ -954,7 +955,7 @@ func NewTestDataSources() *TestDataSources {
IntentionUpstreams: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](),
IntentionUpstreamsDestination: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](),
InternalServiceDump: NewTestDataSource[*structs.ServiceDumpRequest, *structs.IndexedCheckServiceNodes](),
LeafCertificate: NewTestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert](),
LeafCertificate: NewTestDataSource[*leafcert.ConnectCALeafRequest, *structs.IssuedCert](),
PeeringList: NewTestDataSource[*cachetype.PeeringListRequest, *pbpeering.PeeringListResponse](),
PreparedQuery: NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse](),
ResolvedServiceConfig: NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse](),
@ -981,7 +982,7 @@ type TestDataSources struct {
IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
IntentionUpstreamsDestination *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedCheckServiceNodes]
LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]
LeafCertificate *TestDataSource[*leafcert.ConnectCALeafRequest, *structs.IssuedCert]
PeeringList *TestDataSource[*cachetype.PeeringListRequest, *pbpeering.PeeringListResponse]
PeeredUpstreams *TestDataSource[*structs.PartitionSpecificRequest, *structs.IndexedPeeredServiceList]
PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]

View File

@ -5,6 +5,7 @@ package agent
import (
"context"
"errors"
"fmt"
"io"
"net"
@ -33,6 +34,7 @@ import (
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
grpcWare "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
@ -57,13 +59,41 @@ type BaseDeps struct {
MetricsConfig *lib.MetricsConfig
AutoConfig *autoconf.AutoConfig // TODO: use an interface
Cache *cache.Cache
LeafCertManager *leafcert.Manager
ViewStore *submatview.Store
WatchedFiles []string
NetRPC *LazyNetRPC
deregisterBalancer, deregisterResolver func()
stopHostCollector context.CancelFunc
}
type NetRPC interface {
RPC(ctx context.Context, method string, args any, reply any) error
}
type LazyNetRPC struct {
mu sync.RWMutex
rpc NetRPC
}
func (r *LazyNetRPC) SetNetRPC(rpc NetRPC) {
r.mu.Lock()
defer r.mu.Unlock()
r.rpc = rpc
}
func (r *LazyNetRPC) RPC(ctx context.Context, method string, args any, reply any) error {
r.mu.RLock()
r2 := r.rpc
r.mu.RUnlock()
if r2 == nil {
return errors.New("rpc: initialization ordering error; net-rpc not ready yet")
}
return r2.RPC(ctx, method, args, reply)
}
type ConfigLoader func(source config.Source) (config.LoadResult, error)
func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hclog.InterceptLogger) (BaseDeps, error) {
@ -141,6 +171,18 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore"))
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
d.NetRPC = &LazyNetRPC{}
// TODO: create leafCertManager in BaseDeps once NetRPC is available without Agent
d.LeafCertManager = leafcert.NewManager(leafcert.Deps{
Logger: d.Logger.Named("leaf-certs"),
CertSigner: leafcert.NewNetRPCCertSigner(d.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(d.Cache, cfg.Datacenter),
Config: leafcert.Config{
TestOverrideCAChangeInitialDelay: cfg.ConnectTestCALeafRootChangeSpread,
},
})
agentType := "client"
if cfg.ServerMode {
agentType = "server"
@ -198,6 +240,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
ServerProvider: d.Router,
TLSConfigurator: d.TLSConfigurator,
Cache: d.Cache,
LeafCertManager: d.LeafCertManager,
Tokens: d.Tokens,
EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps, cfg),
}
@ -221,6 +264,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
// handled by something else (e.g. the agent stop channel).
func (bd BaseDeps) Close() {
bd.AutoConfig.Stop()
bd.LeafCertManager.Stop()
bd.MetricsConfig.Cancel()
for _, fn := range []func(){bd.deregisterBalancer, bd.deregisterResolver, bd.stopHostCollector} {