Merge pull request #10479 from hashicorp/dnephin/ca-provider-explore-2

ca: move Server.SignIntermediate to CAManager
This commit is contained in:
Daniel Nephin 2021-07-12 19:03:43 -04:00 committed by GitHub
commit 0ccad1d6f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 449 additions and 402 deletions

View File

@ -12,9 +12,8 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/acmpca"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"

View File

@ -0,0 +1,96 @@
package consul
import (
"crypto/x509"
"fmt"
"net"
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
)
type autoConfigBackend struct {
Server *Server
}
func (b autoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) {
return b.Server.ForwardRPC(method, info, reply)
}
func (b autoConfigBackend) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) {
return b.Server.caManager.SignCertificate(csr, id)
}
// GetCARoots returns the CA roots.
func (b autoConfigBackend) GetCARoots() (*structs.IndexedCARoots, error) {
return b.Server.getCARoots(nil, b.Server.fsm.State())
}
// DatacenterJoinAddresses will return all the strings suitable for usage in
// retry join operations to connect to the the LAN or LAN segment gossip pool.
func (b autoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) {
members, err := b.Server.LANSegmentMembers(segment)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
}
var joinAddrs []string
for _, m := range members {
if ok, _ := metadata.IsConsulServer(m); ok {
serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
joinAddrs = append(joinAddrs, serfAddr.String())
}
}
return joinAddrs, nil
}
// CreateACLToken will create an ACL token from the given template
func (b autoConfigBackend) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
// we have to require local tokens or else it would require having these servers use a token with acl:write to make a
// token create RPC to the servers in the primary DC.
if !b.Server.LocalTokensEnabled() {
return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", b.Server.config.Datacenter)
}
newToken := *template
// generate the accessor id
if newToken.AccessorID == "" {
accessor, err := lib.GenerateUUID(b.Server.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.AccessorID = accessor
}
// generate the secret id
if newToken.SecretID == "" {
secret, err := lib.GenerateUUID(b.Server.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.SecretID = secret
}
newToken.CreateTime = time.Now()
req := structs.ACLTokenBatchSetRequest{
Tokens: structs.ACLTokens{&newToken},
CAS: false,
}
// perform the request to mint the new token
if _, err := b.Server.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
return nil, err
}
// return the full token definition from the FSM
_, token, err := b.Server.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta)
return token, err
}

View File

@ -0,0 +1,115 @@
package consul
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func TestAutoConfigBackend_DatacenterJoinAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
conf := testClusterConfig{
Datacenter: "primary",
Servers: 3,
}
nodes := newTestCluster(t, &conf)
var expected []string
for _, srv := range nodes.Servers {
expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort))
}
backend := autoConfigBackend{Server: nodes.Servers[0]}
actual, err := backend.DatacenterJoinAddresses("")
require.NoError(t, err)
require.ElementsMatch(t, expected, actual)
}
func TestAutoConfigBackend_CreateACLToken(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, srv, codec := testACLServerWithConfig(t, nil, false)
waitForLeaderEstablishment(t, srv)
r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1")
require.NoError(t, err)
t.Run("predefined-ids", func(t *testing.T) {
accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3"
secret := "ef453f31-ad58-4ec8-8bf8-342e99763026"
in := &structs.ACLToken{
AccessorID: accessor,
SecretID: secret,
Description: "test",
Policies: []structs.ACLTokenPolicyLink{
{
ID: structs.ACLPolicyGlobalManagementID,
},
},
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
ServiceIdentities: []*structs.ACLServiceIdentity{
{
ServiceName: "web",
},
},
Roles: []structs.ACLTokenRoleLink{
{
ID: r1.ID,
},
},
}
b := autoConfigBackend{Server: srv}
out, err := b.CreateACLToken(in)
require.NoError(t, err)
require.Equal(t, accessor, out.AccessorID)
require.Equal(t, secret, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.Policies, 1)
require.Len(t, out.Roles, 1)
require.Len(t, out.NodeIdentities, 1)
require.Len(t, out.ServiceIdentities, 1)
require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
require.Equal(t, "web", out.ServiceIdentities[0].ServiceName)
require.Equal(t, r1.ID, out.Roles[0].ID)
})
t.Run("autogen-ids", func(t *testing.T) {
in := &structs.ACLToken{
Description: "test",
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
}
b := autoConfigBackend{Server: srv}
out, err := b.CreateACLToken(in)
require.NoError(t, err)
require.NotEmpty(t, out.AccessorID)
require.NotEmpty(t, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.NodeIdentities, 1)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
})
}

View File

@ -109,7 +109,6 @@ type AutoConfigBackend interface {
CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error)
DatacenterJoinAddresses(segment string) ([]string, error)
ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error)
GetCARoots() (*structs.IndexedCARoots, error)
SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error)
}

View File

@ -196,7 +196,7 @@ func TestAutoConfigInitialConfiguration(t *testing.T) {
waitForLeaderEstablishment(t, s)
roots, err := s.GetCARoots()
roots, err := s.getCARoots(nil, s.fsm.State())
require.NoError(t, err)
pbroots, err := translateCARootsToProtobuf(roots)

View File

@ -192,7 +192,7 @@ func (s *ConnectCA) Sign(
}
}
cert, err := s.srv.SignCertificate(csr, spiffeID)
cert, err := s.srv.caManager.SignCertificate(csr, spiffeID)
if err != nil {
return err
}

View File

@ -492,7 +492,6 @@ func (c *FSM) applyConnectCALeafOperation(buf []byte, index uint64) interface{}
switch req.Op {
case structs.CALeafOpIncrementIndex:
// Use current index as the new value as well as the value to write at.
// TODO(banks) do we even use this op any more?
if err := c.state.CALeafSetIndex(index, index); err != nil {
return err
}

View File

@ -2,8 +2,10 @@ package consul
import (
"context"
"crypto/x509"
"errors"
"fmt"
"net/url"
"reflect"
"strings"
"sync"
@ -11,6 +13,9 @@ import (
"github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/lib/semaphore"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/connect/ca"
@ -34,6 +39,7 @@ const (
type caServerDelegate interface {
ca.ConsulProviderStateDelegate
IsLeader() bool
ApplyCALeafRequest() (uint64, error)
forwardDC(method, dc string, args interface{}, reply interface{}) error
generateCASignRequest(csr string) *structs.CASignRequest
@ -48,6 +54,8 @@ type CAManager struct {
delegate caServerDelegate
serverConf *Config
logger hclog.Logger
// rate limiter to use when signing leaf certificates
caLeafLimiter connectSignRateLimiter
providerLock sync.RWMutex
// provider is the current CA provider in use for Connect. This is
@ -82,6 +90,32 @@ func (c *caDelegateWithState) ApplyCARequest(req *structs.CARequest) (interface{
return c.Server.raftApplyMsgpack(structs.ConnectCARequestType, req)
}
func (c *caDelegateWithState) ApplyCALeafRequest() (uint64, error) {
// TODO(banks): when we implement IssuedCerts table we can use the insert to
// that as the raft index to return in response.
//
// UPDATE(mkeeler): The original implementation relied on updating the CAConfig
// and using its index as the ModifyIndex for certs. This was buggy. The long
// term goal is still to insert some metadata into raft about the certificates
// and use that raft index for the ModifyIndex. This is a partial step in that
// direction except that we only are setting an index and not storing the
// metadata.
req := structs.CALeafRequest{
Op: structs.CALeafOpIncrementIndex,
Datacenter: c.Server.config.Datacenter,
}
resp, err := c.Server.raftApplyMsgpack(structs.ConnectCALeafRequestType|structs.IgnoreUnknownTypeFlag, &req)
if err != nil {
return 0, err
}
modIdx, ok := resp.(uint64)
if !ok {
return 0, fmt.Errorf("Invalid response from updating the leaf cert index")
}
return modIdx, err
}
func (c *caDelegateWithState) generateCASignRequest(csr string) *structs.CASignRequest {
return &structs.CASignRequest{
Datacenter: c.Server.config.PrimaryDatacenter,
@ -149,8 +183,8 @@ func (c *CAManager) getPrimaryRoots() structs.IndexedCARoots {
// when setting up the CA during establishLeadership. The state should be set to
// non-ready before calling this.
func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) {
state := c.delegate.State()
_, config, err := state.CAConfig(nil)
st := c.delegate.State()
_, config, err := st.CAConfig(nil)
if err != nil {
return nil, err
}
@ -1287,3 +1321,195 @@ func (c *CAManager) configuredSecondaryCA() bool {
defer c.stateLock.Unlock()
return c.actingSecondaryCA
}
type connectSignRateLimiter struct {
// csrRateLimiter limits the rate of signing new certs if configured. Lazily
// initialized from current config to support dynamic changes.
// csrRateLimiterMu must be held while dereferencing the pointer or storing a
// new one, but methods can be called on the limiter object outside of the
// locked section. This is done only in the getCSRRateLimiterWithLimit method.
csrRateLimiter *rate.Limiter
csrRateLimiterMu sync.RWMutex
// csrConcurrencyLimiter is a dynamically resizable semaphore used to limit
// Sign RPC concurrency if configured. The zero value is usable as soon as
// SetSize is called which we do dynamically in the RPC handler to avoid
// having to hook elaborate synchronization mechanisms through the CA config
// endpoint and config reload etc.
csrConcurrencyLimiter semaphore.Dynamic
}
// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set.
// It uses the shared server-wide limiter unless the limit has been changed in
// config or the limiter has not been setup yet in which case it just-in-time
// configures the new limiter. We assume that limit changes are relatively rare
// and that all callers (there is currently only one) use the same config value
// as the limit. There might be some flapping if there are multiple concurrent
// requests in flight at the time the config changes where A sees the new value
// and updates, B sees the old but then gets this lock second and changes back.
// Eventually though and very soon (once all current RPCs are complete) we are
// guaranteed to have the correct limit set by the next RPC that comes in so I
// assume this is fine. If we observe strange behavior because of it, we could
// add hysteresis that prevents changes too soon after a previous change but
// that seems unnecessary for now.
func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter {
l.csrRateLimiterMu.RLock()
lim := l.csrRateLimiter
l.csrRateLimiterMu.RUnlock()
// If there is a current limiter with the same limit, return it. This should
// be the common case.
if lim != nil && lim.Limit() == limit {
return lim
}
// Need to change limiter, get write lock
l.csrRateLimiterMu.Lock()
defer l.csrRateLimiterMu.Unlock()
// No limiter yet, or limit changed in CA config, reconfigure a new limiter.
// We use burst of 1 for a hard limit. Note that either bursting or waiting is
// necessary to get expected behavior in fact of random arrival times, but we
// don't need both and we use Wait with a small delay to smooth noise. See
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md.
l.csrRateLimiter = rate.NewLimiter(limit, 1)
return l.csrRateLimiter
}
func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) {
provider, caRoot := c.getCAProvider()
if provider == nil {
return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil")
} else if caRoot == nil {
return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate")
}
// Verify that the CSR entity is in the cluster's trust domain
state := c.delegate.State()
_, config, err := state.CAConfig(nil)
if err != nil {
return nil, err
}
signingID := connect.SpiffeIDSigningForCluster(config)
serviceID, isService := spiffeID.(*connect.SpiffeIDService)
agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent)
if !isService && !isAgent {
return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID")
}
var entMeta structs.EnterpriseMeta
if isService {
if !signingID.CanSign(spiffeID) {
return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust domain: %s, "+
"we are %s", serviceID.Host, signingID.Host())
}
entMeta.Merge(serviceID.GetEnterpriseMeta())
} else {
// isAgent - if we support more ID types then this would need to be an else if
// here we are just automatically fixing the trust domain. For auto-encrypt and
// auto-config they make certificate requests before learning about the roots
// so they will have a dummy trust domain in the CSR.
trustDomain := signingID.Host()
if agentID.Host != trustDomain {
originalURI := agentID.URI()
agentID.Host = trustDomain
// recreate the URIs list
uris := make([]*url.URL, len(csr.URIs))
for i, uri := range csr.URIs {
if originalURI.String() == uri.String() {
uris[i] = agentID.URI()
} else {
uris[i] = uri
}
}
csr.URIs = uris
}
entMeta.Merge(structs.DefaultEnterpriseMeta())
}
commonCfg, err := config.GetCommonConfig()
if err != nil {
return nil, err
}
if commonCfg.CSRMaxPerSecond > 0 {
lim := c.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond))
// Wait up to the small threshold we allow for a token.
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
defer cancel()
if lim.Wait(ctx) != nil {
return nil, ErrRateLimited
}
} else if commonCfg.CSRMaxConcurrent > 0 {
c.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent))
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
defer cancel()
if err := c.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil {
return nil, ErrRateLimited
}
defer c.caLeafLimiter.csrConcurrencyLimiter.Release()
}
connect.HackSANExtensionForCSR(csr)
// All seems to be in order, actually sign it.
pem, err := provider.Sign(csr)
if err == ca.ErrRateLimited {
return nil, ErrRateLimited
}
if err != nil {
return nil, err
}
// Append any intermediates needed by this root.
for _, p := range caRoot.IntermediateCerts {
pem = pem + ca.EnsureTrailingNewline(p)
}
// Append our local CA's intermediate if there is one.
inter, err := provider.ActiveIntermediate()
if err != nil {
return nil, err
}
root, err := provider.ActiveRoot()
if err != nil {
return nil, err
}
if inter != root {
pem = pem + ca.EnsureTrailingNewline(inter)
}
modIdx, err := c.delegate.ApplyCALeafRequest()
if err != nil {
return nil, err
}
cert, err := connect.ParseCert(pem)
if err != nil {
return nil, err
}
// Set the response
reply := structs.IssuedCert{
SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber),
CertPEM: pem,
ValidAfter: cert.NotBefore,
ValidBefore: cert.NotAfter,
EnterpriseMeta: entMeta,
RaftIndex: structs.RaftIndex{
ModifyIndex: modIdx,
CreateIndex: modIdx,
},
}
if isService {
reply.Service = serviceID.Service
reply.ServiceURI = cert.URIs[0].String()
} else if isAgent {
reply.Agent = agentID.Agent
reply.AgentURI = cert.URIs[0].String()
}
return &reply, nil
}

View File

@ -62,6 +62,10 @@ func (m *mockCAServerDelegate) CheckServers(datacenter string, fn func(*metadata
})
}
func (m *mockCAServerDelegate) ApplyCALeafRequest() (uint64, error) {
return 3, nil
}
// ApplyCARequest mirrors FSM.applyConnectCAOperation because that functionality
// is not exported.
func (m *mockCAServerDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {

View File

@ -873,7 +873,7 @@ func (s *Server) setupRPC() error {
authz = &disabledAuthorizer{}
}
// now register with the insecure RPC server
s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, s, authz))
s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, autoConfigBackend{Server: s}, authz))
ln, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {
@ -1450,72 +1450,6 @@ func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}
// CreateACLToken will create an ACL token from the given template
func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
// we have to require local tokens or else it would require having these servers use a token with acl:write to make a
// token create RPC to the servers in the primary DC.
if !s.LocalTokensEnabled() {
return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter)
}
newToken := *template
// generate the accessor id
if newToken.AccessorID == "" {
accessor, err := lib.GenerateUUID(s.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.AccessorID = accessor
}
// generate the secret id
if newToken.SecretID == "" {
secret, err := lib.GenerateUUID(s.checkTokenUUID)
if err != nil {
return nil, err
}
newToken.SecretID = secret
}
newToken.CreateTime = time.Now()
req := structs.ACLTokenBatchSetRequest{
Tokens: structs.ACLTokens{&newToken},
CAS: false,
}
// perform the request to mint the new token
if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
return nil, err
}
// return the full token definition from the FSM
_, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta)
return token, err
}
// DatacenterJoinAddresses will return all the strings suitable for usage in
// retry join operations to connect to the the LAN or LAN segment gossip pool.
func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) {
members, err := s.LANSegmentMembers(segment)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
}
var joinAddrs []string
for _, m := range members {
if ok, _ := metadata.IsConsulServer(m); ok {
serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
joinAddrs = append(joinAddrs, serfAddr.String())
}
}
return joinAddrs, nil
}
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.

View File

@ -1,80 +1,16 @@
package consul
import (
"context"
"crypto/x509"
"fmt"
"net/url"
"sync"
memdb "github.com/hashicorp/go-memdb"
"golang.org/x/time/rate"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/semaphore"
)
type connectSignRateLimiter struct {
// csrRateLimiter limits the rate of signing new certs if configured. Lazily
// initialized from current config to support dynamic changes.
// csrRateLimiterMu must be held while dereferencing the pointer or storing a
// new one, but methods can be called on the limiter object outside of the
// locked section. This is done only in the getCSRRateLimiterWithLimit method.
csrRateLimiter *rate.Limiter
csrRateLimiterMu sync.RWMutex
// csrConcurrencyLimiter is a dynamically resizable semaphore used to limit
// Sign RPC concurrency if configured. The zero value is usable as soon as
// SetSize is called which we do dynamically in the RPC handler to avoid
// having to hook elaborate synchronization mechanisms through the CA config
// endpoint and config reload etc.
csrConcurrencyLimiter semaphore.Dynamic
}
// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set.
// It uses the shared server-wide limiter unless the limit has been changed in
// config or the limiter has not been setup yet in which case it just-in-time
// configures the new limiter. We assume that limit changes are relatively rare
// and that all callers (there is currently only one) use the same config value
// as the limit. There might be some flapping if there are multiple concurrent
// requests in flight at the time the config changes where A sees the new value
// and updates, B sees the old but then gets this lock second and changes back.
// Eventually though and very soon (once all current RPCs are complete) we are
// guaranteed to have the correct limit set by the next RPC that comes in so I
// assume this is fine. If we observe strange behavior because of it, we could
// add hysteresis that prevents changes too soon after a previous change but
// that seems unnecessary for now.
func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter {
l.csrRateLimiterMu.RLock()
lim := l.csrRateLimiter
l.csrRateLimiterMu.RUnlock()
// If there is a current limiter with the same limit, return it. This should
// be the common case.
if lim != nil && lim.Limit() == limit {
return lim
}
// Need to change limiter, get write lock
l.csrRateLimiterMu.Lock()
defer l.csrRateLimiterMu.Unlock()
// No limiter yet, or limit changed in CA config, reconfigure a new limiter.
// We use burst of 1 for a hard limit. Note that either bursting or waiting is
// necessary to get expected behavior in fact of random arrival times, but we
// don't need both and we use Wait with a small delay to smooth noise. See
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md.
l.csrRateLimiter = rate.NewLimiter(limit, 1)
return l.csrRateLimiter
}
// GetCARoots will retrieve
func (s *Server) GetCARoots() (*structs.IndexedCARoots, error) {
return s.getCARoots(nil, s.fsm.State())
}
func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.IndexedCARoots, error) {
index, roots, config, err := state.CARootsAndConfig(ws)
if err != nil {
@ -137,162 +73,3 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind
return indexedRoots, nil
}
// TODO: Move this off Server. This is only called by RPC endpoints.
func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) {
provider, caRoot := s.caManager.getCAProvider()
if provider == nil {
return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil")
} else if caRoot == nil {
return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate")
}
// Verify that the CSR entity is in the cluster's trust domain
state := s.fsm.State()
_, config, err := state.CAConfig(nil)
if err != nil {
return nil, err
}
signingID := connect.SpiffeIDSigningForCluster(config)
serviceID, isService := spiffeID.(*connect.SpiffeIDService)
agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent)
if !isService && !isAgent {
return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID")
}
var entMeta structs.EnterpriseMeta
if isService {
if !signingID.CanSign(spiffeID) {
return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust domain: %s, "+
"we are %s", serviceID.Host, signingID.Host())
}
entMeta.Merge(serviceID.GetEnterpriseMeta())
} else {
// isAgent - if we support more ID types then this would need to be an else if
// here we are just automatically fixing the trust domain. For auto-encrypt and
// auto-config they make certificate requests before learning about the roots
// so they will have a dummy trust domain in the CSR.
trustDomain := signingID.Host()
if agentID.Host != trustDomain {
originalURI := agentID.URI()
agentID.Host = trustDomain
// recreate the URIs list
uris := make([]*url.URL, len(csr.URIs))
for i, uri := range csr.URIs {
if originalURI.String() == uri.String() {
uris[i] = agentID.URI()
} else {
uris[i] = uri
}
}
csr.URIs = uris
}
entMeta.Merge(structs.DefaultEnterpriseMeta())
}
commonCfg, err := config.GetCommonConfig()
if err != nil {
return nil, err
}
if commonCfg.CSRMaxPerSecond > 0 {
lim := s.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond))
// Wait up to the small threshold we allow for a token.
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
defer cancel()
if lim.Wait(ctx) != nil {
return nil, ErrRateLimited
}
} else if commonCfg.CSRMaxConcurrent > 0 {
s.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent))
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
defer cancel()
if err := s.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil {
return nil, ErrRateLimited
}
defer s.caLeafLimiter.csrConcurrencyLimiter.Release()
}
connect.HackSANExtensionForCSR(csr)
// All seems to be in order, actually sign it.
pem, err := provider.Sign(csr)
if err == ca.ErrRateLimited {
return nil, ErrRateLimited
}
if err != nil {
return nil, err
}
// Append any intermediates needed by this root.
for _, p := range caRoot.IntermediateCerts {
pem = pem + ca.EnsureTrailingNewline(p)
}
// Append our local CA's intermediate if there is one.
inter, err := provider.ActiveIntermediate()
if err != nil {
return nil, err
}
root, err := provider.ActiveRoot()
if err != nil {
return nil, err
}
if inter != root {
pem = pem + ca.EnsureTrailingNewline(inter)
}
// TODO(banks): when we implement IssuedCerts table we can use the insert to
// that as the raft index to return in response.
//
// UPDATE(mkeeler): The original implementation relied on updating the CAConfig
// and using its index as the ModifyIndex for certs. This was buggy. The long
// term goal is still to insert some metadata into raft about the certificates
// and use that raft index for the ModifyIndex. This is a partial step in that
// direction except that we only are setting an index and not storing the
// metadata.
req := structs.CALeafRequest{
Op: structs.CALeafOpIncrementIndex,
Datacenter: s.config.Datacenter,
}
resp, err := s.raftApply(structs.ConnectCALeafRequestType|structs.IgnoreUnknownTypeFlag, &req)
if err != nil {
return nil, err
}
modIdx, ok := resp.(uint64)
if !ok {
return nil, fmt.Errorf("Invalid response from updating the leaf cert index")
}
cert, err := connect.ParseCert(pem)
if err != nil {
return nil, err
}
// Set the response
reply := structs.IssuedCert{
SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber),
CertPEM: pem,
ValidAfter: cert.NotBefore,
ValidBefore: cert.NotAfter,
EnterpriseMeta: entMeta,
RaftIndex: structs.RaftIndex{
ModifyIndex: modIdx,
CreateIndex: modIdx,
},
}
if isService {
reply.Service = serviceID.Service
reply.ServiceURI = cert.URIs[0].String()
} else if isAgent {
reply.Agent = agentID.Agent
reply.AgentURI = cert.URIs[0].String()
}
return &reply, nil
}

View File

@ -1654,105 +1654,3 @@ func TestServer_CALogging(t *testing.T) {
require.Contains(t, buf.String(), "consul CA provider configured")
}
func TestServer_DatacenterJoinAddresses(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
conf := testClusterConfig{
Datacenter: "primary",
Servers: 3,
}
nodes := newTestCluster(t, &conf)
var expected []string
for _, srv := range nodes.Servers {
expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort))
}
actual, err := nodes.Servers[0].DatacenterJoinAddresses("")
require.NoError(t, err)
require.ElementsMatch(t, expected, actual)
}
func TestServer_CreateACLToken(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, srv, codec := testACLServerWithConfig(t, nil, false)
waitForLeaderEstablishment(t, srv)
r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1")
require.NoError(t, err)
t.Run("predefined-ids", func(t *testing.T) {
accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3"
secret := "ef453f31-ad58-4ec8-8bf8-342e99763026"
in := &structs.ACLToken{
AccessorID: accessor,
SecretID: secret,
Description: "test",
Policies: []structs.ACLTokenPolicyLink{
{
ID: structs.ACLPolicyGlobalManagementID,
},
},
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
ServiceIdentities: []*structs.ACLServiceIdentity{
{
ServiceName: "web",
},
},
Roles: []structs.ACLTokenRoleLink{
{
ID: r1.ID,
},
},
}
out, err := srv.CreateACLToken(in)
require.NoError(t, err)
require.Equal(t, accessor, out.AccessorID)
require.Equal(t, secret, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.Policies, 1)
require.Len(t, out.Roles, 1)
require.Len(t, out.NodeIdentities, 1)
require.Len(t, out.ServiceIdentities, 1)
require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
require.Equal(t, "web", out.ServiceIdentities[0].ServiceName)
require.Equal(t, r1.ID, out.Roles[0].ID)
})
t.Run("autogen-ids", func(t *testing.T) {
in := &structs.ACLToken{
Description: "test",
NodeIdentities: []*structs.ACLNodeIdentity{
{
NodeName: "foo",
Datacenter: "bar",
},
},
}
out, err := srv.CreateACLToken(in)
require.NoError(t, err)
require.NotEmpty(t, out.AccessorID)
require.NotEmpty(t, out.SecretID)
require.Equal(t, "test", out.Description)
require.NotZero(t, out.CreateTime)
require.Len(t, out.NodeIdentities, 1)
require.Equal(t, "foo", out.NodeIdentities[0].NodeName)
})
}