From dfebfe508ef25dbab1bfa5d96c100656ed21d0ba Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 23 Jun 2021 16:30:59 -0400 Subject: [PATCH 1/4] ca: move SignCertificate to CAManager To reduce the scope of Server, and keep all the CA logic together --- agent/consul/auto_config_endpoint.go | 9 +++++- agent/consul/connect_ca_endpoint.go | 2 +- agent/consul/fsm/commands_oss.go | 1 - agent/consul/leader_connect_ca.go | 26 ++++++++++++++++ agent/consul/leader_connect_ca_test.go | 4 +++ agent/consul/server.go | 4 ++- agent/consul/server_connect.go | 41 ++++++++------------------ 7 files changed, 54 insertions(+), 33 deletions(-) diff --git a/agent/consul/auto_config_endpoint.go b/agent/consul/auto_config_endpoint.go index 53007bde73..1b6d64f30d 100644 --- a/agent/consul/auto_config_endpoint.go +++ b/agent/consul/auto_config_endpoint.go @@ -109,11 +109,18 @@ type AutoConfigBackend interface { CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) DatacenterJoinAddresses(segment string) ([]string, error) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) - GetCARoots() (*structs.IndexedCARoots, error) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) } +type autoConfigBackend struct { + *Server +} + +func (b autoConfigBackend) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) { + return b.Server.caManager.SignCertificate(csr, id) +} + // AutoConfig endpoint is used for cluster auto configuration operations type AutoConfig struct { // currently AutoConfig does not support pushing down any configuration that would be reloadable on the servers diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index 93f3b55fb3..9110a5bd3c 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -192,7 +192,7 @@ func (s *ConnectCA) Sign( } } - cert, err := s.srv.SignCertificate(csr, spiffeID) + cert, err := s.srv.caManager.SignCertificate(csr, spiffeID) if err != nil { return err } diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index d556599a2f..a9c4bc699b 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -492,7 +492,6 @@ func (c *FSM) applyConnectCALeafOperation(buf []byte, index uint64) interface{} switch req.Op { case structs.CALeafOpIncrementIndex: // Use current index as the new value as well as the value to write at. - // TODO(banks) do we even use this op any more? if err := c.state.CALeafSetIndex(index, index); err != nil { return err } diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index 9bbf085912..5c1b804ca0 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -34,6 +34,7 @@ const ( type caServerDelegate interface { ca.ConsulProviderStateDelegate IsLeader() bool + ApplyCALeafRequest() (uint64, error) forwardDC(method, dc string, args interface{}, reply interface{}) error generateCASignRequest(csr string) *structs.CASignRequest @@ -48,6 +49,8 @@ type CAManager struct { delegate caServerDelegate serverConf *Config logger hclog.Logger + // rate limiter to use when signing leaf certificates + caLeafLimiter connectSignRateLimiter providerLock sync.RWMutex // provider is the current CA provider in use for Connect. This is @@ -82,6 +85,29 @@ func (c *caDelegateWithState) ApplyCARequest(req *structs.CARequest) (interface{ return c.Server.raftApplyMsgpack(structs.ConnectCARequestType, req) } +func (c *caDelegateWithState) ApplyCALeafRequest() (uint64, error) { + // TODO(banks): when we implement IssuedCerts table we can use the insert to + // that as the raft index to return in response. + // + // UPDATE(mkeeler): The original implementation relied on updating the CAConfig + // and using its index as the ModifyIndex for certs. This was buggy. The long + // term goal is still to insert some metadata into raft about the certificates + // and use that raft index for the ModifyIndex. This is a partial step in that + // direction except that we only are setting an index and not storing the + // metadata. + req := structs.CALeafRequest{ + Op: structs.CALeafOpIncrementIndex, + Datacenter: c.Server.config.Datacenter, + } + resp, err := c.Server.raftApplyMsgpack(structs.ConnectCALeafRequestType|structs.IgnoreUnknownTypeFlag, &req) + + modIdx, ok := resp.(uint64) + if !ok { + return 0, fmt.Errorf("Invalid response from updating the leaf cert index") + } + return modIdx, err +} + func (c *caDelegateWithState) generateCASignRequest(csr string) *structs.CASignRequest { return &structs.CASignRequest{ Datacenter: c.Server.config.PrimaryDatacenter, diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go index 502dda28d8..4ced2c0f23 100644 --- a/agent/consul/leader_connect_ca_test.go +++ b/agent/consul/leader_connect_ca_test.go @@ -62,6 +62,10 @@ func (m *mockCAServerDelegate) CheckServers(datacenter string, fn func(*metadata }) } +func (m *mockCAServerDelegate) ApplyCALeafRequest() (uint64, error) { + return 3, nil +} + // ApplyCARequest mirrors FSM.applyConnectCAOperation because that functionality // is not exported. func (m *mockCAServerDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) { diff --git a/agent/consul/server.go b/agent/consul/server.go index 6bb8bf4d10..dd9cc63de2 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -878,7 +878,7 @@ func (s *Server) setupRPC() error { authz = &disabledAuthorizer{} } // now register with the insecure RPC server - s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, s, authz)) + s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, autoConfigBackend{Server: s}, authz)) ln, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { @@ -1456,6 +1456,7 @@ func (s *Server) isReadyForConsistentReads() bool { } // CreateACLToken will create an ACL token from the given template +// TODO: move to autoConfigBackend func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) { // we have to require local tokens or else it would require having these servers use a token with acl:write to make a // token create RPC to the servers in the primary DC. @@ -1504,6 +1505,7 @@ func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, // DatacenterJoinAddresses will return all the strings suitable for usage in // retry join operations to connect to the the LAN or LAN segment gossip pool. +// TODO: move to autoConfigBackend func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) { members, err := s.LANSegmentMembers(segment) if err != nil { diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go index 4626db4ef4..949f6797e6 100644 --- a/agent/consul/server_connect.go +++ b/agent/consul/server_connect.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/lib/semaphore" ) +// TODO: move to leader_connect_ca.go type connectSignRateLimiter struct { // csrRateLimiter limits the rate of signing new certs if configured. Lazily // initialized from current config to support dynamic changes. @@ -70,7 +71,8 @@ func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *r return l.csrRateLimiter } -// GetCARoots will retrieve +// GetCARoots will retrieve CARoots +// TODO: move to autoConfigBackend func (s *Server) GetCARoots() (*structs.IndexedCARoots, error) { return s.getCARoots(nil, s.fsm.State()) } @@ -138,9 +140,9 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind return indexedRoots, nil } -// TODO: Move this off Server. This is only called by RPC endpoints. -func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { - provider, caRoot := s.caManager.getCAProvider() +// TODO: Move this to leader_connect_ca.go +func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { + provider, caRoot := c.getCAProvider() if provider == nil { return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil") } else if caRoot == nil { @@ -148,7 +150,7 @@ func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect. } // Verify that the CSR entity is in the cluster's trust domain - state := s.fsm.State() + state := c.delegate.State() _, config, err := state.CAConfig(nil) if err != nil { return nil, err @@ -198,7 +200,7 @@ func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect. return nil, err } if commonCfg.CSRMaxPerSecond > 0 { - lim := s.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) + lim := c.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) // Wait up to the small threshold we allow for a token. ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) defer cancel() @@ -206,13 +208,13 @@ func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect. return nil, ErrRateLimited } } else if commonCfg.CSRMaxConcurrent > 0 { - s.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) + c.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) defer cancel() - if err := s.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { + if err := c.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { return nil, ErrRateLimited } - defer s.caLeafLimiter.csrConcurrencyLimiter.Release() + defer c.caLeafLimiter.csrConcurrencyLimiter.Release() } connect.HackSANExtensionForCSR(csr) @@ -245,30 +247,11 @@ func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect. pem = pem + ca.EnsureTrailingNewline(inter) } - // TODO(banks): when we implement IssuedCerts table we can use the insert to - // that as the raft index to return in response. - // - // UPDATE(mkeeler): The original implementation relied on updating the CAConfig - // and using its index as the ModifyIndex for certs. This was buggy. The long - // term goal is still to insert some metadata into raft about the certificates - // and use that raft index for the ModifyIndex. This is a partial step in that - // direction except that we only are setting an index and not storing the - // metadata. - req := structs.CALeafRequest{ - Op: structs.CALeafOpIncrementIndex, - Datacenter: s.config.Datacenter, - } - - resp, err := s.raftApply(structs.ConnectCALeafRequestType|structs.IgnoreUnknownTypeFlag, &req) + modIdx, err := c.delegate.ApplyCALeafRequest() if err != nil { return nil, err } - modIdx, ok := resp.(uint64) - if !ok { - return nil, fmt.Errorf("Invalid response from updating the leaf cert index") - } - cert, err := connect.ParseCert(pem) if err != nil { return nil, err From 0512cb2813821aaef49505a44b5f33581fca1b2b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 23 Jun 2021 16:32:59 -0400 Subject: [PATCH 2/4] ca: move SignCertificate to the file where it is used --- agent/consul/leader_connect_ca.go | 201 ++++++++++++++++++++++++++++- agent/consul/server_connect.go | 202 +----------------------------- 2 files changed, 200 insertions(+), 203 deletions(-) diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index 5c1b804ca0..0f9ce63a05 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -2,8 +2,10 @@ package consul import ( "context" + "crypto/x509" "errors" "fmt" + "net/url" "reflect" "strings" "sync" @@ -11,6 +13,9 @@ import ( "github.com/hashicorp/go-hclog" uuid "github.com/hashicorp/go-uuid" + "golang.org/x/time/rate" + + "github.com/hashicorp/consul/lib/semaphore" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" @@ -175,8 +180,8 @@ func (c *CAManager) getPrimaryRoots() structs.IndexedCARoots { // when setting up the CA during establishLeadership. The state should be set to // non-ready before calling this. func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) { - state := c.delegate.State() - _, config, err := state.CAConfig(nil) + st := c.delegate.State() + _, config, err := st.CAConfig(nil) if err != nil { return nil, err } @@ -1313,3 +1318,195 @@ func (c *CAManager) configuredSecondaryCA() bool { defer c.stateLock.Unlock() return c.actingSecondaryCA } + +type connectSignRateLimiter struct { + // csrRateLimiter limits the rate of signing new certs if configured. Lazily + // initialized from current config to support dynamic changes. + // csrRateLimiterMu must be held while dereferencing the pointer or storing a + // new one, but methods can be called on the limiter object outside of the + // locked section. This is done only in the getCSRRateLimiterWithLimit method. + csrRateLimiter *rate.Limiter + csrRateLimiterMu sync.RWMutex + + // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit + // Sign RPC concurrency if configured. The zero value is usable as soon as + // SetSize is called which we do dynamically in the RPC handler to avoid + // having to hook elaborate synchronization mechanisms through the CA config + // endpoint and config reload etc. + csrConcurrencyLimiter semaphore.Dynamic +} + +// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. +// It uses the shared server-wide limiter unless the limit has been changed in +// config or the limiter has not been setup yet in which case it just-in-time +// configures the new limiter. We assume that limit changes are relatively rare +// and that all callers (there is currently only one) use the same config value +// as the limit. There might be some flapping if there are multiple concurrent +// requests in flight at the time the config changes where A sees the new value +// and updates, B sees the old but then gets this lock second and changes back. +// Eventually though and very soon (once all current RPCs are complete) we are +// guaranteed to have the correct limit set by the next RPC that comes in so I +// assume this is fine. If we observe strange behavior because of it, we could +// add hysteresis that prevents changes too soon after a previous change but +// that seems unnecessary for now. +func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { + l.csrRateLimiterMu.RLock() + lim := l.csrRateLimiter + l.csrRateLimiterMu.RUnlock() + + // If there is a current limiter with the same limit, return it. This should + // be the common case. + if lim != nil && lim.Limit() == limit { + return lim + } + + // Need to change limiter, get write lock + l.csrRateLimiterMu.Lock() + defer l.csrRateLimiterMu.Unlock() + // No limiter yet, or limit changed in CA config, reconfigure a new limiter. + // We use burst of 1 for a hard limit. Note that either bursting or waiting is + // necessary to get expected behavior in fact of random arrival times, but we + // don't need both and we use Wait with a small delay to smooth noise. See + // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. + l.csrRateLimiter = rate.NewLimiter(limit, 1) + return l.csrRateLimiter +} + +func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { + provider, caRoot := c.getCAProvider() + if provider == nil { + return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil") + } else if caRoot == nil { + return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate") + } + + // Verify that the CSR entity is in the cluster's trust domain + state := c.delegate.State() + _, config, err := state.CAConfig(nil) + if err != nil { + return nil, err + } + signingID := connect.SpiffeIDSigningForCluster(config) + serviceID, isService := spiffeID.(*connect.SpiffeIDService) + agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent) + if !isService && !isAgent { + return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID") + } + + var entMeta structs.EnterpriseMeta + if isService { + if !signingID.CanSign(spiffeID) { + return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust domain: %s, "+ + "we are %s", serviceID.Host, signingID.Host()) + } + entMeta.Merge(serviceID.GetEnterpriseMeta()) + } else { + // isAgent - if we support more ID types then this would need to be an else if + // here we are just automatically fixing the trust domain. For auto-encrypt and + // auto-config they make certificate requests before learning about the roots + // so they will have a dummy trust domain in the CSR. + trustDomain := signingID.Host() + if agentID.Host != trustDomain { + originalURI := agentID.URI() + + agentID.Host = trustDomain + + // recreate the URIs list + uris := make([]*url.URL, len(csr.URIs)) + for i, uri := range csr.URIs { + if originalURI.String() == uri.String() { + uris[i] = agentID.URI() + } else { + uris[i] = uri + } + } + + csr.URIs = uris + } + entMeta.Merge(structs.DefaultEnterpriseMeta()) + } + + commonCfg, err := config.GetCommonConfig() + if err != nil { + return nil, err + } + if commonCfg.CSRMaxPerSecond > 0 { + lim := c.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) + // Wait up to the small threshold we allow for a token. + ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) + defer cancel() + if lim.Wait(ctx) != nil { + return nil, ErrRateLimited + } + } else if commonCfg.CSRMaxConcurrent > 0 { + c.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) + ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) + defer cancel() + if err := c.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { + return nil, ErrRateLimited + } + defer c.caLeafLimiter.csrConcurrencyLimiter.Release() + } + + connect.HackSANExtensionForCSR(csr) + + // All seems to be in order, actually sign it. + + pem, err := provider.Sign(csr) + if err == ca.ErrRateLimited { + return nil, ErrRateLimited + } + if err != nil { + return nil, err + } + + // Append any intermediates needed by this root. + for _, p := range caRoot.IntermediateCerts { + pem = pem + ca.EnsureTrailingNewline(p) + } + + // Append our local CA's intermediate if there is one. + inter, err := provider.ActiveIntermediate() + if err != nil { + return nil, err + } + root, err := provider.ActiveRoot() + if err != nil { + return nil, err + } + if inter != root { + pem = pem + ca.EnsureTrailingNewline(inter) + } + + modIdx, err := c.delegate.ApplyCALeafRequest() + if err != nil { + return nil, err + } + + cert, err := connect.ParseCert(pem) + if err != nil { + return nil, err + } + + // Set the response + reply := structs.IssuedCert{ + SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber), + CertPEM: pem, + ValidAfter: cert.NotBefore, + ValidBefore: cert.NotAfter, + EnterpriseMeta: entMeta, + RaftIndex: structs.RaftIndex{ + ModifyIndex: modIdx, + CreateIndex: modIdx, + }, + } + if isService { + reply.Service = serviceID.Service + reply.ServiceURI = cert.URIs[0].String() + } else if isAgent { + reply.Agent = agentID.Agent + reply.AgentURI = cert.URIs[0].String() + } + + return &reply, nil +} diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go index 949f6797e6..d7ff709b5e 100644 --- a/agent/consul/server_connect.go +++ b/agent/consul/server_connect.go @@ -1,76 +1,16 @@ package consul import ( - "context" - "crypto/x509" "fmt" - "net/url" - "sync" - memdb "github.com/hashicorp/go-memdb" - "golang.org/x/time/rate" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/lib/semaphore" ) -// TODO: move to leader_connect_ca.go -type connectSignRateLimiter struct { - // csrRateLimiter limits the rate of signing new certs if configured. Lazily - // initialized from current config to support dynamic changes. - // csrRateLimiterMu must be held while dereferencing the pointer or storing a - // new one, but methods can be called on the limiter object outside of the - // locked section. This is done only in the getCSRRateLimiterWithLimit method. - csrRateLimiter *rate.Limiter - csrRateLimiterMu sync.RWMutex - - // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit - // Sign RPC concurrency if configured. The zero value is usable as soon as - // SetSize is called which we do dynamically in the RPC handler to avoid - // having to hook elaborate synchronization mechanisms through the CA config - // endpoint and config reload etc. - csrConcurrencyLimiter semaphore.Dynamic -} - -// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. -// It uses the shared server-wide limiter unless the limit has been changed in -// config or the limiter has not been setup yet in which case it just-in-time -// configures the new limiter. We assume that limit changes are relatively rare -// and that all callers (there is currently only one) use the same config value -// as the limit. There might be some flapping if there are multiple concurrent -// requests in flight at the time the config changes where A sees the new value -// and updates, B sees the old but then gets this lock second and changes back. -// Eventually though and very soon (once all current RPCs are complete) we are -// guaranteed to have the correct limit set by the next RPC that comes in so I -// assume this is fine. If we observe strange behavior because of it, we could -// add hysteresis that prevents changes too soon after a previous change but -// that seems unnecessary for now. -func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { - l.csrRateLimiterMu.RLock() - lim := l.csrRateLimiter - l.csrRateLimiterMu.RUnlock() - - // If there is a current limiter with the same limit, return it. This should - // be the common case. - if lim != nil && lim.Limit() == limit { - return lim - } - - // Need to change limiter, get write lock - l.csrRateLimiterMu.Lock() - defer l.csrRateLimiterMu.Unlock() - // No limiter yet, or limit changed in CA config, reconfigure a new limiter. - // We use burst of 1 for a hard limit. Note that either bursting or waiting is - // necessary to get expected behavior in fact of random arrival times, but we - // don't need both and we use Wait with a small delay to smooth noise. See - // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. - l.csrRateLimiter = rate.NewLimiter(limit, 1) - return l.csrRateLimiter -} - // GetCARoots will retrieve CARoots // TODO: move to autoConfigBackend func (s *Server) GetCARoots() (*structs.IndexedCARoots, error) { @@ -139,143 +79,3 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind return indexedRoots, nil } - -// TODO: Move this to leader_connect_ca.go -func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { - provider, caRoot := c.getCAProvider() - if provider == nil { - return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil") - } else if caRoot == nil { - return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate") - } - - // Verify that the CSR entity is in the cluster's trust domain - state := c.delegate.State() - _, config, err := state.CAConfig(nil) - if err != nil { - return nil, err - } - signingID := connect.SpiffeIDSigningForCluster(config) - serviceID, isService := spiffeID.(*connect.SpiffeIDService) - agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent) - if !isService && !isAgent { - return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID") - } - - var entMeta structs.EnterpriseMeta - if isService { - if !signingID.CanSign(spiffeID) { - return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust domain: %s, "+ - "we are %s", serviceID.Host, signingID.Host()) - } - entMeta.Merge(serviceID.GetEnterpriseMeta()) - } else { - // isAgent - if we support more ID types then this would need to be an else if - // here we are just automatically fixing the trust domain. For auto-encrypt and - // auto-config they make certificate requests before learning about the roots - // so they will have a dummy trust domain in the CSR. - trustDomain := signingID.Host() - if agentID.Host != trustDomain { - originalURI := agentID.URI() - - agentID.Host = trustDomain - - // recreate the URIs list - uris := make([]*url.URL, len(csr.URIs)) - for i, uri := range csr.URIs { - if originalURI.String() == uri.String() { - uris[i] = agentID.URI() - } else { - uris[i] = uri - } - } - - csr.URIs = uris - } - entMeta.Merge(structs.DefaultEnterpriseMeta()) - } - - commonCfg, err := config.GetCommonConfig() - if err != nil { - return nil, err - } - if commonCfg.CSRMaxPerSecond > 0 { - lim := c.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) - // Wait up to the small threshold we allow for a token. - ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) - defer cancel() - if lim.Wait(ctx) != nil { - return nil, ErrRateLimited - } - } else if commonCfg.CSRMaxConcurrent > 0 { - c.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) - ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) - defer cancel() - if err := c.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { - return nil, ErrRateLimited - } - defer c.caLeafLimiter.csrConcurrencyLimiter.Release() - } - - connect.HackSANExtensionForCSR(csr) - - // All seems to be in order, actually sign it. - - pem, err := provider.Sign(csr) - if err == ca.ErrRateLimited { - return nil, ErrRateLimited - } - if err != nil { - return nil, err - } - - // Append any intermediates needed by this root. - for _, p := range caRoot.IntermediateCerts { - pem = pem + ca.EnsureTrailingNewline(p) - } - - // Append our local CA's intermediate if there is one. - inter, err := provider.ActiveIntermediate() - if err != nil { - return nil, err - } - root, err := provider.ActiveRoot() - if err != nil { - return nil, err - } - if inter != root { - pem = pem + ca.EnsureTrailingNewline(inter) - } - - modIdx, err := c.delegate.ApplyCALeafRequest() - if err != nil { - return nil, err - } - - cert, err := connect.ParseCert(pem) - if err != nil { - return nil, err - } - - // Set the response - reply := structs.IssuedCert{ - SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber), - CertPEM: pem, - ValidAfter: cert.NotBefore, - ValidBefore: cert.NotAfter, - EnterpriseMeta: entMeta, - RaftIndex: structs.RaftIndex{ - ModifyIndex: modIdx, - CreateIndex: modIdx, - }, - } - if isService { - reply.Service = serviceID.Service - reply.ServiceURI = cert.URIs[0].String() - } else if isAgent { - reply.Agent = agentID.Agent - reply.AgentURI = cert.URIs[0].String() - } - - return &reply, nil -} From 3091026e020e1de040dbc26678b673dfcb78446e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 23 Jun 2021 17:14:28 -0400 Subject: [PATCH 3/4] auto-config: move autoConfigBackend impl off of Server Most of these methods are used exclusively for the AutoConfig RPC endpoint. This PR uses a pattern that we've used in other places as an incremental step to reducing the scope of Server. --- agent/consul/auto_config_backend.go | 96 ++++++++++++++++++ agent/consul/auto_config_backend_test.go | 115 ++++++++++++++++++++++ agent/consul/auto_config_endpoint.go | 8 -- agent/consul/auto_config_endpoint_test.go | 2 +- agent/consul/leader_connect_ca.go | 2 +- agent/consul/server.go | 68 ------------- agent/consul/server_connect.go | 6 -- agent/consul/server_test.go | 102 ------------------- 8 files changed, 213 insertions(+), 186 deletions(-) create mode 100644 agent/consul/auto_config_backend.go create mode 100644 agent/consul/auto_config_backend_test.go diff --git a/agent/consul/auto_config_backend.go b/agent/consul/auto_config_backend.go new file mode 100644 index 0000000000..fa6b1e1689 --- /dev/null +++ b/agent/consul/auto_config_backend.go @@ -0,0 +1,96 @@ +package consul + +import ( + "crypto/x509" + "fmt" + "net" + "time" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" +) + +type autoConfigBackend struct { + Server *Server +} + +func (b autoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) { + return b.Server.ForwardRPC(method, info, reply) +} + +func (b autoConfigBackend) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) { + return b.Server.caManager.SignCertificate(csr, id) +} + +// GetCARoots returns the CA roots. +func (b autoConfigBackend) GetCARoots() (*structs.IndexedCARoots, error) { + return b.Server.getCARoots(nil, b.Server.fsm.State()) +} + +// DatacenterJoinAddresses will return all the strings suitable for usage in +// retry join operations to connect to the the LAN or LAN segment gossip pool. +func (b autoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) { + members, err := b.Server.LANSegmentMembers(segment) + if err != nil { + return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err) + } + + var joinAddrs []string + for _, m := range members { + if ok, _ := metadata.IsConsulServer(m); ok { + serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)} + joinAddrs = append(joinAddrs, serfAddr.String()) + } + } + + return joinAddrs, nil +} + +// CreateACLToken will create an ACL token from the given template +func (b autoConfigBackend) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) { + // we have to require local tokens or else it would require having these servers use a token with acl:write to make a + // token create RPC to the servers in the primary DC. + if !b.Server.LocalTokensEnabled() { + return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", b.Server.config.Datacenter) + } + + newToken := *template + + // generate the accessor id + if newToken.AccessorID == "" { + accessor, err := lib.GenerateUUID(b.Server.checkTokenUUID) + if err != nil { + return nil, err + } + + newToken.AccessorID = accessor + } + + // generate the secret id + if newToken.SecretID == "" { + secret, err := lib.GenerateUUID(b.Server.checkTokenUUID) + if err != nil { + return nil, err + } + + newToken.SecretID = secret + } + + newToken.CreateTime = time.Now() + + req := structs.ACLTokenBatchSetRequest{ + Tokens: structs.ACLTokens{&newToken}, + CAS: false, + } + + // perform the request to mint the new token + if _, err := b.Server.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil { + return nil, err + } + + // return the full token definition from the FSM + _, token, err := b.Server.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta) + return token, err +} diff --git a/agent/consul/auto_config_backend_test.go b/agent/consul/auto_config_backend_test.go new file mode 100644 index 0000000000..2e82e8882e --- /dev/null +++ b/agent/consul/auto_config_backend_test.go @@ -0,0 +1,115 @@ +package consul + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" +) + +func TestAutoConfigBackend_DatacenterJoinAddresses(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + conf := testClusterConfig{ + Datacenter: "primary", + Servers: 3, + } + + nodes := newTestCluster(t, &conf) + + var expected []string + for _, srv := range nodes.Servers { + expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort)) + } + + backend := autoConfigBackend{Server: nodes.Servers[0]} + actual, err := backend.DatacenterJoinAddresses("") + require.NoError(t, err) + require.ElementsMatch(t, expected, actual) +} + +func TestAutoConfigBackend_CreateACLToken(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, srv, codec := testACLServerWithConfig(t, nil, false) + + waitForLeaderEstablishment(t, srv) + + r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1") + require.NoError(t, err) + + t.Run("predefined-ids", func(t *testing.T) { + accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3" + secret := "ef453f31-ad58-4ec8-8bf8-342e99763026" + in := &structs.ACLToken{ + AccessorID: accessor, + SecretID: secret, + Description: "test", + Policies: []structs.ACLTokenPolicyLink{ + { + ID: structs.ACLPolicyGlobalManagementID, + }, + }, + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "foo", + Datacenter: "bar", + }, + }, + ServiceIdentities: []*structs.ACLServiceIdentity{ + { + ServiceName: "web", + }, + }, + Roles: []structs.ACLTokenRoleLink{ + { + ID: r1.ID, + }, + }, + } + + b := autoConfigBackend{Server: srv} + out, err := b.CreateACLToken(in) + require.NoError(t, err) + require.Equal(t, accessor, out.AccessorID) + require.Equal(t, secret, out.SecretID) + require.Equal(t, "test", out.Description) + require.NotZero(t, out.CreateTime) + require.Len(t, out.Policies, 1) + require.Len(t, out.Roles, 1) + require.Len(t, out.NodeIdentities, 1) + require.Len(t, out.ServiceIdentities, 1) + require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID) + require.Equal(t, "foo", out.NodeIdentities[0].NodeName) + require.Equal(t, "web", out.ServiceIdentities[0].ServiceName) + require.Equal(t, r1.ID, out.Roles[0].ID) + }) + + t.Run("autogen-ids", func(t *testing.T) { + in := &structs.ACLToken{ + Description: "test", + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "foo", + Datacenter: "bar", + }, + }, + } + + b := autoConfigBackend{Server: srv} + out, err := b.CreateACLToken(in) + require.NoError(t, err) + require.NotEmpty(t, out.AccessorID) + require.NotEmpty(t, out.SecretID) + require.Equal(t, "test", out.Description) + require.NotZero(t, out.CreateTime) + require.Len(t, out.NodeIdentities, 1) + require.Equal(t, "foo", out.NodeIdentities[0].NodeName) + }) +} diff --git a/agent/consul/auto_config_endpoint.go b/agent/consul/auto_config_endpoint.go index 1b6d64f30d..b03c574937 100644 --- a/agent/consul/auto_config_endpoint.go +++ b/agent/consul/auto_config_endpoint.go @@ -113,14 +113,6 @@ type AutoConfigBackend interface { SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) } -type autoConfigBackend struct { - *Server -} - -func (b autoConfigBackend) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) { - return b.Server.caManager.SignCertificate(csr, id) -} - // AutoConfig endpoint is used for cluster auto configuration operations type AutoConfig struct { // currently AutoConfig does not support pushing down any configuration that would be reloadable on the servers diff --git a/agent/consul/auto_config_endpoint_test.go b/agent/consul/auto_config_endpoint_test.go index 563ed42c35..e40a9dbb69 100644 --- a/agent/consul/auto_config_endpoint_test.go +++ b/agent/consul/auto_config_endpoint_test.go @@ -195,7 +195,7 @@ func TestAutoConfigInitialConfiguration(t *testing.T) { waitForLeaderEstablishment(t, s) - roots, err := s.GetCARoots() + roots, err := s.getCARoots(nil, s.fsm.State()) require.NoError(t, err) pbroots, err := translateCARootsToProtobuf(roots) diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index 0f9ce63a05..cb646dfe1e 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -1025,7 +1025,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) (reterr error) // getIntermediateCAPrimary regenerates the intermediate cert in the primary datacenter. // This is only run for CAs that require an intermediary in the primary DC, such as Vault. -// It should only be called while the state lock is held by setting the state to non-ready. +// It should only be called while the state lock is held by setting the state to non-ready. func (c *CAManager) getIntermediateCAPrimary(provider ca.Provider, newActiveRoot *structs.CARoot) error { // Generate and sign an intermediate cert using the root CA. intermediatePEM, err := provider.GenerateIntermediate() diff --git a/agent/consul/server.go b/agent/consul/server.go index dd9cc63de2..e36e1bbad8 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -1455,74 +1455,6 @@ func (s *Server) isReadyForConsistentReads() bool { return atomic.LoadInt32(&s.readyForConsistentReads) == 1 } -// CreateACLToken will create an ACL token from the given template -// TODO: move to autoConfigBackend -func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) { - // we have to require local tokens or else it would require having these servers use a token with acl:write to make a - // token create RPC to the servers in the primary DC. - if !s.LocalTokensEnabled() { - return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter) - } - - newToken := *template - - // generate the accessor id - if newToken.AccessorID == "" { - accessor, err := lib.GenerateUUID(s.checkTokenUUID) - if err != nil { - return nil, err - } - - newToken.AccessorID = accessor - } - - // generate the secret id - if newToken.SecretID == "" { - secret, err := lib.GenerateUUID(s.checkTokenUUID) - if err != nil { - return nil, err - } - - newToken.SecretID = secret - } - - newToken.CreateTime = time.Now() - - req := structs.ACLTokenBatchSetRequest{ - Tokens: structs.ACLTokens{&newToken}, - CAS: false, - } - - // perform the request to mint the new token - if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil { - return nil, err - } - - // return the full token definition from the FSM - _, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta) - return token, err -} - -// DatacenterJoinAddresses will return all the strings suitable for usage in -// retry join operations to connect to the the LAN or LAN segment gossip pool. -// TODO: move to autoConfigBackend -func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) { - members, err := s.LANSegmentMembers(segment) - if err != nil { - return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err) - } - - var joinAddrs []string - for _, m := range members { - if ok, _ := metadata.IsConsulServer(m); ok { - serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)} - joinAddrs = append(joinAddrs, serfAddr.String()) - } - } - - return joinAddrs, nil -} - // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same // location. diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go index d7ff709b5e..09453a5ee9 100644 --- a/agent/consul/server_connect.go +++ b/agent/consul/server_connect.go @@ -11,12 +11,6 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -// GetCARoots will retrieve CARoots -// TODO: move to autoConfigBackend -func (s *Server) GetCARoots() (*structs.IndexedCARoots, error) { - return s.getCARoots(nil, s.fsm.State()) -} - func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.IndexedCARoots, error) { index, roots, config, err := state.CARootsAndConfig(ws) if err != nil { diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index f126091ba9..21ae2421a7 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -1654,105 +1654,3 @@ func TestServer_CALogging(t *testing.T) { require.Contains(t, buf.String(), "consul CA provider configured") } - -func TestServer_DatacenterJoinAddresses(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - conf := testClusterConfig{ - Datacenter: "primary", - Servers: 3, - } - - nodes := newTestCluster(t, &conf) - - var expected []string - for _, srv := range nodes.Servers { - expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort)) - } - - actual, err := nodes.Servers[0].DatacenterJoinAddresses("") - require.NoError(t, err) - require.ElementsMatch(t, expected, actual) -} - -func TestServer_CreateACLToken(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - _, srv, codec := testACLServerWithConfig(t, nil, false) - - waitForLeaderEstablishment(t, srv) - - r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1") - require.NoError(t, err) - - t.Run("predefined-ids", func(t *testing.T) { - accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3" - secret := "ef453f31-ad58-4ec8-8bf8-342e99763026" - in := &structs.ACLToken{ - AccessorID: accessor, - SecretID: secret, - Description: "test", - Policies: []structs.ACLTokenPolicyLink{ - { - ID: structs.ACLPolicyGlobalManagementID, - }, - }, - NodeIdentities: []*structs.ACLNodeIdentity{ - { - NodeName: "foo", - Datacenter: "bar", - }, - }, - ServiceIdentities: []*structs.ACLServiceIdentity{ - { - ServiceName: "web", - }, - }, - Roles: []structs.ACLTokenRoleLink{ - { - ID: r1.ID, - }, - }, - } - - out, err := srv.CreateACLToken(in) - require.NoError(t, err) - require.Equal(t, accessor, out.AccessorID) - require.Equal(t, secret, out.SecretID) - require.Equal(t, "test", out.Description) - require.NotZero(t, out.CreateTime) - require.Len(t, out.Policies, 1) - require.Len(t, out.Roles, 1) - require.Len(t, out.NodeIdentities, 1) - require.Len(t, out.ServiceIdentities, 1) - require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID) - require.Equal(t, "foo", out.NodeIdentities[0].NodeName) - require.Equal(t, "web", out.ServiceIdentities[0].ServiceName) - require.Equal(t, r1.ID, out.Roles[0].ID) - }) - - t.Run("autogen-ids", func(t *testing.T) { - in := &structs.ACLToken{ - Description: "test", - NodeIdentities: []*structs.ACLNodeIdentity{ - { - NodeName: "foo", - Datacenter: "bar", - }, - }, - } - - out, err := srv.CreateACLToken(in) - require.NoError(t, err) - require.NotEmpty(t, out.AccessorID) - require.NotEmpty(t, out.SecretID) - require.Equal(t, "test", out.Description) - require.NotZero(t, out.CreateTime) - require.Len(t, out.NodeIdentities, 1) - require.Equal(t, "foo", out.NodeIdentities[0].NodeName) - }) -} From 5ed56fc786a3f68bf9863eaa422d06e6606f1a4d Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Mon, 12 Jul 2021 10:37:25 -0400 Subject: [PATCH 4/4] check error when `raftApplyMsgpack` --- agent/connect/ca/provider_aws.go | 3 +-- agent/consul/leader_connect_ca.go | 3 +++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/agent/connect/ca/provider_aws.go b/agent/connect/ca/provider_aws.go index 73edc95f8c..e03f35984d 100644 --- a/agent/connect/ca/provider_aws.go +++ b/agent/connect/ca/provider_aws.go @@ -12,9 +12,8 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/acmpca" - "github.com/mitchellh/mapstructure" - "github.com/hashicorp/go-hclog" + "github.com/mitchellh/mapstructure" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index cb646dfe1e..9a6f73631c 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -105,6 +105,9 @@ func (c *caDelegateWithState) ApplyCALeafRequest() (uint64, error) { Datacenter: c.Server.config.Datacenter, } resp, err := c.Server.raftApplyMsgpack(structs.ConnectCALeafRequestType|structs.IgnoreUnknownTypeFlag, &req) + if err != nil { + return 0, err + } modIdx, ok := resp.(uint64) if !ok {