agent/consul: CAS operations for setting the CA root

This commit is contained in:
Mitchell Hashimoto 2018-03-21 10:10:53 -07:00
parent 578db06600
commit e40afd6a73
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
12 changed files with 259 additions and 44 deletions

View File

@ -16,6 +16,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
@ -25,6 +26,14 @@ import (
"github.com/pascaldekloe/goe/verify" "github.com/pascaldekloe/goe/verify"
) )
// TestMain is the main entrypoint for `go test`.
func TestMain(m *testing.M) {
// Enable the test RPC endpoints
consul.TestEndpoint()
os.Exit(m.Run())
}
func externalIP() (string, error) { func externalIP() (string, error) {
addrs, err := net.InterfaceAddrs() addrs, err := net.InterfaceAddrs()
if err != nil { if err != nil {

View File

@ -5,6 +5,7 @@ import (
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -25,3 +26,23 @@ func TestConnectCARoots_empty(t *testing.T) {
assert.Equal(value.ActiveRootID, "") assert.Equal(value.ActiveRootID, "")
assert.Len(value.Roots, 0) assert.Len(value.Roots, 0)
} }
func TestConnectCARoots_list(t *testing.T) {
t.Parallel()
assert := assert.New(t)
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
state := consul.TestServerState(a.Agent.delegate.(*consul.Server))
t.Log(state.CARoots(nil))
req, _ := http.NewRequest("GET", "/v1/connect/ca/roots", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.ConnectCARoots(resp, req)
assert.Nil(err)
value := obj.(structs.IndexedCARoots)
assert.Equal(value.ActiveRootID, "")
assert.Len(value.Roots, 0)
}

View File

@ -99,6 +99,9 @@ func (s *ConnectCA) Sign(
if err != nil { if err != nil {
return err return err
} }
if root == nil {
return fmt.Errorf("no active CA found")
}
// Determine the signing certificate. It is the set signing cert // Determine the signing certificate. It is the set signing cert
// unless that is empty, in which case it is identically to the public // unless that is empty, in which case it is identically to the public

View File

@ -30,8 +30,9 @@ func TestConnectCARoots(t *testing.T) {
ca1 := connect.TestCA(t, nil) ca1 := connect.TestCA(t, nil)
ca2 := connect.TestCA(t, nil) ca2 := connect.TestCA(t, nil)
ca2.Active = false ca2.Active = false
assert.Nil(state.CARootSet(1, ca1)) ok, err := state.CARootSetCAS(1, 0, []*structs.CARoot{ca1, ca2})
assert.Nil(state.CARootSet(2, ca2)) assert.True(ok)
assert.Nil(err)
// Request // Request
args := &structs.DCSpecificRequest{ args := &structs.DCSpecificRequest{
@ -70,7 +71,9 @@ func TestConnectCASign(t *testing.T) {
// Insert a CA // Insert a CA
state := s1.fsm.State() state := s1.fsm.State()
ca := connect.TestCA(t, nil) ca := connect.TestCA(t, nil)
assert.Nil(state.CARootSet(1, ca)) ok, err := state.CARootSetCAS(1, 0, []*structs.CARoot{ca})
assert.True(ok)
assert.Nil(err)
// Generate a CSR and request signing // Generate a CSR and request signing
args := &structs.CASignRequest{ args := &structs.CASignRequest{

View File

@ -21,6 +21,7 @@ func init() {
registerCommand(structs.TxnRequestType, (*FSM).applyTxn) registerCommand(structs.TxnRequestType, (*FSM).applyTxn)
registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate) registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate)
registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation) registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation)
registerCommand(structs.ConnectCARequestType, (*FSM).applyConnectCAOperation)
} }
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -269,3 +270,28 @@ func (c *FSM) applyIntentionOperation(buf []byte, index uint64) interface{} {
return fmt.Errorf("Invalid Intention operation '%s'", req.Op) return fmt.Errorf("Invalid Intention operation '%s'", req.Op)
} }
} }
// applyConnectCAOperation applies the given CA operation to the state store.
func (c *FSM) applyConnectCAOperation(buf []byte, index uint64) interface{} {
var req structs.CARequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "ca"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "ca"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.CAOpSet:
act, err := c.state.CARootSetCAS(index, req.Index, req.Roots)
if err != nil {
return err
}
return act
default:
c.logger.Printf("[WARN] consul.fsm: Invalid CA operation '%s'", req.Op)
return fmt.Errorf("Invalid CA operation '%s'", req.Op)
}
}

View File

@ -73,52 +73,58 @@ func (s *Store) CARootActive(ws memdb.WatchSet) (uint64, *structs.CARoot, error)
return idx, result, err return idx, result, err
} }
// CARootSet creates or updates a CA root. // CARootSetCAS sets the current CA root state using a check-and-set operation.
// On success, this will replace the previous set of CARoots completely with
// the given set of roots.
// //
// NOTE(mitchellh): I have a feeling we'll want a CARootMultiSetCAS to // The first boolean result returns whether the transaction succeeded or not.
// perform a check-and-set on the entire set of CARoots versus an individual func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, error) {
// set, since we'll want to modify them atomically during events such as
// rotation.
func (s *Store) CARootSet(idx uint64, v *structs.CARoot) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
if err := s.caRootSetTxn(tx, idx, v); err != nil { // Get the current max index
return err if midx := maxIndexTxn(tx, caRootTableName); midx != cidx {
return false, nil
}
// Go through and find any existing matching CAs so we can preserve and
// update their Create/ModifyIndex values.
for _, r := range rs {
if r.ID == "" {
return false, ErrMissingCARootID
}
existing, err := tx.First(caRootTableName, "id", r.ID)
if err != nil {
return false, fmt.Errorf("failed CA root lookup: %s", err)
}
if existing != nil {
r.CreateIndex = existing.(*structs.CARoot).CreateIndex
} else {
r.CreateIndex = idx
}
r.ModifyIndex = idx
}
// Delete all
_, err := tx.DeleteAll(caRootTableName, "id")
if err != nil {
return false, err
}
// Insert all
for _, r := range rs {
if err := tx.Insert(caRootTableName, r); err != nil {
return false, err
}
}
// Update the index
if err := tx.Insert("index", &IndexEntry{caRootTableName, idx}); err != nil {
return false, fmt.Errorf("failed updating index: %s", err)
} }
tx.Commit() tx.Commit()
return nil return true, nil
}
// caRootSetTxn is the inner method used to insert or update a CA root with
// the proper indexes into the state store.
func (s *Store) caRootSetTxn(tx *memdb.Txn, idx uint64, v *structs.CARoot) error {
// ID is required
if v.ID == "" {
return ErrMissingCARootID
}
// Check for an existing value
existing, err := tx.First(caRootTableName, "id", v.ID)
if err != nil {
return fmt.Errorf("failed CA root lookup: %s", err)
}
if existing != nil {
old := existing.(*structs.CARoot)
v.CreateIndex = old.CreateIndex
} else {
v.CreateIndex = idx
}
v.ModifyIndex = idx
// Insert
if err := tx.Insert(caRootTableName, v); err != nil {
return err
}
if err := tx.Insert("index", &IndexEntry{caRootTableName, idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
} }

26
agent/consul/testing.go Normal file
View File

@ -0,0 +1,26 @@
package consul
import (
"sync"
)
// testEndpointsOnce ensures that endpoints for testing are registered once.
var testEndpointsOnce sync.Once
// TestEndpoints registers RPC endpoints specifically for testing. These
// endpoints enable some internal data access that we normally disallow, but
// are useful for modifying server state.
//
// To use this, modify TestMain to call this function prior to running tests.
//
// These should NEVER be registered outside of tests.
//
// NOTE(mitchellh): This was created so that the downstream agent tests can
// modify internal Connect CA state. When the CA plugin work comes in with
// a more complete CA API, this may no longer be necessary and we can remove it.
// That would be ideal.
func TestEndpoint() {
testEndpointsOnce.Do(func() {
registerEndpoint(func(s *Server) interface{} { return &Test{s} })
})
}

View File

@ -0,0 +1,43 @@
package consul
import (
"github.com/hashicorp/consul/agent/structs"
)
// Test is an RPC endpoint that is only available during `go test` when
// `TestEndpoint` is called. This is not and must not ever be available
// during a real running Consul agent, since it this endpoint bypasses
// critical ACL checks.
type Test struct {
// srv is a pointer back to the server.
srv *Server
}
// ConnectCASetRoots sets the current CA roots state.
func (s *Test) ConnectCASetRoots(
args []*structs.CARoot,
reply *interface{}) error {
// Get the highest index
state := s.srv.fsm.State()
idx, _, err := state.CARoots(nil)
if err != nil {
return err
}
// Commit
resp, err := s.srv.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSet,
Index: idx,
Roots: args,
})
if err != nil {
s.srv.logger.Printf("[ERR] consul.test: Apply failed %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}

View File

@ -0,0 +1,42 @@
package consul
import (
"os"
"testing"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
)
// Test setting the CAs
func TestTestConnectCASetRoots(t *testing.T) {
t.Parallel()
assert := assert.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Prepare
ca1 := connect.TestCA(t, nil)
ca2 := connect.TestCA(t, nil)
ca2.Active = false
// Request
args := []*structs.CARoot{ca1, ca2}
var reply interface{}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Test.ConnectCASetRoots", args, &reply))
// Verify they're there
state := s1.fsm.State()
_, actual, err := state.CARoots(nil)
assert.Nil(err)
assert.Len(actual, 2)
}

View File

@ -0,0 +1,13 @@
package consul
import (
"os"
"testing"
)
func TestMain(m *testing.M) {
// Register the test RPC endpoint
TestEndpoint()
os.Exit(m.Run())
}

View File

@ -76,3 +76,25 @@ type IssuedCert struct {
// state store, but is present in the sign API response. // state store, but is present in the sign API response.
Cert string `json:",omitempty"` Cert string `json:",omitempty"`
} }
// CAOp is the operation for a request related to intentions.
type CAOp string
const (
CAOpSet CAOp = "set"
)
// CARequest is used to modify connect CA data. This is used by the
// FSM (agent/consul/fsm) to apply changes.
type CARequest struct {
// Op is the type of operation being requested. This determines what
// other fields are required.
Op CAOp
// Index is used by CAOpSet for a CAS operation.
Index uint64
// Roots is a list of roots. This is used for CAOpSet. One root must
// always be active.
Roots []*CARoot
}

View File

@ -41,6 +41,7 @@ const (
AreaRequestType = 10 AreaRequestType = 10
ACLBootstrapRequestType = 11 // FSM snapshots only. ACLBootstrapRequestType = 11 // FSM snapshots only.
IntentionRequestType = 12 IntentionRequestType = 12
ConnectCARequestType = 13
) )
const ( const (