From e40afd6a73e4ceac373aa9210380c541471defc3 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Wed, 21 Mar 2018 10:10:53 -0700 Subject: [PATCH] agent/consul: CAS operations for setting the CA root --- agent/agent_test.go | 9 +++ agent/connect_ca_endpoint_test.go | 21 ++++++ agent/consul/connect_ca_endpoint.go | 3 + agent/consul/connect_ca_endpoint_test.go | 9 ++- agent/consul/fsm/commands_oss.go | 26 +++++++ agent/consul/state/connect_ca.go | 88 +++++++++++++----------- agent/consul/testing.go | 26 +++++++ agent/consul/testing_endpoint.go | 43 ++++++++++++ agent/consul/testing_endpoint_test.go | 42 +++++++++++ agent/consul/testing_test.go | 13 ++++ agent/structs/connect_ca.go | 22 ++++++ agent/structs/structs.go | 1 + 12 files changed, 259 insertions(+), 44 deletions(-) create mode 100644 agent/consul/testing.go create mode 100644 agent/consul/testing_endpoint.go create mode 100644 agent/consul/testing_endpoint_test.go create mode 100644 agent/consul/testing_test.go diff --git a/agent/agent_test.go b/agent/agent_test.go index 58ada5561d..df1593bd90 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/hashicorp/consul/agent/checks" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil" @@ -25,6 +26,14 @@ import ( "github.com/pascaldekloe/goe/verify" ) +// TestMain is the main entrypoint for `go test`. +func TestMain(m *testing.M) { + // Enable the test RPC endpoints + consul.TestEndpoint() + + os.Exit(m.Run()) +} + func externalIP() (string, error) { addrs, err := net.InterfaceAddrs() if err != nil { diff --git a/agent/connect_ca_endpoint_test.go b/agent/connect_ca_endpoint_test.go index ee30f57a90..cec8382c08 100644 --- a/agent/connect_ca_endpoint_test.go +++ b/agent/connect_ca_endpoint_test.go @@ -5,6 +5,7 @@ import ( "net/http/httptest" "testing" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/assert" ) @@ -25,3 +26,23 @@ func TestConnectCARoots_empty(t *testing.T) { assert.Equal(value.ActiveRootID, "") assert.Len(value.Roots, 0) } + +func TestConnectCARoots_list(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + state := consul.TestServerState(a.Agent.delegate.(*consul.Server)) + t.Log(state.CARoots(nil)) + + req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.ConnectCARoots(resp, req) + assert.Nil(err) + + value := obj.(structs.IndexedCARoots) + assert.Equal(value.ActiveRootID, "") + assert.Len(value.Roots, 0) +} diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index 1702c87403..d6ddaef58b 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -99,6 +99,9 @@ func (s *ConnectCA) Sign( if err != nil { return err } + if root == nil { + return fmt.Errorf("no active CA found") + } // Determine the signing certificate. It is the set signing cert // unless that is empty, in which case it is identically to the public diff --git a/agent/consul/connect_ca_endpoint_test.go b/agent/consul/connect_ca_endpoint_test.go index 375d751152..8a3f1b4f25 100644 --- a/agent/consul/connect_ca_endpoint_test.go +++ b/agent/consul/connect_ca_endpoint_test.go @@ -30,8 +30,9 @@ func TestConnectCARoots(t *testing.T) { ca1 := connect.TestCA(t, nil) ca2 := connect.TestCA(t, nil) ca2.Active = false - assert.Nil(state.CARootSet(1, ca1)) - assert.Nil(state.CARootSet(2, ca2)) + ok, err := state.CARootSetCAS(1, 0, []*structs.CARoot{ca1, ca2}) + assert.True(ok) + assert.Nil(err) // Request args := &structs.DCSpecificRequest{ @@ -70,7 +71,9 @@ func TestConnectCASign(t *testing.T) { // Insert a CA state := s1.fsm.State() ca := connect.TestCA(t, nil) - assert.Nil(state.CARootSet(1, ca)) + ok, err := state.CARootSetCAS(1, 0, []*structs.CARoot{ca}) + assert.True(ok) + assert.Nil(err) // Generate a CSR and request signing args := &structs.CASignRequest{ diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 51f127899d..2d26277484 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -21,6 +21,7 @@ func init() { registerCommand(structs.TxnRequestType, (*FSM).applyTxn) registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate) registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation) + registerCommand(structs.ConnectCARequestType, (*FSM).applyConnectCAOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -269,3 +270,28 @@ func (c *FSM) applyIntentionOperation(buf []byte, index uint64) interface{} { return fmt.Errorf("Invalid Intention operation '%s'", req.Op) } } + +// applyConnectCAOperation applies the given CA operation to the state store. +func (c *FSM) applyConnectCAOperation(buf []byte, index uint64) interface{} { + var req structs.CARequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "ca"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + defer metrics.MeasureSinceWithLabels([]string{"fsm", "ca"}, time.Now(), + []metrics.Label{{Name: "op", Value: string(req.Op)}}) + switch req.Op { + case structs.CAOpSet: + act, err := c.state.CARootSetCAS(index, req.Index, req.Roots) + if err != nil { + return err + } + + return act + default: + c.logger.Printf("[WARN] consul.fsm: Invalid CA operation '%s'", req.Op) + return fmt.Errorf("Invalid CA operation '%s'", req.Op) + } +} diff --git a/agent/consul/state/connect_ca.go b/agent/consul/state/connect_ca.go index 9c19b65c2b..3b66a07c6d 100644 --- a/agent/consul/state/connect_ca.go +++ b/agent/consul/state/connect_ca.go @@ -73,52 +73,58 @@ func (s *Store) CARootActive(ws memdb.WatchSet) (uint64, *structs.CARoot, error) return idx, result, err } -// CARootSet creates or updates a CA root. +// CARootSetCAS sets the current CA root state using a check-and-set operation. +// On success, this will replace the previous set of CARoots completely with +// the given set of roots. // -// NOTE(mitchellh): I have a feeling we'll want a CARootMultiSetCAS to -// perform a check-and-set on the entire set of CARoots versus an individual -// set, since we'll want to modify them atomically during events such as -// rotation. -func (s *Store) CARootSet(idx uint64, v *structs.CARoot) error { +// The first boolean result returns whether the transaction succeeded or not. +func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() - if err := s.caRootSetTxn(tx, idx, v); err != nil { - return err + // Get the current max index + if midx := maxIndexTxn(tx, caRootTableName); midx != cidx { + return false, nil + } + + // Go through and find any existing matching CAs so we can preserve and + // update their Create/ModifyIndex values. + for _, r := range rs { + if r.ID == "" { + return false, ErrMissingCARootID + } + + existing, err := tx.First(caRootTableName, "id", r.ID) + if err != nil { + return false, fmt.Errorf("failed CA root lookup: %s", err) + } + + if existing != nil { + r.CreateIndex = existing.(*structs.CARoot).CreateIndex + } else { + r.CreateIndex = idx + } + r.ModifyIndex = idx + } + + // Delete all + _, err := tx.DeleteAll(caRootTableName, "id") + if err != nil { + return false, err + } + + // Insert all + for _, r := range rs { + if err := tx.Insert(caRootTableName, r); err != nil { + return false, err + } + } + + // Update the index + if err := tx.Insert("index", &IndexEntry{caRootTableName, idx}); err != nil { + return false, fmt.Errorf("failed updating index: %s", err) } tx.Commit() - return nil -} - -// caRootSetTxn is the inner method used to insert or update a CA root with -// the proper indexes into the state store. -func (s *Store) caRootSetTxn(tx *memdb.Txn, idx uint64, v *structs.CARoot) error { - // ID is required - if v.ID == "" { - return ErrMissingCARootID - } - - // Check for an existing value - existing, err := tx.First(caRootTableName, "id", v.ID) - if err != nil { - return fmt.Errorf("failed CA root lookup: %s", err) - } - if existing != nil { - old := existing.(*structs.CARoot) - v.CreateIndex = old.CreateIndex - } else { - v.CreateIndex = idx - } - v.ModifyIndex = idx - - // Insert - if err := tx.Insert(caRootTableName, v); err != nil { - return err - } - if err := tx.Insert("index", &IndexEntry{caRootTableName, idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - return nil + return true, nil } diff --git a/agent/consul/testing.go b/agent/consul/testing.go new file mode 100644 index 0000000000..afae7c1a19 --- /dev/null +++ b/agent/consul/testing.go @@ -0,0 +1,26 @@ +package consul + +import ( + "sync" +) + +// testEndpointsOnce ensures that endpoints for testing are registered once. +var testEndpointsOnce sync.Once + +// TestEndpoints registers RPC endpoints specifically for testing. These +// endpoints enable some internal data access that we normally disallow, but +// are useful for modifying server state. +// +// To use this, modify TestMain to call this function prior to running tests. +// +// These should NEVER be registered outside of tests. +// +// NOTE(mitchellh): This was created so that the downstream agent tests can +// modify internal Connect CA state. When the CA plugin work comes in with +// a more complete CA API, this may no longer be necessary and we can remove it. +// That would be ideal. +func TestEndpoint() { + testEndpointsOnce.Do(func() { + registerEndpoint(func(s *Server) interface{} { return &Test{s} }) + }) +} diff --git a/agent/consul/testing_endpoint.go b/agent/consul/testing_endpoint.go new file mode 100644 index 0000000000..e47e0e7378 --- /dev/null +++ b/agent/consul/testing_endpoint.go @@ -0,0 +1,43 @@ +package consul + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +// Test is an RPC endpoint that is only available during `go test` when +// `TestEndpoint` is called. This is not and must not ever be available +// during a real running Consul agent, since it this endpoint bypasses +// critical ACL checks. +type Test struct { + // srv is a pointer back to the server. + srv *Server +} + +// ConnectCASetRoots sets the current CA roots state. +func (s *Test) ConnectCASetRoots( + args []*structs.CARoot, + reply *interface{}) error { + + // Get the highest index + state := s.srv.fsm.State() + idx, _, err := state.CARoots(nil) + if err != nil { + return err + } + + // Commit + resp, err := s.srv.raftApply(structs.ConnectCARequestType, &structs.CARequest{ + Op: structs.CAOpSet, + Index: idx, + Roots: args, + }) + if err != nil { + s.srv.logger.Printf("[ERR] consul.test: Apply failed %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + return nil +} diff --git a/agent/consul/testing_endpoint_test.go b/agent/consul/testing_endpoint_test.go new file mode 100644 index 0000000000..e202136955 --- /dev/null +++ b/agent/consul/testing_endpoint_test.go @@ -0,0 +1,42 @@ +package consul + +import ( + "os" + "testing" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/assert" +) + +// Test setting the CAs +func TestTestConnectCASetRoots(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Prepare + ca1 := connect.TestCA(t, nil) + ca2 := connect.TestCA(t, nil) + ca2.Active = false + + // Request + args := []*structs.CARoot{ca1, ca2} + var reply interface{} + assert.Nil(msgpackrpc.CallWithCodec(codec, "Test.ConnectCASetRoots", args, &reply)) + + // Verify they're there + state := s1.fsm.State() + _, actual, err := state.CARoots(nil) + assert.Nil(err) + assert.Len(actual, 2) +} diff --git a/agent/consul/testing_test.go b/agent/consul/testing_test.go new file mode 100644 index 0000000000..98e8dd7431 --- /dev/null +++ b/agent/consul/testing_test.go @@ -0,0 +1,13 @@ +package consul + +import ( + "os" + "testing" +) + +func TestMain(m *testing.M) { + // Register the test RPC endpoint + TestEndpoint() + + os.Exit(m.Run()) +} diff --git a/agent/structs/connect_ca.go b/agent/structs/connect_ca.go index 6723d9b984..0437b27cf9 100644 --- a/agent/structs/connect_ca.go +++ b/agent/structs/connect_ca.go @@ -76,3 +76,25 @@ type IssuedCert struct { // state store, but is present in the sign API response. Cert string `json:",omitempty"` } + +// CAOp is the operation for a request related to intentions. +type CAOp string + +const ( + CAOpSet CAOp = "set" +) + +// CARequest is used to modify connect CA data. This is used by the +// FSM (agent/consul/fsm) to apply changes. +type CARequest struct { + // Op is the type of operation being requested. This determines what + // other fields are required. + Op CAOp + + // Index is used by CAOpSet for a CAS operation. + Index uint64 + + // Roots is a list of roots. This is used for CAOpSet. One root must + // always be active. + Roots []*CARoot +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 95c0ba069d..a4e9422308 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -41,6 +41,7 @@ const ( AreaRequestType = 10 ACLBootstrapRequestType = 11 // FSM snapshots only. IntentionRequestType = 12 + ConnectCARequestType = 13 ) const (