Pass explicit signal with op for secrets write

Previously the updates to the peering secrets UUID table relied on
inferring what action triggered the update based on a reconciliation
against the existing secrets.

Instead we now explicitly require the operation to be given so that the
inference isn't necessary. This makes the UUID table logic easier to
reason about and fixes some related bugs.

There is also an update so that the peering secrets get handled on
snapshots/restores.
This commit is contained in:
freddygv 2022-08-02 16:20:07 -06:00
parent 9ca687bc7c
commit 60d6e28c97
16 changed files with 830 additions and 483 deletions

View File

@ -720,7 +720,7 @@ func (c *FSM) applyPeeringDelete(buf []byte, index uint64) interface{} {
}
func (c *FSM) applyPeeringSecretsWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringSecrets
var req pbpeering.PeeringSecretsWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode peering write request: %v", err))
}

View File

@ -38,6 +38,7 @@ func init() {
registerRestorer(structs.FreeVirtualIPRequestType, restoreFreeVirtualIP)
registerRestorer(structs.PeeringWriteType, restorePeering)
registerRestorer(structs.PeeringTrustBundleWriteType, restorePeeringTrustBundle)
registerRestorer(structs.PeeringSecretsWriteType, restorePeeringSecrets)
}
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
@ -95,6 +96,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
if err := s.persistPeeringTrustBundles(sink, encoder); err != nil {
return err
}
if err := s.persistPeeringSecrets(sink, encoder); err != nil {
return err
}
return nil
}
@ -582,6 +586,24 @@ func (s *snapshot) persistPeeringTrustBundles(sink raft.SnapshotSink, encoder *c
return nil
}
func (s *snapshot) persistPeeringSecrets(sink raft.SnapshotSink, encoder *codec.Encoder) error {
secrets, err := s.state.PeeringSecrets()
if err != nil {
return err
}
for entry := secrets.Next(); entry != nil; entry = secrets.Next() {
if _, err := sink.Write([]byte{byte(structs.PeeringSecretsWriteType)}); err != nil {
return err
}
if err := encoder.Encode(entry.(*pbpeering.PeeringSecrets)); err != nil {
return err
}
}
return nil
}
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.RegisterRequest
if err := decoder.Decode(&req); err != nil {
@ -906,3 +928,14 @@ func restorePeeringTrustBundle(header *SnapshotHeader, restore *state.Restore, d
}
return nil
}
func restorePeeringSecrets(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req pbpeering.PeeringSecrets
if err := decoder.Decode(&req); err != nil {
return err
}
if err := restore.PeeringSecrets(&req); err != nil {
return err
}
return nil
}

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/sdk/testutil"
)
@ -482,6 +483,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Name: "baz",
},
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84",
},
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: "0b7812d4-32d9-4e54-b1b3-4d97084982a0",
ActiveSecretID: "baaeea83-8419-4aa8-ac89-14e7246a3d2f",
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
},
}))
// Peering Trust Bundles
@ -797,6 +811,30 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NotNil(t, prngRestored)
require.Equal(t, "baz", prngRestored.Name)
// Verify peering secrets are restored
secretsRestored, err := fsm2.state.PeeringSecretsRead(nil, "1fabcd52-1d46-49b0-b1d8-71559aee47f5")
require.NoError(t, err)
expectSecrets := &pbpeering.PeeringSecrets{
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84",
},
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: "0b7812d4-32d9-4e54-b1b3-4d97084982a0",
ActiveSecretID: "baaeea83-8419-4aa8-ac89-14e7246a3d2f",
},
}
prototest.AssertDeepEqual(t, expectSecrets, secretsRestored)
uuids := []string{"389bbcdf-1c31-47d6-ae96-f2a3f4c45f84", "0b7812d4-32d9-4e54-b1b3-4d97084982a0", "baaeea83-8419-4aa8-ac89-14e7246a3d2f"}
for _, id := range uuids {
free, err := fsm2.state.ValidateProposedPeeringSecretUUID(id)
require.NoError(t, err)
// The UUIDs in the peering secret should be tracked as in use.
require.False(t, free)
}
// Verify peering trust bundle is restored
idx, ptbRestored, err := fsm2.state.PeeringTrustBundleRead(nil, state.Query{
Value: "qux",

View File

@ -465,8 +465,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
//
// To test this, we start the two peer servers (accepting and dialing), set up peering, and then shut down
// the accepting peer. This terminates the connection without sending a Terminated message.
// We then restart the accepting peer (we actually spin up a new server with the same config and port) and then
// assert that the dialing peer reestablishes the connection.
// We then restart the accepting peer and assert that the dialing peer reestablishes the connection.
func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
@ -579,20 +578,17 @@ func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
// Have to manually shut down the gRPC server otherwise it stays bound to the port.
acceptingServer.externalGRPCServer.Stop()
// Mimic the server restarting by starting a new server with the same config.
// Restart the server by re-using the previous acceptor's data directory and node id.
_, acceptingServerRestart := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptingServer.dc1"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCPort = acceptingServerPort
c.DataDir = acceptingServer.config.DataDir
c.NodeID = acceptingServer.config.NodeID
})
testrpc.WaitForLeader(t, acceptingServerRestart.RPC, "dc1")
// Re-insert the peering state, mimicking a snapshot restore.
require.NoError(t, acceptingServerRestart.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{
Peering: peering.Peering,
Secret: secrets,
}))
testrpc.WaitForLeader(t, acceptingServerRestart.RPC, "dc1")
// The dialing peer should eventually reconnect.
retry.Run(t, func(r *retry.R) {

View File

@ -141,7 +141,7 @@ func (b *PeeringBackend) ValidateProposedPeeringSecret(id string) (bool, error)
return b.srv.fsm.State().ValidateProposedPeeringSecretUUID(id)
}
func (b *PeeringBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecrets) error {
func (b *PeeringBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecretsWriteRequest) error {
_, err := b.srv.raftApplyProtobuf(structs.PeeringSecretsWriteType, req)
return err
}

View File

@ -7,13 +7,12 @@ import (
"strings"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/go-memdb"
)
const (
@ -175,20 +174,22 @@ func peeringSecretsReadByPeerIDTxn(tx ReadTxn, ws memdb.WatchSet, id string) (*p
return secret, nil
}
func (s *Store) PeeringSecretsWrite(idx uint64, secret *pbpeering.PeeringSecrets) error {
func (s *Store) PeeringSecretsWrite(idx uint64, req *pbpeering.PeeringSecretsWriteRequest) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
if err := s.peeringSecretsWriteTxn(tx, secret); err != nil {
if err := s.peeringSecretsWriteTxn(tx, req); err != nil {
return fmt.Errorf("failed to write peering secret: %w", err)
}
return tx.Commit()
}
func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, secret *pbpeering.PeeringSecrets) error {
if secret == nil {
func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, req *pbpeering.PeeringSecretsWriteRequest) error {
if req == nil || req.Secrets == nil {
return nil
}
secret := req.GetSecrets()
if err := secret.Validate(); err != nil {
return err
}
@ -210,24 +211,23 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, secret *pbpeering.PeeringSec
return nil
}
if req.Operation == pbpeering.PeeringSecretsWriteRequest_OPERATION_UNSPECIFIED {
return fmt.Errorf("the operation that triggered the secrets write was not specified")
}
// If the peering token was generated locally, validate that the newly introduced UUID is still unique.
// RPC handlers validate that generated IDs are available, but availability cannot be guaranteed until the state store operation.
var newSecretID string
switch {
// Establishment secrets are written when generating peering tokens, and no other secret IDs are included.
case secret.GetEstablishment() != nil:
newSecretID = secret.GetEstablishment().SecretID
// Stream secrets can be written as:
// - A new PendingSecretID from the ExchangeSecret RPC
// - An ActiveSecretID when promoting a pending secret on first use
case secret.GetStream() != nil:
if pending := secret.GetStream().GetPendingSecretID(); pending != "" {
newSecretID = pending
}
switch req.Operation {
// We do not need to check the long-lived Stream.ActiveSecretID for uniqueness because:
// - In the cluster that generated it the secret is always introduced as a PendingSecretID, then promoted to ActiveSecretID.
// This means that the promoted secret is already known to be unique.
// Establishment secrets are written when generating peering tokens, and no other secret IDs are included.
case pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN:
newSecretID = secret.GetEstablishment().GetSecretID()
// When exchanging an establishment secret a new pending stream secret is generated.
// Active stream secrets doesn't need to be checked for uniqueness because it is only ever promoted from pending.
case pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET:
newSecretID = secret.GetStream().GetPendingSecretID()
}
if newSecretID != "" {
@ -251,45 +251,39 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, secret *pbpeering.PeeringSec
var toDelete []string
if existing != nil {
// Merge in existing stream secrets when persisting a new establishment secret.
// This is to avoid invalidating stream secrets when a new peering token
// is generated.
//
// We purposely DO NOT do the reverse of inheriting an existing establishment secret.
// When exchanging establishment secrets for stream secrets, we invalidate the
// establishment secret by deleting it.
if secret.GetEstablishment() != nil && secret.GetStream() == nil && existing.GetStream() != nil {
secret.Stream = existing.Stream
}
// Collect any overwritten UUIDs for deletion.
//
// Old establishment secret ID are always cleaned up when they don't match.
// They will either be replaced by a new one or deleted in the secret exchange RPC.
existingEstablishment := existing.GetEstablishment().GetSecretID()
if existingEstablishment != "" && secret.GetEstablishment().GetSecretID() != "" && existingEstablishment != secret.GetEstablishment().GetSecretID() {
toDelete = append(toDelete, existingEstablishment)
}
switch req.Operation {
case pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN:
// Merge in existing stream secrets when persisting a new establishment secret.
// This is to avoid invalidating stream secrets when a new peering token
// is generated.
secret.Stream = existing.GetStream()
// Old active secret IDs are always cleaned up when they don't match.
// They are only ever replaced when promoting a pending secret ID.
existingActive := existing.GetStream().GetActiveSecretID()
if existingActive != "" && existingActive != secret.GetStream().GetActiveSecretID() {
toDelete = append(toDelete, existingActive)
}
// When a new token is generated we replace any un-used establishment secrets.
if existingEstablishment := existing.GetEstablishment().GetSecretID(); existingEstablishment != "" {
toDelete = append(toDelete, existingEstablishment)
}
// Pending secrets can change in three ways:
// - Generating a new pending secret: Nothing to delete here since there's no old pending secret being replaced.
// - Re-establishing a peering, and re-generating a pending secret: should delete the old one if both are non-empty.
// - Promoting a pending secret: Nothing to delete here since the pending secret is now active and still in use.
existingPending := existing.GetStream().GetPendingSecretID()
newPending := secret.GetStream().GetPendingSecretID()
if existingPending != "" &&
// The value of newPending indicates whether a peering is being generated/re-established (not empty)
// or whether a pending secret is being promoted (empty).
newPending != "" &&
newPending != existingPending {
toDelete = append(toDelete, existingPending)
case pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET:
// When exchanging an establishment secret we invalidate the existing establishment secret.
if existingEstablishment := existing.GetEstablishment().GetSecretID(); existingEstablishment != "" {
toDelete = append(toDelete, existingEstablishment)
}
// When exchanging an establishment secret unused pending secrets are overwritten.
if existingPending := existing.GetStream().GetPendingSecretID(); existingPending != "" {
toDelete = append(toDelete, existingPending)
}
case pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING:
// Avoid invalidating existing establishment secrets when promoting pending secrets.
secret.Establishment = existing.GetEstablishment()
// If there was previously an active stream secret it gets replaced in favor of the pending secret
// that is being promoted.
if existingActive := existing.GetStream().GetActiveSecretID(); existingActive != "" {
toDelete = append(toDelete, existingActive)
}
}
}
for _, id := range toDelete {
@ -537,7 +531,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
}
// Write any secrets generated with the peering.
err = s.peeringSecretsWriteTxn(tx, req.GetSecret())
err = s.peeringSecretsWriteTxn(tx, req.GetSecretsRequest())
if err != nil {
return fmt.Errorf("failed to write peering establishment secret: %w", err)
}
@ -1102,6 +1096,10 @@ func (s *Snapshot) PeeringTrustBundles() (memdb.ResultIterator, error) {
return s.tx.Get(tablePeeringTrustBundles, indexID)
}
func (s *Snapshot) PeeringSecrets() (memdb.ResultIterator, error) {
return s.tx.Get(tablePeeringSecrets, indexID)
}
func (r *Restore) Peering(p *pbpeering.Peering) error {
if err := r.tx.Insert(tablePeering, p); err != nil {
return fmt.Errorf("failed restoring peering: %w", err)
@ -1124,6 +1122,30 @@ func (r *Restore) PeeringTrustBundle(ptb *pbpeering.PeeringTrustBundle) error {
return nil
}
func (r *Restore) PeeringSecrets(p *pbpeering.PeeringSecrets) error {
if err := r.tx.Insert(tablePeeringSecrets, p); err != nil {
return fmt.Errorf("failed restoring peering secrets: %w", err)
}
var uuids []string
if establishment := p.GetEstablishment().GetSecretID(); establishment != "" {
uuids = append(uuids, establishment)
}
if pending := p.GetStream().GetPendingSecretID(); pending != "" {
uuids = append(uuids, pending)
}
if active := p.GetStream().GetActiveSecretID(); active != "" {
uuids = append(uuids, active)
}
for _, id := range uuids {
if err := r.tx.Insert(tablePeeringSecretUUIDs, id); err != nil {
return fmt.Errorf("failed restoring peering secret UUIDs: %w", err)
}
}
return nil
}
// peersForServiceTxn returns the names of all peers that a service is exported to.
func peersForServiceTxn(
tx ReadTxn,

View File

@ -243,17 +243,17 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
if req.Peering != nil {
require.NoError(t, tx.Insert(tablePeering, req.Peering))
}
if req.Secret != nil {
require.NoError(t, tx.Insert(tablePeeringSecrets, req.Secret))
if secretsReq := req.SecretsRequest; secretsReq != nil {
require.NoError(t, tx.Insert(tablePeeringSecrets, secretsReq.Secrets))
var toInsert []string
if establishment := req.Secret.GetEstablishment().GetSecretID(); establishment != "" {
if establishment := secretsReq.Secrets.GetEstablishment().GetSecretID(); establishment != "" {
toInsert = append(toInsert, establishment)
}
if pending := req.Secret.GetStream().GetPendingSecretID(); pending != "" {
if pending := secretsReq.Secrets.GetStream().GetPendingSecretID(); pending != "" {
toInsert = append(toInsert, pending)
}
if active := req.Secret.GetStream().GetActiveSecretID(); active != "" {
if active := secretsReq.Secrets.GetStream().GetActiveSecretID(); active != "" {
toInsert = append(toInsert, active)
}
for _, id := range toInsert {
@ -273,7 +273,7 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
type testcase struct {
name string
seed *pbpeering.PeeringWriteRequest
input *pbpeering.PeeringSecrets
input *pbpeering.PeeringSecretsWriteRequest
expect *pbpeering.PeeringSecrets
expectUUIDs []string
expectErr string
@ -294,7 +294,7 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
require.NoError(t, err)
// Validate that we read what we expect
secrets, err := s.PeeringSecretsRead(nil, tc.input.PeerID)
secrets, err := s.PeeringSecretsRead(nil, tc.input.GetSecrets().GetPeerID())
require.NoError(t, err)
require.NotNil(t, secrets)
prototest.AssertDeepEqual(t, tc.expect, secrets)
@ -304,23 +304,29 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
}
tcs := []testcase{
{
name: "missing peer id",
input: &pbpeering.PeeringSecrets{},
name: "missing peer id",
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{},
},
expectErr: "missing peer ID",
},
{
name: "no secret IDs were embedded",
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
},
},
expectErr: "no secret IDs were embedded",
},
{
name: "unknown peer id",
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testFooSecretID,
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testFooSecretID,
},
},
},
expectErr: "unknown peering",
@ -334,10 +340,12 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
PeerServerAddresses: []string{"10.0.0.1:5300"},
},
},
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testFooSecretID,
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testFooSecretID,
},
},
},
expect: &pbpeering.PeeringSecrets{
@ -350,25 +358,48 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
expectUUIDs: []string{},
},
{
name: "generate new establishment secret",
name: "unspecified operation",
seed: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
Secret: &pbpeering.PeeringSecrets{
},
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne,
ActiveSecretID: testSecretTwo,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testFooSecretID,
},
},
},
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testSecretThree,
expectErr: "the operation that triggered the secrets write was not specified",
},
{
name: "generate new establishment secret when secrets already existed",
seed: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne,
ActiveSecretID: testSecretTwo,
},
},
},
},
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testSecretThree,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
},
expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
@ -384,25 +415,30 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
expectUUIDs: []string{testSecretOne, testSecretTwo, testSecretThree},
},
{
name: "replace establishment secret",
name: "generate new token to replace establishment secret",
seed: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
Secret: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testSecretOne,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testSecretOne,
},
},
},
},
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
// Two replaces One
SecretID: testSecretTwo,
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
// Two replaces One
SecretID: testSecretTwo,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
},
expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
@ -413,18 +449,21 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
expectUUIDs: []string{testSecretTwo},
},
{
name: "generate new pending secret",
name: "exchange secret to generate new pending secret",
seed: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
},
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne,
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
},
expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
@ -435,25 +474,30 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
expectUUIDs: []string{testSecretOne},
},
{
name: "replace pending secret",
name: "exchange secret replaces pending stream secret",
seed: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
Secret: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne,
},
},
},
},
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
// Two replaces One
PendingSecretID: testSecretTwo,
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
// Two replaces One
PendingSecretID: testSecretTwo,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
},
expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
@ -470,28 +514,40 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
Name: "foo",
ID: testFooPeerID,
},
Secret: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretTwo,
ActiveSecretID: testSecretOne,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testSecretThree,
},
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretTwo,
ActiveSecretID: testSecretOne,
},
},
},
},
input: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
// Two gets promoted over One
ActiveSecretID: testSecretTwo,
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
// Two gets promoted over One
ActiveSecretID: testSecretTwo,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
},
expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
// Establishment secret remains valid when promoting a stream secret.
SecretID: testSecretThree,
},
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testSecretTwo,
},
},
expectUUIDs: []string{testSecretTwo},
expectUUIDs: []string{testSecretTwo, testSecretThree},
},
}
for _, tc := range tcs {
@ -877,11 +933,14 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
Secret: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testBazSecretID,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testBazPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testBazSecretID,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
},
},
expectSecrets: &pbpeering.PeeringSecrets{

View File

@ -99,7 +99,7 @@ type Backend interface {
GetLeaderAddress() string
ValidateProposedPeeringSecret(id string) (bool, error)
PeeringSecretsWrite(req *pbpeering.PeeringSecrets) error
PeeringSecretsWrite(req *pbpeering.PeeringSecretsWriteRequest) error
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
CatalogRegister(req *structs.RegisterRequest) error

View File

@ -25,13 +25,16 @@ func TestServer_ExchangeSecret(t *testing.T) {
var secret string
testutil.RunStep(t, "known establishment secret is accepted", func(t *testing.T) {
require.NoError(t, store.PeeringSecretsWrite(1, &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{SecretID: testEstablishmentSecretID},
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testActiveStreamSecretID,
// First write the establishment secret so that it can be exchanged
require.NoError(t, store.PeeringSecretsWrite(1, &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{SecretID: testEstablishmentSecretID},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
}))
// Exchange the now-valid establishment secret for a stream secret
resp, err := srv.ExchangeSecret(context.Background(), &pbpeerstream.ExchangeSecretRequest{
PeerID: testPeerID,
EstablishmentSecret: testEstablishmentSecretID,
@ -47,8 +50,5 @@ func TestServer_ExchangeSecret(t *testing.T) {
require.NoError(t, err)
require.Equal(t, secret, s.GetStream().GetPendingSecretID())
// Active stream secret persists until pending secret is promoted during peering establishment.
require.Equal(t, testActiveStreamSecretID, s.GetStream().GetActiveSecretID())
})
}

View File

@ -77,20 +77,23 @@ func (s *Server) ExchangeSecret(ctx context.Context, req *pbpeerstream.ExchangeS
return nil, grpcstatus.Errorf(codes.Internal, "failed to generate peering stream secret: %v", err)
}
secrets := &pbpeering.PeeringSecrets{
PeerID: req.PeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
// Overwriting any existing un-utilized pending stream secret.
PendingSecretID: id,
writeReq := &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: req.PeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
// Overwriting any existing un-utilized pending stream secret.
PendingSecretID: id,
// If there is an active stream secret ID it is NOT invalidated here.
// It remains active until the pending secret ID is used and promoted to active.
// This allows dialing clusters with the active stream secret to continue to dial successfully until they
// receive the new secret.
ActiveSecretID: existing.GetStream().GetActiveSecretID(),
// If there is an active stream secret ID it is NOT invalidated here.
// It remains active until the pending secret ID is used and promoted to active.
// This allows dialing clusters with the active stream secret to continue to dial successfully until they
// receive the new secret.
ActiveSecretID: existing.GetStream().GetActiveSecretID(),
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
}
err = s.Backend.PeeringSecretsWrite(secrets)
err = s.Backend.PeeringSecretsWrite(writeReq)
if err != nil {
return nil, grpcstatus.Errorf(codes.Internal, "failed to persist peering secret: %v", err)
}
@ -191,15 +194,18 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
}
authorized = true
promoted := &pbpeering.PeeringSecrets{
PeerID: req.PeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: pending,
promoted := &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: req.PeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: pending,
// The PendingSecretID is intentionally zeroed out since we want to avoid re-triggering this
// promotion process with the same pending secret.
PendingSecretID: "",
// The PendingSecretID is intentionally zeroed out since we want to avoid re-triggering this
// promotion process with the same pending secret.
PendingSecretID: "",
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
}
err = s.Backend.PeeringSecretsWrite(promoted)
if err != nil {

View File

@ -249,11 +249,14 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
Name: "foo",
ID: testPeerID,
},
Secret: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testActiveStreamSecretID,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testActiveStreamSecretID,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
},
},
input: &pbpeerstream.ReplicationMessage{
@ -273,11 +276,14 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
Name: "foo",
ID: testPeerID,
},
Secret: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testActiveStreamSecretID,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: testActiveStreamSecretID,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
},
},
input: &pbpeerstream.ReplicationMessage{
@ -296,11 +302,14 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
Name: "foo",
ID: testPeerID,
},
Secret: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testPendingStreamSecretID,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testPendingStreamSecretID,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
},
},
input: &pbpeerstream.ReplicationMessage{
@ -1390,7 +1399,7 @@ func (b *testStreamBackend) ValidateProposedPeeringSecret(id string) (bool, erro
return true, nil
}
func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecrets) error {
func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecretsWriteRequest) error {
return b.store.PeeringSecretsWrite(1, req)
}
@ -1633,11 +1642,14 @@ func writeTestPeering(t *testing.T, store *state.Store, idx uint64, peerName, re
}
require.NoError(t, store.PeeringWrite(idx, &pbpeering.PeeringWriteRequest{
Peering: &peering,
Secret: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testPendingStreamSecretID,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testPendingStreamSecretID,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
},
}))

View File

@ -260,11 +260,14 @@ func (s *Server) GenerateToken(
writeReq := &pbpeering.PeeringWriteRequest{
Peering: peering,
Secret: &pbpeering.PeeringSecrets{
PeerID: peering.ID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: secretID,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: peering.ID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: secretID,
},
},
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
},
}
if err := s.Backend.PeeringWrite(writeReq); err != nil {
@ -439,10 +442,12 @@ func (s *Server) Establish(
// RemotePeerID(PeerID) but at this point the other peer does not.
writeReq := &pbpeering.PeeringWriteRequest{
Peering: peering,
Secret: &pbpeering.PeeringSecrets{
PeerID: peering.ID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: exchangeResp.StreamSecret,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: peering.ID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: exchangeResp.StreamSecret,
},
},
},
}

View File

@ -7,6 +7,16 @@ import (
"github.com/golang/protobuf/proto"
)
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringSecretsWriteRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *PeeringSecretsWriteRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringSecrets) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)

File diff suppressed because it is too large Load Diff

View File

@ -53,6 +53,24 @@ enum PeeringState {
TERMINATED = 6;
}
message PeeringSecretsWriteRequest {
enum Operation {
OPERATION_UNSPECIFIED = 0;
OPERATION_GENERATETOKEN = 1;
OPERATION_EXCHANGESECRET = 2;
OPERATION_PROMOTEPENDING = 3;
}
// Secret contains the peering secrets to write.
PeeringSecrets secrets = 1;
// Operation defines which action triggered the secrets write.
Operation operation = 2;
}
// PeeringSecrets defines a secret used for authenticating/authorizing peer clusters.
message PeeringSecrets {
// PeerID is the local UUID of the peering this secret was generated for.
@ -195,10 +213,10 @@ message PeeringWriteRequest {
// Peering is the peering to write with the request.
Peering Peering = 1;
// PeeringSecrets contains the optional peering secrets to persist
// Secret contains the optional peering secrets to persist
// with the peering. Peering secrets are not embedded in the peering
// object to avoid leaking them.
PeeringSecrets Secret = 2;
PeeringSecretsWriteRequest SecretsRequest = 2;
map<string, string> Meta = 3;
}

View File

@ -21,7 +21,7 @@ func testLogLevel() hclog.Level {
if level != hclog.NoLevel {
return level
}
return hclog.Warn
return hclog.Trace
}
func Logger(t TestingTB) hclog.InterceptLogger {