diff --git a/agent/connect/ca/provider_aws.go b/agent/connect/ca/provider_aws.go index 73edc95f8c..e03f35984d 100644 --- a/agent/connect/ca/provider_aws.go +++ b/agent/connect/ca/provider_aws.go @@ -12,9 +12,8 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/acmpca" - "github.com/mitchellh/mapstructure" - "github.com/hashicorp/go-hclog" + "github.com/mitchellh/mapstructure" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" diff --git a/agent/consul/auto_config_backend.go b/agent/consul/auto_config_backend.go new file mode 100644 index 0000000000..fa6b1e1689 --- /dev/null +++ b/agent/consul/auto_config_backend.go @@ -0,0 +1,96 @@ +package consul + +import ( + "crypto/x509" + "fmt" + "net" + "time" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" +) + +type autoConfigBackend struct { + Server *Server +} + +func (b autoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) { + return b.Server.ForwardRPC(method, info, reply) +} + +func (b autoConfigBackend) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) { + return b.Server.caManager.SignCertificate(csr, id) +} + +// GetCARoots returns the CA roots. +func (b autoConfigBackend) GetCARoots() (*structs.IndexedCARoots, error) { + return b.Server.getCARoots(nil, b.Server.fsm.State()) +} + +// DatacenterJoinAddresses will return all the strings suitable for usage in +// retry join operations to connect to the LAN or LAN segment gossip pool. 
+func (b autoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) { + members, err := b.Server.LANSegmentMembers(segment) + if err != nil { + return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err) + } + + var joinAddrs []string + for _, m := range members { + if ok, _ := metadata.IsConsulServer(m); ok { + serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)} + joinAddrs = append(joinAddrs, serfAddr.String()) + } + } + + return joinAddrs, nil +} + +// CreateACLToken will create an ACL token from the given template +func (b autoConfigBackend) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) { + // we have to require local tokens or else it would require having these servers use a token with acl:write to make a + // token create RPC to the servers in the primary DC. + if !b.Server.LocalTokensEnabled() { + return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", b.Server.config.Datacenter) + } + + newToken := *template + + // generate the accessor id + if newToken.AccessorID == "" { + accessor, err := lib.GenerateUUID(b.Server.checkTokenUUID) + if err != nil { + return nil, err + } + + newToken.AccessorID = accessor + } + + // generate the secret id + if newToken.SecretID == "" { + secret, err := lib.GenerateUUID(b.Server.checkTokenUUID) + if err != nil { + return nil, err + } + + newToken.SecretID = secret + } + + newToken.CreateTime = time.Now() + + req := structs.ACLTokenBatchSetRequest{ + Tokens: structs.ACLTokens{&newToken}, + CAS: false, + } + + // perform the request to mint the new token + if _, err := b.Server.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil { + return nil, err + } + + // return the full token definition from the FSM + _, token, err := b.Server.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta) + return token, err +} diff --git 
a/agent/consul/auto_config_backend_test.go b/agent/consul/auto_config_backend_test.go new file mode 100644 index 0000000000..2e82e8882e --- /dev/null +++ b/agent/consul/auto_config_backend_test.go @@ -0,0 +1,115 @@ +package consul + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" +) + +func TestAutoConfigBackend_DatacenterJoinAddresses(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + conf := testClusterConfig{ + Datacenter: "primary", + Servers: 3, + } + + nodes := newTestCluster(t, &conf) + + var expected []string + for _, srv := range nodes.Servers { + expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort)) + } + + backend := autoConfigBackend{Server: nodes.Servers[0]} + actual, err := backend.DatacenterJoinAddresses("") + require.NoError(t, err) + require.ElementsMatch(t, expected, actual) +} + +func TestAutoConfigBackend_CreateACLToken(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, srv, codec := testACLServerWithConfig(t, nil, false) + + waitForLeaderEstablishment(t, srv) + + r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1") + require.NoError(t, err) + + t.Run("predefined-ids", func(t *testing.T) { + accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3" + secret := "ef453f31-ad58-4ec8-8bf8-342e99763026" + in := &structs.ACLToken{ + AccessorID: accessor, + SecretID: secret, + Description: "test", + Policies: []structs.ACLTokenPolicyLink{ + { + ID: structs.ACLPolicyGlobalManagementID, + }, + }, + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "foo", + Datacenter: "bar", + }, + }, + ServiceIdentities: []*structs.ACLServiceIdentity{ + { + ServiceName: "web", + }, + }, + Roles: []structs.ACLTokenRoleLink{ + { + ID: r1.ID, + }, + }, + } + + b := autoConfigBackend{Server: srv} + out, err := b.CreateACLToken(in) + require.NoError(t, err) + 
require.Equal(t, accessor, out.AccessorID) + require.Equal(t, secret, out.SecretID) + require.Equal(t, "test", out.Description) + require.NotZero(t, out.CreateTime) + require.Len(t, out.Policies, 1) + require.Len(t, out.Roles, 1) + require.Len(t, out.NodeIdentities, 1) + require.Len(t, out.ServiceIdentities, 1) + require.Equal(t, structs.ACLPolicyGlobalManagementID, out.Policies[0].ID) + require.Equal(t, "foo", out.NodeIdentities[0].NodeName) + require.Equal(t, "web", out.ServiceIdentities[0].ServiceName) + require.Equal(t, r1.ID, out.Roles[0].ID) + }) + + t.Run("autogen-ids", func(t *testing.T) { + in := &structs.ACLToken{ + Description: "test", + NodeIdentities: []*structs.ACLNodeIdentity{ + { + NodeName: "foo", + Datacenter: "bar", + }, + }, + } + + b := autoConfigBackend{Server: srv} + out, err := b.CreateACLToken(in) + require.NoError(t, err) + require.NotEmpty(t, out.AccessorID) + require.NotEmpty(t, out.SecretID) + require.Equal(t, "test", out.Description) + require.NotZero(t, out.CreateTime) + require.Len(t, out.NodeIdentities, 1) + require.Equal(t, "foo", out.NodeIdentities[0].NodeName) + }) +} diff --git a/agent/consul/auto_config_endpoint.go b/agent/consul/auto_config_endpoint.go index 53007bde73..b03c574937 100644 --- a/agent/consul/auto_config_endpoint.go +++ b/agent/consul/auto_config_endpoint.go @@ -109,7 +109,6 @@ type AutoConfigBackend interface { CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) DatacenterJoinAddresses(segment string) ([]string, error) ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) - GetCARoots() (*structs.IndexedCARoots, error) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) } diff --git a/agent/consul/auto_config_endpoint_test.go b/agent/consul/auto_config_endpoint_test.go index 2dbbdef197..d8e7049da2 100644 --- a/agent/consul/auto_config_endpoint_test.go +++ b/agent/consul/auto_config_endpoint_test.go @@ -196,7 +196,7 @@ func 
TestAutoConfigInitialConfiguration(t *testing.T) { waitForLeaderEstablishment(t, s) - roots, err := s.GetCARoots() + roots, err := s.getCARoots(nil, s.fsm.State()) require.NoError(t, err) pbroots, err := translateCARootsToProtobuf(roots) diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index 93f3b55fb3..9110a5bd3c 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -192,7 +192,7 @@ func (s *ConnectCA) Sign( } } - cert, err := s.srv.SignCertificate(csr, spiffeID) + cert, err := s.srv.caManager.SignCertificate(csr, spiffeID) if err != nil { return err } diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index d556599a2f..a9c4bc699b 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -492,7 +492,6 @@ func (c *FSM) applyConnectCALeafOperation(buf []byte, index uint64) interface{} switch req.Op { case structs.CALeafOpIncrementIndex: // Use current index as the new value as well as the value to write at. - // TODO(banks) do we even use this op any more? 
if err := c.state.CALeafSetIndex(index, index); err != nil { return err } diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index 3993ea2b08..fe510c72fe 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -2,8 +2,10 @@ package consul import ( "context" + "crypto/x509" "errors" "fmt" + "net/url" "reflect" "strings" "sync" @@ -11,6 +13,9 @@ import ( "github.com/hashicorp/go-hclog" uuid "github.com/hashicorp/go-uuid" + "golang.org/x/time/rate" + + "github.com/hashicorp/consul/lib/semaphore" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" @@ -34,6 +39,7 @@ const ( type caServerDelegate interface { ca.ConsulProviderStateDelegate IsLeader() bool + ApplyCALeafRequest() (uint64, error) forwardDC(method, dc string, args interface{}, reply interface{}) error generateCASignRequest(csr string) *structs.CASignRequest @@ -48,6 +54,8 @@ type CAManager struct { delegate caServerDelegate serverConf *Config logger hclog.Logger + // rate limiter to use when signing leaf certificates + caLeafLimiter connectSignRateLimiter providerLock sync.RWMutex // provider is the current CA provider in use for Connect. This is @@ -82,6 +90,32 @@ func (c *caDelegateWithState) ApplyCARequest(req *structs.CARequest) (interface{ return c.Server.raftApplyMsgpack(structs.ConnectCARequestType, req) } +func (c *caDelegateWithState) ApplyCALeafRequest() (uint64, error) { + // TODO(banks): when we implement IssuedCerts table we can use the insert to + // that as the raft index to return in response. + // + // UPDATE(mkeeler): The original implementation relied on updating the CAConfig + // and using its index as the ModifyIndex for certs. This was buggy. The long + // term goal is still to insert some metadata into raft about the certificates + // and use that raft index for the ModifyIndex. 
This is a partial step in that + // direction except that we only are setting an index and not storing the + // metadata. + req := structs.CALeafRequest{ + Op: structs.CALeafOpIncrementIndex, + Datacenter: c.Server.config.Datacenter, + } + resp, err := c.Server.raftApplyMsgpack(structs.ConnectCALeafRequestType|structs.IgnoreUnknownTypeFlag, &req) + if err != nil { + return 0, err + } + + modIdx, ok := resp.(uint64) + if !ok { + return 0, fmt.Errorf("Invalid response from updating the leaf cert index") + } + return modIdx, err +} + func (c *caDelegateWithState) generateCASignRequest(csr string) *structs.CASignRequest { return &structs.CASignRequest{ Datacenter: c.Server.config.PrimaryDatacenter, @@ -149,8 +183,8 @@ func (c *CAManager) getPrimaryRoots() structs.IndexedCARoots { // when setting up the CA during establishLeadership. The state should be set to // non-ready before calling this. func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) { - state := c.delegate.State() - _, config, err := state.CAConfig(nil) + st := c.delegate.State() + _, config, err := st.CAConfig(nil) if err != nil { return nil, err } @@ -994,7 +1028,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) (reterr error) // getIntermediateCAPrimary regenerates the intermediate cert in the primary datacenter. // This is only run for CAs that require an intermediary in the primary DC, such as Vault. -// It should only be called while the state lock is held by setting the state to non-ready. +// It should only be called while the state lock is held by setting the state to non-ready. func (c *CAManager) getIntermediateCAPrimary(provider ca.Provider, newActiveRoot *structs.CARoot) error { // Generate and sign an intermediate cert using the root CA. 
intermediatePEM, err := provider.GenerateIntermediate() @@ -1287,3 +1321,195 @@ func (c *CAManager) configuredSecondaryCA() bool { defer c.stateLock.Unlock() return c.actingSecondaryCA } + +type connectSignRateLimiter struct { + // csrRateLimiter limits the rate of signing new certs if configured. Lazily + // initialized from current config to support dynamic changes. + // csrRateLimiterMu must be held while dereferencing the pointer or storing a + // new one, but methods can be called on the limiter object outside of the + // locked section. This is done only in the getCSRRateLimiterWithLimit method. + csrRateLimiter *rate.Limiter + csrRateLimiterMu sync.RWMutex + + // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit + // Sign RPC concurrency if configured. The zero value is usable as soon as + // SetSize is called which we do dynamically in the RPC handler to avoid + // having to hook elaborate synchronization mechanisms through the CA config + // endpoint and config reload etc. + csrConcurrencyLimiter semaphore.Dynamic +} + +// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. +// It uses the shared server-wide limiter unless the limit has been changed in +// config or the limiter has not been setup yet in which case it just-in-time +// configures the new limiter. We assume that limit changes are relatively rare +// and that all callers (there is currently only one) use the same config value +// as the limit. There might be some flapping if there are multiple concurrent +// requests in flight at the time the config changes where A sees the new value +// and updates, B sees the old but then gets this lock second and changes back. +// Eventually though and very soon (once all current RPCs are complete) we are +// guaranteed to have the correct limit set by the next RPC that comes in so I +// assume this is fine. 
If we observe strange behavior because of it, we could +// add hysteresis that prevents changes too soon after a previous change but +// that seems unnecessary for now. +func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { + l.csrRateLimiterMu.RLock() + lim := l.csrRateLimiter + l.csrRateLimiterMu.RUnlock() + + // If there is a current limiter with the same limit, return it. This should + // be the common case. + if lim != nil && lim.Limit() == limit { + return lim + } + + // Need to change limiter, get write lock + l.csrRateLimiterMu.Lock() + defer l.csrRateLimiterMu.Unlock() + // No limiter yet, or limit changed in CA config, reconfigure a new limiter. + // We use burst of 1 for a hard limit. Note that either bursting or waiting is + // necessary to get expected behavior in the face of random arrival times, but we + // don't need both and we use Wait with a small delay to smooth noise. See + // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. 
+ l.csrRateLimiter = rate.NewLimiter(limit, 1) + return l.csrRateLimiter +} + +func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { + provider, caRoot := c.getCAProvider() + if provider == nil { + return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil") + } else if caRoot == nil { + return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate") + } + + // Verify that the CSR entity is in the cluster's trust domain + state := c.delegate.State() + _, config, err := state.CAConfig(nil) + if err != nil { + return nil, err + } + signingID := connect.SpiffeIDSigningForCluster(config) + serviceID, isService := spiffeID.(*connect.SpiffeIDService) + agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent) + if !isService && !isAgent { + return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID") + } + + var entMeta structs.EnterpriseMeta + if isService { + if !signingID.CanSign(spiffeID) { + return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust domain: %s, "+ + "we are %s", serviceID.Host, signingID.Host()) + } + entMeta.Merge(serviceID.GetEnterpriseMeta()) + } else { + // isAgent - if we support more ID types then this would need to be an else if + // here we are just automatically fixing the trust domain. For auto-encrypt and + // auto-config they make certificate requests before learning about the roots + // so they will have a dummy trust domain in the CSR. 
+ trustDomain := signingID.Host() + if agentID.Host != trustDomain { + originalURI := agentID.URI() + + agentID.Host = trustDomain + + // recreate the URIs list + uris := make([]*url.URL, len(csr.URIs)) + for i, uri := range csr.URIs { + if originalURI.String() == uri.String() { + uris[i] = agentID.URI() + } else { + uris[i] = uri + } + } + + csr.URIs = uris + } + entMeta.Merge(structs.DefaultEnterpriseMeta()) + } + + commonCfg, err := config.GetCommonConfig() + if err != nil { + return nil, err + } + if commonCfg.CSRMaxPerSecond > 0 { + lim := c.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) + // Wait up to the small threshold we allow for a token. + ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) + defer cancel() + if lim.Wait(ctx) != nil { + return nil, ErrRateLimited + } + } else if commonCfg.CSRMaxConcurrent > 0 { + c.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) + ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) + defer cancel() + if err := c.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { + return nil, ErrRateLimited + } + defer c.caLeafLimiter.csrConcurrencyLimiter.Release() + } + + connect.HackSANExtensionForCSR(csr) + + // All seems to be in order, actually sign it. + + pem, err := provider.Sign(csr) + if err == ca.ErrRateLimited { + return nil, ErrRateLimited + } + if err != nil { + return nil, err + } + + // Append any intermediates needed by this root. + for _, p := range caRoot.IntermediateCerts { + pem = pem + ca.EnsureTrailingNewline(p) + } + + // Append our local CA's intermediate if there is one. 
+ inter, err := provider.ActiveIntermediate() + if err != nil { + return nil, err + } + root, err := provider.ActiveRoot() + if err != nil { + return nil, err + } + if inter != root { + pem = pem + ca.EnsureTrailingNewline(inter) + } + + modIdx, err := c.delegate.ApplyCALeafRequest() + if err != nil { + return nil, err + } + + cert, err := connect.ParseCert(pem) + if err != nil { + return nil, err + } + + // Set the response + reply := structs.IssuedCert{ + SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber), + CertPEM: pem, + ValidAfter: cert.NotBefore, + ValidBefore: cert.NotAfter, + EnterpriseMeta: entMeta, + RaftIndex: structs.RaftIndex{ + ModifyIndex: modIdx, + CreateIndex: modIdx, + }, + } + if isService { + reply.Service = serviceID.Service + reply.ServiceURI = cert.URIs[0].String() + } else if isAgent { + reply.Agent = agentID.Agent + reply.AgentURI = cert.URIs[0].String() + } + + return &reply, nil +} diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go index 502dda28d8..4ced2c0f23 100644 --- a/agent/consul/leader_connect_ca_test.go +++ b/agent/consul/leader_connect_ca_test.go @@ -62,6 +62,10 @@ func (m *mockCAServerDelegate) CheckServers(datacenter string, fn func(*metadata }) } +func (m *mockCAServerDelegate) ApplyCALeafRequest() (uint64, error) { + return 3, nil +} + // ApplyCARequest mirrors FSM.applyConnectCAOperation because that functionality // is not exported. 
func (m *mockCAServerDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) { diff --git a/agent/consul/server.go b/agent/consul/server.go index be79d1be50..10e9e7b0dc 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -873,7 +873,7 @@ func (s *Server) setupRPC() error { authz = &disabledAuthorizer{} } // now register with the insecure RPC server - s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, s, authz)) + s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, autoConfigBackend{Server: s}, authz)) ln, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { @@ -1450,72 +1450,6 @@ func (s *Server) isReadyForConsistentReads() bool { return atomic.LoadInt32(&s.readyForConsistentReads) == 1 } -// CreateACLToken will create an ACL token from the given template -func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) { - // we have to require local tokens or else it would require having these servers use a token with acl:write to make a - // token create RPC to the servers in the primary DC. 
- if !s.LocalTokensEnabled() { - return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter) - } - - newToken := *template - - // generate the accessor id - if newToken.AccessorID == "" { - accessor, err := lib.GenerateUUID(s.checkTokenUUID) - if err != nil { - return nil, err - } - - newToken.AccessorID = accessor - } - - // generate the secret id - if newToken.SecretID == "" { - secret, err := lib.GenerateUUID(s.checkTokenUUID) - if err != nil { - return nil, err - } - - newToken.SecretID = secret - } - - newToken.CreateTime = time.Now() - - req := structs.ACLTokenBatchSetRequest{ - Tokens: structs.ACLTokens{&newToken}, - CAS: false, - } - - // perform the request to mint the new token - if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil { - return nil, err - } - - // return the full token definition from the FSM - _, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta) - return token, err -} - -// DatacenterJoinAddresses will return all the strings suitable for usage in -// retry join operations to connect to the the LAN or LAN segment gossip pool. -func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) { - members, err := s.LANSegmentMembers(segment) - if err != nil { - return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err) - } - - var joinAddrs []string - for _, m := range members { - if ok, _ := metadata.IsConsulServer(m); ok { - serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)} - joinAddrs = append(joinAddrs, serfAddr.String()) - } - } - - return joinAddrs, nil -} - // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same // location. 
diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go index 4626db4ef4..09453a5ee9 100644 --- a/agent/consul/server_connect.go +++ b/agent/consul/server_connect.go @@ -1,80 +1,16 @@ package consul import ( - "context" - "crypto/x509" "fmt" - "net/url" - "sync" - memdb "github.com/hashicorp/go-memdb" - "golang.org/x/time/rate" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/lib/semaphore" ) -type connectSignRateLimiter struct { - // csrRateLimiter limits the rate of signing new certs if configured. Lazily - // initialized from current config to support dynamic changes. - // csrRateLimiterMu must be held while dereferencing the pointer or storing a - // new one, but methods can be called on the limiter object outside of the - // locked section. This is done only in the getCSRRateLimiterWithLimit method. - csrRateLimiter *rate.Limiter - csrRateLimiterMu sync.RWMutex - - // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit - // Sign RPC concurrency if configured. The zero value is usable as soon as - // SetSize is called which we do dynamically in the RPC handler to avoid - // having to hook elaborate synchronization mechanisms through the CA config - // endpoint and config reload etc. - csrConcurrencyLimiter semaphore.Dynamic -} - -// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. -// It uses the shared server-wide limiter unless the limit has been changed in -// config or the limiter has not been setup yet in which case it just-in-time -// configures the new limiter. We assume that limit changes are relatively rare -// and that all callers (there is currently only one) use the same config value -// as the limit. 
There might be some flapping if there are multiple concurrent -// requests in flight at the time the config changes where A sees the new value -// and updates, B sees the old but then gets this lock second and changes back. -// Eventually though and very soon (once all current RPCs are complete) we are -// guaranteed to have the correct limit set by the next RPC that comes in so I -// assume this is fine. If we observe strange behavior because of it, we could -// add hysteresis that prevents changes too soon after a previous change but -// that seems unnecessary for now. -func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { - l.csrRateLimiterMu.RLock() - lim := l.csrRateLimiter - l.csrRateLimiterMu.RUnlock() - - // If there is a current limiter with the same limit, return it. This should - // be the common case. - if lim != nil && lim.Limit() == limit { - return lim - } - - // Need to change limiter, get write lock - l.csrRateLimiterMu.Lock() - defer l.csrRateLimiterMu.Unlock() - // No limiter yet, or limit changed in CA config, reconfigure a new limiter. - // We use burst of 1 for a hard limit. Note that either bursting or waiting is - // necessary to get expected behavior in fact of random arrival times, but we - // don't need both and we use Wait with a small delay to smooth noise. See - // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. - l.csrRateLimiter = rate.NewLimiter(limit, 1) - return l.csrRateLimiter -} - -// GetCARoots will retrieve -func (s *Server) GetCARoots() (*structs.IndexedCARoots, error) { - return s.getCARoots(nil, s.fsm.State()) -} - func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.IndexedCARoots, error) { index, roots, config, err := state.CARootsAndConfig(ws) if err != nil { @@ -137,162 +73,3 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind return indexedRoots, nil } - -// TODO: Move this off Server. 
This is only called by RPC endpoints. -func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { - provider, caRoot := s.caManager.getCAProvider() - if provider == nil { - return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil") - } else if caRoot == nil { - return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate") - } - - // Verify that the CSR entity is in the cluster's trust domain - state := s.fsm.State() - _, config, err := state.CAConfig(nil) - if err != nil { - return nil, err - } - signingID := connect.SpiffeIDSigningForCluster(config) - serviceID, isService := spiffeID.(*connect.SpiffeIDService) - agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent) - if !isService && !isAgent { - return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID") - } - - var entMeta structs.EnterpriseMeta - if isService { - if !signingID.CanSign(spiffeID) { - return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust domain: %s, "+ - "we are %s", serviceID.Host, signingID.Host()) - } - entMeta.Merge(serviceID.GetEnterpriseMeta()) - } else { - // isAgent - if we support more ID types then this would need to be an else if - // here we are just automatically fixing the trust domain. For auto-encrypt and - // auto-config they make certificate requests before learning about the roots - // so they will have a dummy trust domain in the CSR. 
- trustDomain := signingID.Host() - if agentID.Host != trustDomain { - originalURI := agentID.URI() - - agentID.Host = trustDomain - - // recreate the URIs list - uris := make([]*url.URL, len(csr.URIs)) - for i, uri := range csr.URIs { - if originalURI.String() == uri.String() { - uris[i] = agentID.URI() - } else { - uris[i] = uri - } - } - - csr.URIs = uris - } - entMeta.Merge(structs.DefaultEnterpriseMeta()) - } - - commonCfg, err := config.GetCommonConfig() - if err != nil { - return nil, err - } - if commonCfg.CSRMaxPerSecond > 0 { - lim := s.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) - // Wait up to the small threshold we allow for a token. - ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) - defer cancel() - if lim.Wait(ctx) != nil { - return nil, ErrRateLimited - } - } else if commonCfg.CSRMaxConcurrent > 0 { - s.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) - ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) - defer cancel() - if err := s.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { - return nil, ErrRateLimited - } - defer s.caLeafLimiter.csrConcurrencyLimiter.Release() - } - - connect.HackSANExtensionForCSR(csr) - - // All seems to be in order, actually sign it. - - pem, err := provider.Sign(csr) - if err == ca.ErrRateLimited { - return nil, ErrRateLimited - } - if err != nil { - return nil, err - } - - // Append any intermediates needed by this root. - for _, p := range caRoot.IntermediateCerts { - pem = pem + ca.EnsureTrailingNewline(p) - } - - // Append our local CA's intermediate if there is one. 
- inter, err := provider.ActiveIntermediate() - if err != nil { - return nil, err - } - root, err := provider.ActiveRoot() - if err != nil { - return nil, err - } - if inter != root { - pem = pem + ca.EnsureTrailingNewline(inter) - } - - // TODO(banks): when we implement IssuedCerts table we can use the insert to - // that as the raft index to return in response. - // - // UPDATE(mkeeler): The original implementation relied on updating the CAConfig - // and using its index as the ModifyIndex for certs. This was buggy. The long - // term goal is still to insert some metadata into raft about the certificates - // and use that raft index for the ModifyIndex. This is a partial step in that - // direction except that we only are setting an index and not storing the - // metadata. - req := structs.CALeafRequest{ - Op: structs.CALeafOpIncrementIndex, - Datacenter: s.config.Datacenter, - } - - resp, err := s.raftApply(structs.ConnectCALeafRequestType|structs.IgnoreUnknownTypeFlag, &req) - if err != nil { - return nil, err - } - - modIdx, ok := resp.(uint64) - if !ok { - return nil, fmt.Errorf("Invalid response from updating the leaf cert index") - } - - cert, err := connect.ParseCert(pem) - if err != nil { - return nil, err - } - - // Set the response - reply := structs.IssuedCert{ - SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber), - CertPEM: pem, - ValidAfter: cert.NotBefore, - ValidBefore: cert.NotAfter, - EnterpriseMeta: entMeta, - RaftIndex: structs.RaftIndex{ - ModifyIndex: modIdx, - CreateIndex: modIdx, - }, - } - if isService { - reply.Service = serviceID.Service - reply.ServiceURI = cert.URIs[0].String() - } else if isAgent { - reply.Agent = agentID.Agent - reply.AgentURI = cert.URIs[0].String() - } - - return &reply, nil -} diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 22179275ef..398310f128 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -1654,105 +1654,3 @@ func TestServer_CALogging(t 
*testing.T) { require.Contains(t, buf.String(), "consul CA provider configured") } - -func TestServer_DatacenterJoinAddresses(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - conf := testClusterConfig{ - Datacenter: "primary", - Servers: 3, - } - - nodes := newTestCluster(t, &conf) - - var expected []string - for _, srv := range nodes.Servers { - expected = append(expected, fmt.Sprintf("127.0.0.1:%d", srv.config.SerfLANConfig.MemberlistConfig.BindPort)) - } - - actual, err := nodes.Servers[0].DatacenterJoinAddresses("") - require.NoError(t, err) - require.ElementsMatch(t, expected, actual) -} - -func TestServer_CreateACLToken(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - _, srv, codec := testACLServerWithConfig(t, nil, false) - - waitForLeaderEstablishment(t, srv) - - r1, err := upsertTestRole(codec, TestDefaultMasterToken, "dc1") - require.NoError(t, err) - - t.Run("predefined-ids", func(t *testing.T) { - accessor := "554cd3ab-5d4e-4d6e-952e-4e8b6c77bfb3" - secret := "ef453f31-ad58-4ec8-8bf8-342e99763026" - in := &structs.ACLToken{ - AccessorID: accessor, - SecretID: secret, - Description: "test", - Policies: []structs.ACLTokenPolicyLink{ - { - ID: structs.ACLPolicyGlobalManagementID, - }, - }, - NodeIdentities: []*structs.ACLNodeIdentity{ - { - NodeName: "foo", - Datacenter: "bar", - }, - }, - ServiceIdentities: []*structs.ACLServiceIdentity{ - { - ServiceName: "web", - }, - }, - Roles: []structs.ACLTokenRoleLink{ - { - ID: r1.ID, - }, - }, - } - - out, err := srv.CreateACLToken(in) - require.NoError(t, err) - require.Equal(t, accessor, out.AccessorID) - require.Equal(t, secret, out.SecretID) - require.Equal(t, "test", out.Description) - require.NotZero(t, out.CreateTime) - require.Len(t, out.Policies, 1) - require.Len(t, out.Roles, 1) - require.Len(t, out.NodeIdentities, 1) - require.Len(t, out.ServiceIdentities, 1) - require.Equal(t, structs.ACLPolicyGlobalManagementID, 
out.Policies[0].ID) - require.Equal(t, "foo", out.NodeIdentities[0].NodeName) - require.Equal(t, "web", out.ServiceIdentities[0].ServiceName) - require.Equal(t, r1.ID, out.Roles[0].ID) - }) - - t.Run("autogen-ids", func(t *testing.T) { - in := &structs.ACLToken{ - Description: "test", - NodeIdentities: []*structs.ACLNodeIdentity{ - { - NodeName: "foo", - Datacenter: "bar", - }, - }, - } - - out, err := srv.CreateACLToken(in) - require.NoError(t, err) - require.NotEmpty(t, out.AccessorID) - require.NotEmpty(t, out.SecretID) - require.Equal(t, "test", out.Description) - require.NotZero(t, out.CreateTime) - require.Len(t, out.NodeIdentities, 1) - require.Equal(t, "foo", out.NodeIdentities[0].NodeName) - }) -}