Wire up agent leaf endpoint to cache framework to support blocking.

This commit is contained in:
Paul Banks 2018-04-30 22:23:49 +01:00 committed by Mitchell Hashimoto
parent d1f4ad3d8a
commit 90c574ebaa
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
13 changed files with 225 additions and 204 deletions

View File

@ -2679,6 +2679,16 @@ func (a *Agent) registerCache() {
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
RPC: a.delegate,
Cache: a.cache,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{
RPC: a.delegate,
}, &cache.RegisterOptions{

View File

@ -28,9 +28,7 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
"github.com/mitchellh/go-testing-interface"
)
type Self struct {
@ -918,24 +916,39 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.
return nil, fmt.Errorf("unknown service ID: %s", id)
}
// Create a CSR.
// TODO(mitchellh): This is obviously not production ready!
csr, pk := connect.TestCSR(&testing.RuntimeT{}, &connect.SpiffeIDService{
Host: "1234.consul",
Namespace: "default",
Datacenter: s.agent.config.Datacenter,
Service: service.Service,
})
args := cachetype.ConnectCALeafRequest{
Service: service.Service, // Need name not ID
}
var qOpts structs.QueryOptions
// Store DC in the ConnectCALeafRequest but query opts separately
if done := s.parse(resp, req, &args.Datacenter, &qOpts); done {
return nil, nil
}
args.MinQueryIndex = qOpts.MinQueryIndex
// Request signing
var reply structs.IssuedCert
args := structs.CASignRequest{CSR: csr}
if err := s.agent.RPC("ConnectCA.Sign", &args, &reply); err != nil {
// Validate token
// TODO(banks): support correct proxy token checking too
rule, err := s.agent.resolveToken(qOpts.Token)
if err != nil {
return nil, err
}
reply.PrivateKeyPEM = pk
if rule != nil && !rule.ServiceWrite(service.Service, nil) {
return nil, acl.ErrPermissionDenied
}
return &reply, nil
raw, err := s.agent.cache.Get(cachetype.ConnectCALeafName, &args)
if err != nil {
return nil, err
}
reply, ok := raw.(*structs.IssuedCert)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
setIndex(resp, reply.ModifyIndex)
return reply, nil
}
// GET /v1/agent/connect/proxy/:proxy_service_id

View File

@ -2,6 +2,7 @@ package agent
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
@ -2105,7 +2106,7 @@ func TestAgentConnectCARoots_empty(t *testing.T) {
t.Parallel()
assert := assert.New(t)
a := NewTestAgent(t.Name(), "")
a := NewTestAgent(t.Name(), "connect { enabled = false }")
defer a.Shutdown()
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/roots", nil)
@ -2128,13 +2129,9 @@ func TestAgentConnectCARoots_list(t *testing.T) {
// Grab the initial cache hit count
cacheHits := a.cache.Hits()
// Set some CAs
var reply interface{}
ca1 := connect.TestCA(t, nil)
ca1.Active = false
ca2 := connect.TestCA(t, nil)
require.Nil(a.RPC("Test.ConnectCASetRoots",
[]*structs.CARoot{ca1, ca2}, &reply))
// Set some CAs. Note that NewTestAgent already bootstraps one CA so this just
// adds a second and makes it active.
ca2 := connect.TestCAConfigSet(t, a, nil)
// List
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/roots", nil)
@ -2152,7 +2149,7 @@ func TestAgentConnectCARoots_list(t *testing.T) {
require.Equal("", r.SigningKey)
}
// That should've been a cache miss, so not hit change
// That should've been a cache miss, so no hit change
require.Equal(cacheHits, a.cache.Hits())
// Test caching
@ -2169,24 +2166,21 @@ func TestAgentConnectCARoots_list(t *testing.T) {
// Test that caching is updated in the background
{
// Set some new CAs
var reply interface{}
ca := connect.TestCA(t, nil)
require.Nil(a.RPC("Test.ConnectCASetRoots",
[]*structs.CARoot{ca}, &reply))
// Set a new CA
ca := connect.TestCAConfigSet(t, a, nil)
retry.Run(t, func(r *retry.R) {
// List it again
obj, err := a.srv.AgentConnectCARoots(httptest.NewRecorder(), req)
if err != nil {
r.Fatal(err)
}
r.Check(err)
value := obj.(structs.IndexedCARoots)
if ca.ID != value.ActiveRootID {
r.Fatalf("%s != %s", ca.ID, value.ActiveRootID)
}
if len(value.Roots) != 1 {
// There are now 3 CAs because we didn't complete rotation on the original
// 2
if len(value.Roots) != 3 {
r.Fatalf("bad len: %d", len(value.Roots))
}
})
@ -2205,13 +2199,16 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
t.Parallel()
assert := assert.New(t)
require := require.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
// Set CAs
var reply interface{}
ca1 := connect.TestCA(t, nil)
assert.Nil(a.RPC("Test.ConnectCASetRoots", []*structs.CARoot{ca1}, &reply))
// CA already setup by default by NewTestAgent but force a new one so we can
// verify it was signed easily.
ca1 := connect.TestCAConfigSet(t, a, nil)
// Grab the initial cache hit count
cacheHits := a.cache.Hits()
{
// Register a local service
@ -2227,7 +2224,7 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args))
resp := httptest.NewRecorder()
_, err := a.srv.AgentRegisterService(resp, req)
assert.Nil(err)
require.NoError(err)
if !assert.Equal(200, resp.Code) {
t.Log("Body: ", resp.Body.String())
}
@ -2237,23 +2234,86 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/leaf/foo", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
assert.Nil(err)
require.NoError(err)
// Get the issued cert
issued, ok := obj.(*structs.IssuedCert)
assert.True(ok)
// Verify that the cert is signed by the CA
requireLeafValidUnderCA(t, issued, ca1)
// Verify blocking index
assert.True(issued.ModifyIndex > 0)
assert.Equal(fmt.Sprintf("%d", issued.ModifyIndex),
resp.Header().Get("X-Consul-Index"))
// That should've been a cache miss, so no hit change
require.Equal(cacheHits, a.cache.Hits())
// Test caching
{
// Fetch it again
obj2, err := a.srv.AgentConnectCALeafCert(httptest.NewRecorder(), req)
require.NoError(err)
require.Equal(obj, obj2)
// Should cache hit this time and not make request
require.Equal(cacheHits+1, a.cache.Hits())
cacheHits++
}
// Test that caching is updated in the background
{
// Set a new CA
ca := connect.TestCAConfigSet(t, a, nil)
retry.Run(t, func(r *retry.R) {
// Try and sign again (note no index/wait arg since cache should update in
// background even if we aren't actively blocking)
obj, err := a.srv.AgentConnectCALeafCert(httptest.NewRecorder(), req)
r.Check(err)
issued2 := obj.(*structs.IssuedCert)
if issued.CertPEM == issued2.CertPEM {
r.Fatalf("leaf has not updated")
}
// Got a new leaf. Sanity check it's a whole new key as well as differnt
// cert.
if issued.PrivateKeyPEM == issued2.PrivateKeyPEM {
r.Fatalf("new leaf has same private key as before")
}
// Verify that the cert is signed by the new CA
requireLeafValidUnderCA(t, issued2, ca)
})
// Should be a cache hit! The data should've updated in the cache
// in the background so this should've been fetched directly from
// the cache.
if v := a.cache.Hits(); v < cacheHits+1 {
t.Fatalf("expected at least one more cache hit, still at %d", v)
}
cacheHits = a.cache.Hits()
}
}
func requireLeafValidUnderCA(t *testing.T, issued *structs.IssuedCert,
ca *structs.CARoot) {
roots := x509.NewCertPool()
assert.True(roots.AppendCertsFromPEM([]byte(ca1.RootCert)))
require.True(t, roots.AppendCertsFromPEM([]byte(ca.RootCert)))
leaf, err := connect.ParseCert(issued.CertPEM)
assert.Nil(err)
require.NoError(t, err)
_, err = leaf.Verify(x509.VerifyOptions{
Roots: roots,
})
assert.Nil(err)
require.NoError(t, err)
// TODO(mitchellh): verify the private key matches the cert
// Verify the private key matches. tls.LoadX509Keypair does this for us!
_, err = tls.X509KeyPair([]byte(issued.CertPEM), []byte(issued.PrivateKeyPEM))
require.NoError(t, err)
}
func TestAgentConnectProxy(t *testing.T) {

View File

@ -18,7 +18,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil"
@ -28,14 +27,6 @@ import (
"github.com/pascaldekloe/goe/verify"
)
// TestMain is the main entrypoint for `go test`.
func TestMain(m *testing.M) {
// Enable the test RPC endpoints
consul.TestEndpoint()
os.Exit(m.Run())
}
func externalIP() (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {

View File

@ -48,7 +48,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
// is so that the goroutine doesn't block forever if we return for other
// reasons.
newRootCACh := make(chan error, 1)
go c.waitNewRootCA(newRootCACh, opts.Timeout)
go c.waitNewRootCA(reqReal.Datacenter, newRootCACh, opts.Timeout)
// Get our prior cert (if we had one) and use that to determine our
// expiration time. If no cert exists, we expire immediately since we
@ -110,7 +110,10 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
// Request signing
var reply structs.IssuedCert
args := structs.CASignRequest{CSR: csr}
args := structs.CASignRequest{
Datacenter: reqReal.Datacenter,
CSR: csr,
}
if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil {
return result, err
}
@ -139,11 +142,12 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
// waitNewRootCA blocks until a new root CA is available or the timeout is
// reached (on timeout ErrTimeout is returned on the channel).
func (c *ConnectCALeaf) waitNewRootCA(ch chan<- error, timeout time.Duration) {
func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error,
timeout time.Duration) {
// Fetch some new roots. This will block until our MinQueryIndex is
// matched or the timeout is reached.
rawRoots, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: "",
Datacenter: datacenter,
QueryOptions: structs.QueryOptions{
MinQueryIndex: atomic.LoadUint64(&c.caIndex),
MaxQueryTime: timeout,
@ -186,7 +190,7 @@ func (c *ConnectCALeaf) waitNewRootCA(ch chan<- error, timeout time.Duration) {
}
// ConnectCALeafRequest is the cache.Request implementation for the
// COnnectCALeaf cache type. This is implemented here and not in structs
// ConnectCALeaf cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded
// directly to any Consul servers.
type ConnectCALeafRequest struct {

View File

@ -39,7 +39,6 @@ var testCACounter uint64
// SigningCert.
func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot {
var result structs.CARoot
result.ID = testUUID(t)
result.Active = true
result.Name = fmt.Sprintf("Test CA %d", atomic.AddUint64(&testCACounter, 1))
@ -86,6 +85,10 @@ func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot {
t.Fatalf("error encoding private key: %s", err)
}
result.RootCert = buf.String()
result.ID, err = CalculateCertFingerprint(result.RootCert)
if err != nil {
t.Fatalf("error generating CA ID fingerprint: %s", err)
}
// If there is a prior CA to cross-sign with, then we need to create that
// and set it as the signing cert.
@ -286,3 +289,47 @@ func testUUID(t testing.T) string {
return ret
}
// TestAgentRPC is an interface that an RPC client must implement. This is a
// helper interface that is implemented by the agent delegate so that test
// helpers can make RPCs without introducing an import cycle on `agent`.
type TestAgentRPC interface {
RPC(method string, args interface{}, reply interface{}) error
}
// TestCAConfigSet sets a CARoot returned by TestCA into the TestAgent state. It
// requires that TestAgent had connect enabled in it's config. If ca is nil, a
// new CA is created.
//
// It returns the CARoot passed or created.
//
// Note that we have to use an interface for the TestAgent.RPC method since we
// can't introduce an import cycle by importing `agent.TestAgent` here directly.
// It also means this will work in a few other places we mock that method.
func TestCAConfigSet(t testing.T, a TestAgentRPC,
ca *structs.CARoot) *structs.CARoot {
t.Helper()
if ca == nil {
ca = TestCA(t, nil)
}
newConfig := &structs.CAConfiguration{
Provider: "consul",
Config: map[string]interface{}{
"PrivateKey": ca.SigningKey,
"RootCert": ca.RootCert,
"RotationPeriod": 180 * 24 * time.Hour,
},
}
args := &structs.CARequest{
Datacenter: "dc1",
Config: newConfig,
}
var reply interface{}
err := a.RPC("ConnectCA.ConfigurationSet", args, &reply)
if err != nil {
t.Fatalf("failed to set test CA config: %s", err)
}
return ca
}

View File

@ -14,7 +14,7 @@ func TestConnectCARoots_empty(t *testing.T) {
t.Parallel()
assert := assert.New(t)
a := NewTestAgent(t.Name(), "")
a := NewTestAgent(t.Name(), "connect { enabled = false }")
defer a.Shutdown()
req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil)
@ -34,13 +34,9 @@ func TestConnectCARoots_list(t *testing.T) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
// Set some CAs
var reply interface{}
ca1 := connect.TestCA(t, nil)
ca1.Active = false
ca2 := connect.TestCA(t, nil)
assert.Nil(a.RPC("Test.ConnectCASetRoots",
[]*structs.CARoot{ca1, ca2}, &reply))
// Set some CAs. Note that NewTestAgent already bootstraps one CA so this just
// adds a second and makes it active.
ca2 := connect.TestCAConfigSet(t, a, nil)
// List
req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil)

View File

@ -272,14 +272,6 @@ func (s *ConnectCA) Sign(
return err
}
provider := s.srv.getCAProvider()
// todo(kyhavlov): more validation on the CSR before signing
pem, err := provider.Sign(csr)
if err != nil {
return err
}
// Parse the SPIFFE ID
spiffeId, err := connect.ParseCertURI(csr.URIs[0])
if err != nil {
@ -289,6 +281,27 @@ func (s *ConnectCA) Sign(
if !ok {
return fmt.Errorf("SPIFFE ID in CSR must be a service ID")
}
provider := s.srv.getCAProvider()
// todo(kyhavlov): more validation on the CSR before signing
pem, err := provider.Sign(csr)
if err != nil {
return err
}
// TODO(banks): when we implement IssuedCerts table we can use the insert to
// that as the raft index to return in response. Right now we can rely on only
// the built-in provider being supported and the implementation detail that we
// have to write a SerialIndex update to the provider config table for every
// cert issued so in all cases this index will be higher than any previous
// sign response. This has to happen after the provider.Sign call to observe
// the index update.
modIdx, _, err := s.srv.fsm.State().CAConfig()
if err != nil {
return err
}
cert, err := connect.ParseCert(pem)
if err != nil {
return err
@ -302,6 +315,10 @@ func (s *ConnectCA) Sign(
ServiceURI: cert.URIs[0].String(),
ValidAfter: cert.NotBefore,
ValidBefore: cert.NotAfter,
RaftIndex: structs.RaftIndex{
ModifyIndex: modIdx,
CreateIndex: modIdx,
},
}
return nil

View File

@ -250,7 +250,7 @@ func (c *ConsulCAProvider) Sign(csr *x509.CertificateRequest) (string, error) {
// Create the certificate, PEM encode it and return that value.
var buf bytes.Buffer
bs, err := x509.CreateCertificate(
rand.Reader, &template, caCert, signer.Public(), signer)
rand.Reader, &template, caCert, csr.PublicKey, signer)
if err != nil {
return "", fmt.Errorf("error generating certificate: %s", err)
}
@ -259,7 +259,10 @@ func (c *ConsulCAProvider) Sign(csr *x509.CertificateRequest) (string, error) {
return "", fmt.Errorf("error encoding private key: %s", err)
}
c.incrementSerialIndex(providerState)
err = c.incrementSerialIndex(providerState)
if err != nil {
return "", err
}
// Set the response
return buf.String(), nil
@ -313,15 +316,19 @@ func (c *ConsulCAProvider) CrossSignCA(cert *x509.Certificate) (string, error) {
return "", fmt.Errorf("error encoding private key: %s", err)
}
c.incrementSerialIndex(providerState)
err = c.incrementSerialIndex(providerState)
if err != nil {
return "", err
}
return buf.String(), nil
}
// incrementSerialIndex increments the cert serial number index in the provider state
// incrementSerialIndex increments the cert serial number index in the provider
// state.
func (c *ConsulCAProvider) incrementSerialIndex(providerState *structs.CAConsulProviderState) error {
newState := *providerState
newState.SerialIndex += 1
newState.SerialIndex++
args := &structs.CARequest{
Op: structs.CAOpSetProviderState,
ProviderState: &newState,

View File

@ -1,26 +0,0 @@
package consul
import (
"sync"
)
// testEndpointsOnce ensures that endpoints for testing are registered once.
var testEndpointsOnce sync.Once
// TestEndpoints registers RPC endpoints specifically for testing. These
// endpoints enable some internal data access that we normally disallow, but
// are useful for modifying server state.
//
// To use this, modify TestMain to call this function prior to running tests.
//
// These should NEVER be registered outside of tests.
//
// NOTE(mitchellh): This was created so that the downstream agent tests can
// modify internal Connect CA state. When the CA plugin work comes in with
// a more complete CA API, this may no longer be necessary and we can remove it.
// That would be ideal.
func TestEndpoint() {
testEndpointsOnce.Do(func() {
registerEndpoint(func(s *Server) interface{} { return &Test{s} })
})
}

View File

@ -1,43 +0,0 @@
package consul
import (
"github.com/hashicorp/consul/agent/structs"
)
// Test is an RPC endpoint that is only available during `go test` when
// `TestEndpoint` is called. This is not and must not ever be available
// during a real running Consul agent, since it this endpoint bypasses
// critical ACL checks.
type Test struct {
// srv is a pointer back to the server.
srv *Server
}
// ConnectCASetRoots sets the current CA roots state.
func (s *Test) ConnectCASetRoots(
args []*structs.CARoot,
reply *interface{}) error {
// Get the highest index
state := s.srv.fsm.State()
idx, _, err := state.CARoots(nil)
if err != nil {
return err
}
// Commit
resp, err := s.srv.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSetRoots,
Index: idx,
Roots: args,
})
if err != nil {
s.srv.logger.Printf("[ERR] consul.test: Apply failed %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}

View File

@ -1,42 +0,0 @@
package consul
import (
"os"
"testing"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
)
// Test setting the CAs
func TestTestConnectCASetRoots(t *testing.T) {
t.Parallel()
assert := assert.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Prepare
ca1 := connect.TestCA(t, nil)
ca2 := connect.TestCA(t, nil)
ca2.Active = false
// Request
args := []*structs.CARoot{ca1, ca2}
var reply interface{}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Test.ConnectCASetRoots", args, &reply))
// Verify they're there
state := s1.fsm.State()
_, actual, err := state.CARoots(nil)
assert.Nil(err)
assert.Len(actual, 2)
}

View File

@ -1,13 +0,0 @@
package consul
import (
"os"
"testing"
)
func TestMain(m *testing.M) {
// Register the test RPC endpoint
TestEndpoint()
os.Exit(m.Run())
}