Use proto message for each secrets write op

Previously there was a field indicating the operation that triggered a
secrets write. Now there is a message for each operation and it contains
the secret ID being persisted.
This commit is contained in:
freddygv 2022-08-08 01:41:00 -06:00
parent 8067890787
commit c04515a844
15 changed files with 1453 additions and 792 deletions

View File

@ -720,9 +720,9 @@ func (c *FSM) applyPeeringDelete(buf []byte, index uint64) interface{} {
} }
func (c *FSM) applyPeeringSecretsWrite(buf []byte, index uint64) interface{} { func (c *FSM) applyPeeringSecretsWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringSecretsWriteRequest var req pbpeering.SecretsWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil { if err := structs.DecodeProto(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode peering write request: %v", err)) panic(fmt.Errorf("failed to decode peering secrets write request: %v", err))
} }
defer metrics.MeasureSinceWithLabels([]string{"fsm", "peering_secrets"}, time.Now(), defer metrics.MeasureSinceWithLabels([]string{"fsm", "peering_secrets"}, time.Now(),

View File

@ -483,18 +483,13 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5", ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Name: "baz", Name: "baz",
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ SecretsRequest: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5", Request: &pbpeering.SecretsWriteRequest_PromotePending{
Establishment: &pbpeering.PeeringSecrets_Establishment{ PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
SecretID: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84", ActiveStreamSecret: "baaeea83-8419-4aa8-ac89-14e7246a3d2f",
},
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: "0b7812d4-32d9-4e54-b1b3-4d97084982a0",
ActiveSecretID: "baaeea83-8419-4aa8-ac89-14e7246a3d2f",
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
}, },
})) }))
@ -505,6 +500,27 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
RootPEMs: []string{"qux certificate bundle"}, RootPEMs: []string{"qux certificate bundle"},
})) }))
// Issue two more secrets writes so that there are three secrets associated with the peering:
// - Establishment: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84"
// - Pending: "0b7812d4-32d9-4e54-b1b3-4d97084982a0"
// - Active: "baaeea83-8419-4aa8-ac89-14e7246a3d2f"
require.NoError(t, fsm.state.PeeringSecretsWrite(34, &pbpeering.SecretsWriteRequest{
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
PendingStreamSecret: "0b7812d4-32d9-4e54-b1b3-4d97084982a0",
},
},
}))
require.NoError(t, fsm.state.PeeringSecretsWrite(33, &pbpeering.SecretsWriteRequest{
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
EstablishmentSecret: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84",
},
},
}))
// Snapshot // Snapshot
snap, err := fsm.Snapshot() snap, err := fsm.Snapshot()
require.NoError(t, err) require.NoError(t, err)

View File

@ -141,7 +141,7 @@ func (b *PeeringBackend) ValidateProposedPeeringSecret(id string) (bool, error)
return b.srv.fsm.State().ValidateProposedPeeringSecretUUID(id) return b.srv.fsm.State().ValidateProposedPeeringSecretUUID(id)
} }
func (b *PeeringBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecretsWriteRequest) error { func (b *PeeringBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error {
_, err := b.srv.raftApplyProtobuf(structs.PeeringSecretsWriteType, req) _, err := b.srv.raftApplyProtobuf(structs.PeeringSecretsWriteType, req)
return err return err
} }

View File

@ -174,7 +174,7 @@ func peeringSecretsReadByPeerIDTxn(tx ReadTxn, ws memdb.WatchSet, id string) (*p
return secret, nil return secret, nil
} }
func (s *Store) PeeringSecretsWrite(idx uint64, req *pbpeering.PeeringSecretsWriteRequest) error { func (s *Store) PeeringSecretsWrite(idx uint64, req *pbpeering.SecretsWriteRequest) error {
tx := s.db.WriteTxn(idx) tx := s.db.WriteTxn(idx)
defer tx.Abort() defer tx.Abort()
@ -184,50 +184,55 @@ func (s *Store) PeeringSecretsWrite(idx uint64, req *pbpeering.PeeringSecretsWri
return tx.Commit() return tx.Commit()
} }
func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, req *pbpeering.PeeringSecretsWriteRequest) error { func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, req *pbpeering.SecretsWriteRequest) error {
if req == nil || req.Secrets == nil { if req == nil || req.Request == nil {
return nil return nil
} }
if err := req.Validate(); err != nil {
secret := req.GetSecrets() return fmt.Errorf("invalid secret write request: %w", err)
if err := secret.Validate(); err != nil {
return err
} }
peering, err := peeringReadByIDTxn(tx, nil, secret.PeerID) peering, err := peeringReadByIDTxn(tx, nil, req.PeerID)
if err != nil { if err != nil {
return fmt.Errorf("failed to read peering by id: %w", err) return fmt.Errorf("failed to read peering by id: %w", err)
} }
if peering == nil { if peering == nil {
return fmt.Errorf("unknown peering %q for secret", secret.PeerID) return fmt.Errorf("unknown peering %q for secret", req.PeerID)
} }
// If the peering came from a peering token no validation is done for the given secrets. // If the peering came from a peering token no validation is done for the given secrets.
// Dialing peers do not need to validate uniqueness because the secrets were generated elsewhere. // Dialing peers do not need to validate uniqueness because the secrets were generated elsewhere.
if peering.ShouldDial() { if peering.ShouldDial() {
if err := tx.Insert(tablePeeringSecrets, secret); err != nil { r, ok := req.Request.(*pbpeering.SecretsWriteRequest_Establish)
if !ok {
return fmt.Errorf("invalid request type %T when persisting stream secret for dialing peer", req.Request)
}
secrets := pbpeering.PeeringSecrets{
PeerID: req.PeerID,
Stream: &pbpeering.PeeringSecrets_Stream{
ActiveSecretID: r.Establish.ActiveStreamSecret,
},
}
if err := tx.Insert(tablePeeringSecrets, &secrets); err != nil {
return fmt.Errorf("failed inserting peering: %w", err) return fmt.Errorf("failed inserting peering: %w", err)
} }
return nil return nil
} }
if req.Operation == pbpeering.PeeringSecretsWriteRequest_OPERATION_UNSPECIFIED {
return fmt.Errorf("the operation that triggered the secrets write was not specified")
}
// If the peering token was generated locally, validate that the newly introduced UUID is still unique. // If the peering token was generated locally, validate that the newly introduced UUID is still unique.
// RPC handlers validate that generated IDs are available, but availability cannot be guaranteed until the state store operation. // RPC handlers validate that generated IDs are available, but availability cannot be guaranteed until the state store operation.
var newSecretID string var newSecretID string
switch req.Operation { switch r := req.Request.(type) {
// Establishment secrets are written when generating peering tokens, and no other secret IDs are included. // Establishment secrets are written when generating peering tokens, and no other secret IDs are included.
case pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN: case *pbpeering.SecretsWriteRequest_GenerateToken:
newSecretID = secret.GetEstablishment().GetSecretID() newSecretID = r.GenerateToken.EstablishmentSecret
// When exchanging an establishment secret a new pending stream secret is generated. // When exchanging an establishment secret a new pending stream secret is generated.
// Active stream secrets doesn't need to be checked for uniqueness because it is only ever promoted from pending. // Active stream secrets doesn't need to be checked for uniqueness because it is only ever promoted from pending.
case pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET: case *pbpeering.SecretsWriteRequest_ExchangeSecret:
newSecretID = secret.GetStream().GetPendingSecretID() newSecretID = r.ExchangeSecret.PendingStreamSecret
} }
if newSecretID != "" { if newSecretID != "" {
@ -244,50 +249,101 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, req *pbpeering.PeeringSecret
} }
} }
existing, err := peeringSecretsReadByPeerIDTxn(tx, nil, secret.PeerID) existing, err := peeringSecretsReadByPeerIDTxn(tx, nil, req.PeerID)
if err != nil { if err != nil {
return err return err
} }
secrets := pbpeering.PeeringSecrets{
PeerID: req.PeerID,
}
var toDelete []string var toDelete []string
if existing != nil { // Collect any overwritten UUIDs for deletion.
// Collect any overwritten UUIDs for deletion. switch r := req.Request.(type) {
switch req.Operation { case *pbpeering.SecretsWriteRequest_GenerateToken:
case pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN: // Store the newly-generated establishment secret, overwriting any that existed.
// Merge in existing stream secrets when persisting a new establishment secret. secrets.Establishment = &pbpeering.PeeringSecrets_Establishment{
// This is to avoid invalidating stream secrets when a new peering token SecretID: r.GenerateToken.GetEstablishmentSecret(),
// is generated.
secret.Stream = existing.GetStream()
// When a new token is generated we replace any un-used establishment secrets.
if existingEstablishment := existing.GetEstablishment().GetSecretID(); existingEstablishment != "" {
toDelete = append(toDelete, existingEstablishment)
}
case pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET:
// Avoid invalidating existing active secrets when exchanging establishment secret for pending.
secret.Stream.ActiveSecretID = existing.GetStream().GetActiveSecretID()
// When exchanging an establishment secret we invalidate the existing establishment secret.
if existingEstablishment := existing.GetEstablishment().GetSecretID(); existingEstablishment != "" {
toDelete = append(toDelete, existingEstablishment)
}
// When exchanging an establishment secret unused pending secrets are overwritten.
if existingPending := existing.GetStream().GetPendingSecretID(); existingPending != "" {
toDelete = append(toDelete, existingPending)
}
case pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING:
// Avoid invalidating existing establishment secrets when promoting pending secrets.
secret.Establishment = existing.GetEstablishment()
// If there was previously an active stream secret it gets replaced in favor of the pending secret
// that is being promoted.
if existingActive := existing.GetStream().GetActiveSecretID(); existingActive != "" {
toDelete = append(toDelete, existingActive)
}
} }
// Merge in existing stream secrets when persisting a new establishment secret.
// This is to avoid invalidating stream secrets when a new peering token
// is generated.
secrets.Stream = existing.GetStream()
// When a new token is generated we replace any un-used establishment secrets.
if existingEstablishment := existing.GetEstablishment().GetSecretID(); existingEstablishment != "" {
toDelete = append(toDelete, existingEstablishment)
}
case *pbpeering.SecretsWriteRequest_ExchangeSecret:
if existing == nil {
return fmt.Errorf("cannot exchange peering secret: no known secrets for peering")
}
// Store the newly-generated pending stream secret, overwriting any that existed.
secrets.Stream = &pbpeering.PeeringSecrets_Stream{
PendingSecretID: r.ExchangeSecret.GetPendingStreamSecret(),
// Avoid invalidating existing active secrets when exchanging establishment secret for pending.
ActiveSecretID: existing.GetStream().GetActiveSecretID(),
}
// When exchanging an establishment secret we invalidate the existing establishment secret.
if existingEstablishment := existing.GetEstablishment().GetSecretID(); existingEstablishment != "" {
toDelete = append(toDelete, existingEstablishment)
} else {
// When there is no existing establishment secret we must not proceed because another ExchangeSecret
// RPC already invalidated it. Otherwise, this operation would overwrite the pending secret
// from the previous ExchangeSecret.
return fmt.Errorf("invalid establishment secret: peering was already established")
}
// When exchanging an establishment secret unused pending secrets are overwritten.
if existingPending := existing.GetStream().GetPendingSecretID(); existingPending != "" {
toDelete = append(toDelete, existingPending)
}
case *pbpeering.SecretsWriteRequest_PromotePending:
if existing == nil {
return fmt.Errorf("cannot promote pending secret: no known secrets for peering")
}
if existing.GetStream().GetPendingSecretID() == "" {
// There is a potential race if multiple dialing clusters send an Open request with a valid
// pending secret. The secret could be validated for all concurrently at the RPC layer,
// but then the pending secret is promoted for one dialer before the others.
// In this scenario the end result of promoting the pending secret is the same,
// but we want to avoid initiating peering streams to all of these clusters.
// Therefore, we accept the first write but reject all others.
return fmt.Errorf("invalid pending stream secret: secret was already promoted")
}
// Store the newly-generated pending stream secret, overwriting any that existed.
secrets.Stream = &pbpeering.PeeringSecrets_Stream{
// Promoting a pending secret moves it to active.
PendingSecretID: "",
// Store the newly-promoted pending secret as the active secret.
ActiveSecretID: r.PromotePending.ActiveStreamSecret,
}
// Avoid invalidating existing establishment secrets when promoting pending secrets.
secrets.Establishment = existing.GetEstablishment()
// If there was previously an active stream secret it gets replaced in favor of the pending secret
// that is being promoted.
if existingActive := existing.GetStream().GetActiveSecretID(); existingActive != "" {
toDelete = append(toDelete, existingActive)
}
case *pbpeering.SecretsWriteRequest_Establish:
// This should never happen. Dialing peers are the only ones that can call Establish,
// and the peering secrets for dialing peers should have been inserted earlier in the function.
return fmt.Errorf("an accepting peer should not have called Establish RPC")
default:
return fmt.Errorf("got unexpected request type: %T", req.Request)
} }
for _, id := range toDelete { for _, id := range toDelete {
if err := tx.Delete(tablePeeringSecretUUIDs, id); err != nil { if err := tx.Delete(tablePeeringSecretUUIDs, id); err != nil {
@ -295,7 +351,7 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, req *pbpeering.PeeringSecret
} }
} }
if err := tx.Insert(tablePeeringSecrets, secret); err != nil { if err := tx.Insert(tablePeeringSecrets, &secrets); err != nil {
return fmt.Errorf("failed inserting peering: %w", err) return fmt.Errorf("failed inserting peering: %w", err)
} }
return nil return nil

View File

@ -236,24 +236,45 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
return resp return resp
} }
writeSeed := func(s *Store, req *pbpeering.PeeringWriteRequest) { var (
testSecretOne = testUUID()
testSecretTwo = testUUID()
testSecretThree = testUUID()
testSecretFour = testUUID()
)
type testSeed struct {
peering *pbpeering.Peering
secrets *pbpeering.PeeringSecrets
}
type testcase struct {
name string
seed *testSeed
input *pbpeering.SecretsWriteRequest
expect *pbpeering.PeeringSecrets
expectUUIDs []string
expectErr string
}
writeSeed := func(s *Store, seed *testSeed) {
tx := s.db.WriteTxn(1) tx := s.db.WriteTxn(1)
defer tx.Abort() defer tx.Abort()
if req.Peering != nil { if seed.peering != nil {
require.NoError(t, tx.Insert(tablePeering, req.Peering)) require.NoError(t, tx.Insert(tablePeering, seed.peering))
} }
if secretsReq := req.SecretsRequest; secretsReq != nil { if seed.secrets != nil {
require.NoError(t, tx.Insert(tablePeeringSecrets, secretsReq.Secrets)) require.NoError(t, tx.Insert(tablePeeringSecrets, seed.secrets))
var toInsert []string var toInsert []string
if establishment := secretsReq.Secrets.GetEstablishment().GetSecretID(); establishment != "" { if establishment := seed.secrets.GetEstablishment().GetSecretID(); establishment != "" {
toInsert = append(toInsert, establishment) toInsert = append(toInsert, establishment)
} }
if pending := secretsReq.Secrets.GetStream().GetPendingSecretID(); pending != "" { if pending := seed.secrets.GetStream().GetPendingSecretID(); pending != "" {
toInsert = append(toInsert, pending) toInsert = append(toInsert, pending)
} }
if active := secretsReq.Secrets.GetStream().GetActiveSecretID(); active != "" { if active := seed.secrets.GetStream().GetActiveSecretID(); active != "" {
toInsert = append(toInsert, active) toInsert = append(toInsert, active)
} }
for _, id := range toInsert { for _, id := range toInsert {
@ -264,20 +285,6 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
tx.Commit() tx.Commit()
} }
var (
testSecretOne = testUUID()
testSecretTwo = testUUID()
testSecretThree = testUUID()
)
type testcase struct {
name string
seed *pbpeering.PeeringWriteRequest
input *pbpeering.PeeringSecretsWriteRequest
expect *pbpeering.PeeringSecrets
expectUUIDs []string
expectErr string
}
run := func(t *testing.T, tc testcase) { run := func(t *testing.T, tc testcase) {
s := NewStateStore(nil) s := NewStateStore(nil)
@ -294,7 +301,7 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Validate that we read what we expect // Validate that we read what we expect
secrets, err := s.PeeringSecretsRead(nil, tc.input.GetSecrets().GetPeerID()) secrets, err := s.PeeringSecretsRead(nil, tc.input.GetPeerID())
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, secrets) require.NotNil(t, secrets)
prototest.AssertDeepEqual(t, tc.expect, secrets) prototest.AssertDeepEqual(t, tc.expect, secrets)
@ -305,46 +312,129 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
tcs := []testcase{ tcs := []testcase{
{ {
name: "missing peer id", name: "missing peer id",
input: &pbpeering.PeeringSecretsWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{}, Request: &pbpeering.SecretsWriteRequest_GenerateToken{},
}, },
expectErr: "missing peer ID", expectErr: "missing peer ID",
}, },
{
name: "no secret IDs were embedded",
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
},
},
expectErr: "no secret IDs were embedded",
},
{ {
name: "unknown peer id", name: "unknown peer id",
input: &pbpeering.PeeringSecretsWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, Request: &pbpeering.SecretsWriteRequest_GenerateToken{
Establishment: &pbpeering.PeeringSecrets_Establishment{ GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
SecretID: testFooSecretID, EstablishmentSecret: testFooSecretID,
}, },
}, },
}, },
expectErr: "unknown peering", expectErr: "unknown peering",
}, },
{ {
name: "dialing peer does not track UUIDs", name: "no secret IDs were embedded when generating token",
seed: &pbpeering.PeeringWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Peering: &pbpeering.Peering{ PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_GenerateToken{},
},
expectErr: "missing secret ID",
},
{
name: "no secret IDs were embedded when establishing peering",
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_Establish{},
},
expectErr: "missing secret ID",
},
{
name: "no secret IDs were embedded when exchanging secret",
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{},
},
expectErr: "missing secret ID",
},
{
name: "no secret IDs were embedded when promoting pending secret",
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_PromotePending{},
},
expectErr: "missing secret ID",
},
{
name: "dialing peer invalid request type - generate token",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testFooPeerID, ID: testFooPeerID,
PeerServerAddresses: []string{"10.0.0.1:5300"}, PeerServerAddresses: []string{"10.0.0.1:5300"},
}, },
}, },
input: &pbpeering.PeeringSecretsWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, // Dialing peer must only write secrets from Establish
Stream: &pbpeering.PeeringSecrets_Stream{ Request: &pbpeering.SecretsWriteRequest_GenerateToken{
ActiveSecretID: testFooSecretID, GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
EstablishmentSecret: testFooSecretID,
},
},
},
expectErr: "invalid request type",
},
{
name: "dialing peer invalid request type - exchange secret",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
PeerServerAddresses: []string{"10.0.0.1:5300"},
},
},
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
// Dialing peer must only write secrets from Establish
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
PendingStreamSecret: testFooSecretID,
},
},
},
expectErr: "invalid request type",
},
{
name: "dialing peer invalid request type - promote pending",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
PeerServerAddresses: []string{"10.0.0.1:5300"},
},
},
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
// Dialing peer must only write secrets from Establish
Request: &pbpeering.SecretsWriteRequest_PromotePending{
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
ActiveStreamSecret: testFooSecretID,
},
},
},
expectErr: "invalid request type",
},
{
name: "dialing peer does not track UUIDs",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
PeerServerAddresses: []string{"10.0.0.1:5300"},
},
},
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_Establish{
Establish: &pbpeering.SecretsWriteRequest_EstablishRequest{
ActiveStreamSecret: testFooSecretID,
}, },
}, },
}, },
@ -357,49 +447,28 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
// UUIDs are only tracked for uniqueness in the generating cluster. // UUIDs are only tracked for uniqueness in the generating cluster.
expectUUIDs: []string{}, expectUUIDs: []string{},
}, },
{
name: "unspecified operation",
seed: &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
},
input: &pbpeering.PeeringSecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testFooSecretID,
},
},
},
expectErr: "the operation that triggered the secrets write was not specified",
},
{ {
name: "generate new establishment secret when secrets already existed", name: "generate new establishment secret when secrets already existed",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testFooPeerID, ID: testFooPeerID,
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ secrets: &pbpeering.PeeringSecrets{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, Stream: &pbpeering.PeeringSecrets_Stream{
Stream: &pbpeering.PeeringSecrets_Stream{ PendingSecretID: testSecretOne,
PendingSecretID: testSecretOne, ActiveSecretID: testSecretTwo,
ActiveSecretID: testSecretTwo,
},
}, },
}, },
}, },
input: &pbpeering.PeeringSecretsWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, Request: &pbpeering.SecretsWriteRequest_GenerateToken{
Establishment: &pbpeering.PeeringSecrets_Establishment{ GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
SecretID: testSecretThree, EstablishmentSecret: testSecretThree,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
}, },
expect: &pbpeering.PeeringSecrets{ expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID, PeerID: testFooPeerID,
@ -416,29 +485,26 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
}, },
{ {
name: "generate new token to replace establishment secret", name: "generate new token to replace establishment secret",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testFooPeerID, ID: testFooPeerID,
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ secrets: &pbpeering.PeeringSecrets{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, Establishment: &pbpeering.PeeringSecrets_Establishment{
Establishment: &pbpeering.PeeringSecrets_Establishment{ SecretID: testSecretOne,
SecretID: testSecretOne,
},
}, },
}, },
}, },
input: &pbpeering.PeeringSecretsWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, Request: &pbpeering.SecretsWriteRequest_GenerateToken{
Establishment: &pbpeering.PeeringSecrets_Establishment{ GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
// Two replaces One // Two replaces One
SecretID: testSecretTwo, EstablishmentSecret: testSecretTwo,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
}, },
expect: &pbpeering.PeeringSecrets{ expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID, PeerID: testFooPeerID,
@ -449,59 +515,110 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
expectUUIDs: []string{testSecretTwo}, expectUUIDs: []string{testSecretTwo},
}, },
{ {
name: "exchange secret to generate new pending secret", name: "cannot exchange secret without existing secrets",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testFooPeerID, ID: testFooPeerID,
}, },
// Do not seed an establishment secret.
}, },
input: &pbpeering.PeeringSecretsWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
PendingStreamSecret: testSecretOne,
},
},
},
expectErr: "no known secrets for peering",
},
{
name: "cannot exchange secret without establishment secret",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID, PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{ Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne, PendingSecretID: testSecretOne,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET, },
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
// Attempt to replace One with Two
PendingStreamSecret: testSecretTwo,
},
},
},
expectErr: "peering was already established",
},
{
name: "exchange secret to generate new pending secret",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testSecretOne,
},
},
},
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
PendingStreamSecret: testSecretTwo,
},
},
}, },
expect: &pbpeering.PeeringSecrets{ expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID, PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{ Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretOne, PendingSecretID: testSecretTwo,
}, },
}, },
expectUUIDs: []string{testSecretOne}, // Establishment secret testSecretOne is discarded when exchanging for a stream secret
expectUUIDs: []string{testSecretTwo},
}, },
{ {
name: "exchange secret replaces pending stream secret", name: "exchange secret replaces pending stream secret",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testFooPeerID, ID: testFooPeerID,
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ secrets: &pbpeering.PeeringSecrets{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, Establishment: &pbpeering.PeeringSecrets_Establishment{
Stream: &pbpeering.PeeringSecrets_Stream{ SecretID: testSecretFour,
ActiveSecretID: testSecretOne, },
PendingSecretID: testSecretTwo, Stream: &pbpeering.PeeringSecrets_Stream{
}, ActiveSecretID: testSecretOne,
PendingSecretID: testSecretTwo,
}, },
}, },
}, },
input: &pbpeering.PeeringSecretsWriteRequest{ input: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testFooPeerID,
PeerID: testFooPeerID, Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
Stream: &pbpeering.PeeringSecrets_Stream{ ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
// Three replaces two // Three replaces two
PendingSecretID: testSecretThree, PendingStreamSecret: testSecretThree,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
}, },
expect: &pbpeering.PeeringSecrets{ expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID, PeerID: testFooPeerID,
// Establishment secret is discarded in favor of new pending secret.
Stream: &pbpeering.PeeringSecrets_Stream{ Stream: &pbpeering.PeeringSecrets_Stream{
// Active secret is not deleted until the new pending secret is promoted // Active secret is not deleted until the new pending secret is promoted
ActiveSecretID: testSecretOne, ActiveSecretID: testSecretOne,
@ -511,34 +628,75 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
expectUUIDs: []string{testSecretOne, testSecretThree}, expectUUIDs: []string{testSecretOne, testSecretThree},
}, },
{ {
name: "promote pending secret and delete active", name: "cannot promote pending without existing secrets",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testFooPeerID, ID: testFooPeerID,
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ // Do not seed a pending secret.
Secrets: &pbpeering.PeeringSecrets{ },
PeerID: testFooPeerID, input: &pbpeering.SecretsWriteRequest{
Establishment: &pbpeering.PeeringSecrets_Establishment{ PeerID: testFooPeerID,
SecretID: testSecretThree, Request: &pbpeering.SecretsWriteRequest_PromotePending{
}, PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
Stream: &pbpeering.PeeringSecrets_Stream{ ActiveStreamSecret: testSecretOne,
PendingSecretID: testSecretTwo,
ActiveSecretID: testSecretOne,
},
}, },
}, },
}, },
input: &pbpeering.PeeringSecretsWriteRequest{ expectErr: "no known secrets for peering",
Secrets: &pbpeering.PeeringSecrets{ },
{
name: "cannot promote pending without existing pending secret",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID, PeerID: testFooPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{ Stream: &pbpeering.PeeringSecrets_Stream{
// Two gets promoted over One ActiveSecretID: testSecretOne,
ActiveSecretID: testSecretTwo, },
},
},
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_PromotePending{
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
// Attempt to replace One with Two
ActiveStreamSecret: testSecretTwo,
},
},
},
expectErr: "secret was already promoted",
},
{
name: "promote pending secret and delete active",
seed: &testSeed{
peering: &pbpeering.Peering{
Name: "foo",
ID: testFooPeerID,
},
secrets: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID,
Establishment: &pbpeering.PeeringSecrets_Establishment{
SecretID: testSecretThree,
},
Stream: &pbpeering.PeeringSecrets_Stream{
PendingSecretID: testSecretTwo,
ActiveSecretID: testSecretOne,
},
},
},
input: &pbpeering.SecretsWriteRequest{
PeerID: testFooPeerID,
Request: &pbpeering.SecretsWriteRequest_PromotePending{
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
// Two gets promoted over One
ActiveStreamSecret: testSecretTwo,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
}, },
expect: &pbpeering.PeeringSecrets{ expect: &pbpeering.PeeringSecrets{
PeerID: testFooPeerID, PeerID: testFooPeerID,
@ -936,14 +1094,13 @@ func TestStore_PeeringWrite(t *testing.T) {
Name: "baz", Name: "baz",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ SecretsRequest: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testBazPeerID,
PeerID: testBazPeerID, Request: &pbpeering.SecretsWriteRequest_GenerateToken{
Establishment: &pbpeering.PeeringSecrets_Establishment{ GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
SecretID: testBazSecretID, EstablishmentSecret: testBazSecretID,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
}, },
}, },
expectSecrets: &pbpeering.PeeringSecrets{ expectSecrets: &pbpeering.PeeringSecrets{

View File

@ -99,7 +99,7 @@ type Backend interface {
GetLeaderAddress() string GetLeaderAddress() string
ValidateProposedPeeringSecret(id string) (bool, error) ValidateProposedPeeringSecret(id string) (bool, error)
PeeringSecretsWrite(req *pbpeering.PeeringSecretsWriteRequest) error PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
CatalogRegister(req *structs.RegisterRequest) error CatalogRegister(req *structs.RegisterRequest) error

View File

@ -26,12 +26,13 @@ func TestServer_ExchangeSecret(t *testing.T) {
var secret string var secret string
testutil.RunStep(t, "known establishment secret is accepted", func(t *testing.T) { testutil.RunStep(t, "known establishment secret is accepted", func(t *testing.T) {
// First write the establishment secret so that it can be exchanged // First write the establishment secret so that it can be exchanged
require.NoError(t, store.PeeringSecretsWrite(1, &pbpeering.PeeringSecretsWriteRequest{ require.NoError(t, store.PeeringSecretsWrite(1, &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testPeerID,
PeerID: testPeerID, Request: &pbpeering.SecretsWriteRequest_GenerateToken{
Establishment: &pbpeering.PeeringSecrets_Establishment{SecretID: testEstablishmentSecretID}, GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
EstablishmentSecret: testEstablishmentSecretID,
},
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
})) }))
// Exchange the now-valid establishment secret for a stream secret // Exchange the now-valid establishment secret for a stream secret

View File

@ -77,21 +77,14 @@ func (s *Server) ExchangeSecret(ctx context.Context, req *pbpeerstream.ExchangeS
return nil, grpcstatus.Errorf(codes.Internal, "failed to generate peering stream secret: %v", err) return nil, grpcstatus.Errorf(codes.Internal, "failed to generate peering stream secret: %v", err)
} }
writeReq := &pbpeering.PeeringSecretsWriteRequest{ writeReq := &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: req.PeerID,
PeerID: req.PeerID, Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
Stream: &pbpeering.PeeringSecrets_Stream{ ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
// Overwriting any existing un-utilized pending stream secret. // Overwrite any existing un-utilized pending stream secret.
PendingSecretID: id, PendingStreamSecret: id,
// If there is an active stream secret ID it is NOT invalidated here.
// It remains active until the pending secret ID is used and promoted to active.
// This allows dialing clusters with the active stream secret to continue to dial successfully until they
// receive the new secret.
ActiveSecretID: existing.GetStream().GetActiveSecretID(),
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
} }
err = s.Backend.PeeringSecretsWrite(writeReq) err = s.Backend.PeeringSecretsWrite(writeReq)
if err != nil { if err != nil {
@ -194,18 +187,14 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
} }
authorized = true authorized = true
promoted := &pbpeering.PeeringSecretsWriteRequest{ promoted := &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: p.ID,
PeerID: req.PeerID, Request: &pbpeering.SecretsWriteRequest_PromotePending{
Stream: &pbpeering.PeeringSecrets_Stream{ PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
ActiveSecretID: pending, // Overwrite any existing un-utilized pending stream secret.
ActiveStreamSecret: pending,
// The PendingSecretID is intentionally zeroed out since we want to avoid re-triggering this
// promotion process with the same pending secret.
PendingSecretID: "",
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
} }
err = s.Backend.PeeringSecretsWrite(promoted) err = s.Backend.PeeringSecretsWrite(promoted)
if err != nil { if err != nil {

View File

@ -181,9 +181,13 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
} }
func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) { func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
type testSeed struct {
peering *pbpeering.Peering
secrets []*pbpeering.SecretsWriteRequest
}
type testCase struct { type testCase struct {
name string name string
seed *pbpeering.PeeringWriteRequest seed *testSeed
input *pbpeerstream.ReplicationMessage input *pbpeerstream.ReplicationMessage
wantErr error wantErr error
} }
@ -194,7 +198,13 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
srv, store := newTestServer(t, nil) srv, store := newTestServer(t, nil)
// Write a seed peering. // Write a seed peering.
require.NoError(t, store.PeeringWrite(1, tc.seed)) if tc.seed != nil {
require.NoError(t, store.PeeringWrite(1, &pbpeering.PeeringWriteRequest{Peering: tc.seed.peering}))
for _, s := range tc.seed.secrets {
require.NoError(t, store.PeeringSecretsWrite(1, s))
}
}
// Set the initial roots and CA configuration. // Set the initial roots and CA configuration.
_, _ = writeInitialRootsAndCA(t, store) _, _ = writeInitialRootsAndCA(t, store)
@ -223,12 +233,14 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
} else { } else {
require.NoError(t, err) require.NoError(t, err)
} }
client.Close()
} }
tt := []testCase{ tt := []testCase{
{ {
name: "no secret for peering", name: "no secret for peering",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: peeringWithoutSecrets, ID: peeringWithoutSecrets,
}, },
@ -244,19 +256,20 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
}, },
{ {
name: "unknown secret", name: "unknown secret",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testPeerID, ID: testPeerID,
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ secrets: []*pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ {
PeerID: testPeerID, PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{ Request: &pbpeering.SecretsWriteRequest_GenerateToken{
ActiveSecretID: testActiveStreamSecretID, GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
EstablishmentSecret: testEstablishmentSecretID,
},
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
}, },
}, },
input: &pbpeerstream.ReplicationMessage{ input: &pbpeerstream.ReplicationMessage{
@ -270,46 +283,73 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
wantErr: status.Error(codes.PermissionDenied, "invalid peering stream secret"), wantErr: status.Error(codes.PermissionDenied, "invalid peering stream secret"),
}, },
{ {
name: "known active secret", name: "known pending secret",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testPeerID, ID: testPeerID,
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ secrets: []*pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ {
PeerID: testPeerID, PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{ Request: &pbpeering.SecretsWriteRequest_GenerateToken{
ActiveSecretID: testActiveStreamSecretID, GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
EstablishmentSecret: testEstablishmentSecretID,
},
},
},
{
PeerID: testPeerID,
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
PendingStreamSecret: testPendingStreamSecretID,
},
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_PROMOTEPENDING,
}, },
}, },
input: &pbpeerstream.ReplicationMessage{ input: &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Open_{ Payload: &pbpeerstream.ReplicationMessage_Open_{
Open: &pbpeerstream.ReplicationMessage_Open{ Open: &pbpeerstream.ReplicationMessage_Open{
PeerID: testPeerID, PeerID: testPeerID,
StreamSecretID: testActiveStreamSecretID, StreamSecretID: testPendingStreamSecretID,
}, },
}, },
}, },
}, },
{ {
name: "known pending secret", name: "known active secret",
seed: &pbpeering.PeeringWriteRequest{ seed: &testSeed{
Peering: &pbpeering.Peering{ peering: &pbpeering.Peering{
Name: "foo", Name: "foo",
ID: testPeerID, ID: testPeerID,
}, },
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ secrets: []*pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ {
PeerID: testPeerID, PeerID: testPeerID,
Stream: &pbpeering.PeeringSecrets_Stream{ Request: &pbpeering.SecretsWriteRequest_GenerateToken{
PendingSecretID: testPendingStreamSecretID, GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
EstablishmentSecret: testEstablishmentSecretID,
},
},
},
{
PeerID: testPeerID,
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
PendingStreamSecret: testPendingStreamSecretID,
},
},
},
{
PeerID: testPeerID,
Request: &pbpeering.SecretsWriteRequest_PromotePending{
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
// Pending gets promoted to active.
ActiveStreamSecret: testPendingStreamSecretID,
},
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET,
}, },
}, },
input: &pbpeerstream.ReplicationMessage{ input: &pbpeerstream.ReplicationMessage{
@ -1399,7 +1439,7 @@ func (b *testStreamBackend) ValidateProposedPeeringSecret(id string) (bool, erro
return true, nil return true, nil
} }
func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecretsWriteRequest) error { func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error {
return b.store.PeeringSecretsWrite(1, req) return b.store.PeeringSecretsWrite(1, req)
} }
@ -1640,16 +1680,25 @@ func writeTestPeering(t *testing.T, store *state.Store, idx uint64, peerName, re
if remotePeerID != "" { if remotePeerID != "" {
peering.PeerServerAddresses = []string{"127.0.0.1:5300"} peering.PeerServerAddresses = []string{"127.0.0.1:5300"}
} }
require.NoError(t, store.PeeringWrite(idx, &pbpeering.PeeringWriteRequest{ require.NoError(t, store.PeeringWrite(idx, &pbpeering.PeeringWriteRequest{
Peering: &peering, Peering: &peering,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ SecretsRequest: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: testPeerID,
PeerID: testPeerID, // Simulate generating a stream secret by first generating a token then exchanging for a stream secret.
Stream: &pbpeering.PeeringSecrets_Stream{ Request: &pbpeering.SecretsWriteRequest_GenerateToken{
PendingSecretID: testPendingStreamSecretID, GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
EstablishmentSecret: testEstablishmentSecretID,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_EXCHANGESECRET, },
}))
require.NoError(t, store.PeeringSecretsWrite(idx, &pbpeering.SecretsWriteRequest{
PeerID: testPeerID,
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
PendingStreamSecret: testPendingStreamSecretID,
},
}, },
})) }))

View File

@ -260,14 +260,13 @@ func (s *Server) GenerateToken(
writeReq := &pbpeering.PeeringWriteRequest{ writeReq := &pbpeering.PeeringWriteRequest{
Peering: peering, Peering: peering,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ SecretsRequest: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: peering.ID,
PeerID: peering.ID, Request: &pbpeering.SecretsWriteRequest_GenerateToken{
Establishment: &pbpeering.PeeringSecrets_Establishment{ GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
SecretID: secretID, EstablishmentSecret: secretID,
}, },
}, },
Operation: pbpeering.PeeringSecretsWriteRequest_OPERATION_GENERATETOKEN,
}, },
} }
if err := s.Backend.PeeringWrite(writeReq); err != nil { if err := s.Backend.PeeringWrite(writeReq); err != nil {
@ -434,19 +433,19 @@ func (s *Server) Establish(
return nil, dialErrors return nil, dialErrors
} }
// As soon as a peering is written with a list of ServerAddresses that is // As soon as a peering is written with a non-empty list of ServerAddresses
// non-empty, the leader routine will see the peering and attempt to // and an active stream secret, a leader routine will see the peering and
// establish a connection with the remote peer. // attempt to establish a peering stream with the remote peer.
// //
// This peer now has a record of both the LocalPeerID(ID) and // This peer now has a record of both the LocalPeerID(ID) and
// RemotePeerID(PeerID) but at this point the other peer does not. // RemotePeerID(PeerID) but at this point the other peer does not.
writeReq := &pbpeering.PeeringWriteRequest{ writeReq := &pbpeering.PeeringWriteRequest{
Peering: peering, Peering: peering,
SecretsRequest: &pbpeering.PeeringSecretsWriteRequest{ SecretsRequest: &pbpeering.SecretsWriteRequest{
Secrets: &pbpeering.PeeringSecrets{ PeerID: peering.ID,
PeerID: peering.ID, Request: &pbpeering.SecretsWriteRequest_Establish{
Stream: &pbpeering.PeeringSecrets_Stream{ Establish: &pbpeering.SecretsWriteRequest_EstablishRequest{
ActiveSecretID: exchangeResp.StreamSecret, ActiveStreamSecret: exchangeResp.StreamSecret,
}, },
}, },
}, },

View File

@ -155,17 +155,32 @@ func (p *Peering) IsActive() bool {
} }
// Validate is a validation helper that checks whether a secret ID is embedded in the container type. // Validate is a validation helper that checks whether a secret ID is embedded in the container type.
func (p *PeeringSecrets) Validate() error { func (s *SecretsWriteRequest) Validate() error {
if p.GetPeerID() == "" { if s.PeerID == "" {
return errors.New("missing peer ID") return errors.New("missing peer ID")
} }
if p.GetEstablishment().GetSecretID() != "" { switch r := s.Request.(type) {
return nil case *SecretsWriteRequest_GenerateToken:
if r != nil && r.GenerateToken.GetEstablishmentSecret() != "" {
return nil
}
case *SecretsWriteRequest_Establish:
if r != nil && r.Establish.GetActiveStreamSecret() != "" {
return nil
}
case *SecretsWriteRequest_ExchangeSecret:
if r != nil && r.ExchangeSecret.GetPendingStreamSecret() != "" {
return nil
}
case *SecretsWriteRequest_PromotePending:
if r != nil && r.PromotePending.GetActiveStreamSecret() != "" {
return nil
}
default:
return fmt.Errorf("unexpected request type %T", s.Request)
} }
if p.GetStream().GetPendingSecretID() != "" || p.GetStream().GetActiveSecretID() != "" {
return nil return errors.New("missing secret ID")
}
return errors.New("no secret IDs were embedded")
} }
// TLSDialOption returns the gRPC DialOption to secure the transport if CAPems // TLSDialOption returns the gRPC DialOption to secure the transport if CAPems

View File

@ -8,12 +8,52 @@ import (
) )
// MarshalBinary implements encoding.BinaryMarshaler // MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringSecretsWriteRequest) MarshalBinary() ([]byte, error) { func (msg *SecretsWriteRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg) return proto.Marshal(msg)
} }
// UnmarshalBinary implements encoding.BinaryUnmarshaler // UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *PeeringSecretsWriteRequest) UnmarshalBinary(b []byte) error { func (msg *SecretsWriteRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *SecretsWriteRequest_GenerateTokenRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *SecretsWriteRequest_GenerateTokenRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *SecretsWriteRequest_ExchangeSecretRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *SecretsWriteRequest_ExchangeSecretRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *SecretsWriteRequest_PromotePendingRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *SecretsWriteRequest_PromotePendingRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *SecretsWriteRequest_EstablishRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *SecretsWriteRequest_EstablishRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }

File diff suppressed because it is too large Load Diff

View File

@ -53,22 +53,54 @@ enum PeeringState {
TERMINATED = 6; TERMINATED = 6;
} }
message PeeringSecretsWriteRequest { // SecretsWriteRequest encodes a request to write a peering secret as the result
enum Operation { // of some operation. Different operations, such as generating a peering token,
OPERATION_UNSPECIFIED = 0; // lead to modifying the known secrets associated with a peering.
message SecretsWriteRequest {
// PeerID is the local UUID of the peering this request applies to.
string PeerID = 1;
OPERATION_GENERATETOKEN = 1; oneof Request {
GenerateTokenRequest generate_token = 2;
OPERATION_EXCHANGESECRET = 2; ExchangeSecretRequest exchange_secret = 3;
PromotePendingRequest promote_pending = 4;
OPERATION_PROMOTEPENDING = 3; EstablishRequest establish = 5;
} }
// Secret contains the peering secrets to write. // GenerateTokenRequest encodes a request to persist a peering establishment
PeeringSecrets secrets = 1; // secret. It is triggered by generating a new peering token for a peer cluster.
message GenerateTokenRequest{
// establishment_secret is the proposed secret ID to store as the establishment
// secret for this peering.
string establishment_secret = 1;
}
// Operation defines which action triggered the secrets write. // ExchangeSecretRequest encodes a request to persist a pending stream secret
Operation operation = 2; // secret. It is triggered by an acceptor peer generating a long-lived stream secret
// in exchange for an establishment secret.
message ExchangeSecretRequest {
// pending_stream_secret is the proposed secret ID to store as the pending stream
// secret for this peering.
string pending_stream_secret = 1;
}
// PromotePendingRequest encodes a request to promote a pending stream secret
// to be an active stream secret. It is triggered when the accepting stream handler
// validates an Open request from a peer with a pending stream secret.
message PromotePendingRequest {
// active_stream_secret is the proposed secret ID to store as the active stream
// secret for this peering.
string active_stream_secret = 1;
}
// EstablishRequest encodes a request to persist an active stream secret.
// It is triggered after a dialing peer exchanges their establishment secret
// for a long-lived active stream secret.
message EstablishRequest {
// active_stream_secret is the proposed secret ID to store as the active stream
// secret for this peering.
string active_stream_secret = 1;
}
} }
// PeeringSecrets defines a secret used for authenticating/authorizing peer clusters. // PeeringSecrets defines a secret used for authenticating/authorizing peer clusters.
@ -213,10 +245,10 @@ message PeeringWriteRequest {
// Peering is the peering to write with the request. // Peering is the peering to write with the request.
Peering Peering = 1; Peering Peering = 1;
// Secret contains the optional peering secrets to persist // SecretsWriteRequest contains the optional peering secrets to persist
// with the peering. Peering secrets are not embedded in the peering // with the peering. Peering secrets are not embedded in the peering
// object to avoid leaking them. // object to avoid leaking them.
PeeringSecretsWriteRequest SecretsRequest = 2; SecretsWriteRequest SecretsRequest = 2;
map<string, string> Meta = 3; map<string, string> Meta = 3;
} }

View File

@ -21,7 +21,7 @@ func testLogLevel() hclog.Level {
if level != hclog.NoLevel { if level != hclog.NoLevel {
return level return level
} }
return hclog.Trace return hclog.Warn
} }
func Logger(t TestingTB) hclog.InterceptLogger { func Logger(t TestingTB) hclog.InterceptLogger {