mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
Merge pull request #14024 from hashicorp/peering/secrets-fixes
This commit is contained in:
commit
d7be72c7fd
@ -720,9 +720,9 @@ func (c *FSM) applyPeeringDelete(buf []byte, index uint64) interface{} {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *FSM) applyPeeringSecretsWrite(buf []byte, index uint64) interface{} {
|
func (c *FSM) applyPeeringSecretsWrite(buf []byte, index uint64) interface{} {
|
||||||
var req pbpeering.PeeringSecrets
|
var req pbpeering.SecretsWriteRequest
|
||||||
if err := structs.DecodeProto(buf, &req); err != nil {
|
if err := structs.DecodeProto(buf, &req); err != nil {
|
||||||
panic(fmt.Errorf("failed to decode peering write request: %v", err))
|
panic(fmt.Errorf("failed to decode peering secrets write request: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "peering_secrets"}, time.Now(),
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "peering_secrets"}, time.Now(),
|
||||||
|
@ -38,6 +38,7 @@ func init() {
|
|||||||
registerRestorer(structs.FreeVirtualIPRequestType, restoreFreeVirtualIP)
|
registerRestorer(structs.FreeVirtualIPRequestType, restoreFreeVirtualIP)
|
||||||
registerRestorer(structs.PeeringWriteType, restorePeering)
|
registerRestorer(structs.PeeringWriteType, restorePeering)
|
||||||
registerRestorer(structs.PeeringTrustBundleWriteType, restorePeeringTrustBundle)
|
registerRestorer(structs.PeeringTrustBundleWriteType, restorePeeringTrustBundle)
|
||||||
|
registerRestorer(structs.PeeringSecretsWriteType, restorePeeringSecrets)
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||||
@ -95,6 +96,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
|
|||||||
if err := s.persistPeeringTrustBundles(sink, encoder); err != nil {
|
if err := s.persistPeeringTrustBundles(sink, encoder); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := s.persistPeeringSecrets(sink, encoder); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -582,6 +586,24 @@ func (s *snapshot) persistPeeringTrustBundles(sink raft.SnapshotSink, encoder *c
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistPeeringSecrets(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||||
|
secrets, err := s.state.PeeringSecrets()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for entry := secrets.Next(); entry != nil; entry = secrets.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.PeeringSecretsWriteType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(entry.(*pbpeering.PeeringSecrets)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
var req structs.RegisterRequest
|
var req structs.RegisterRequest
|
||||||
if err := decoder.Decode(&req); err != nil {
|
if err := decoder.Decode(&req); err != nil {
|
||||||
@ -906,3 +928,14 @@ func restorePeeringTrustBundle(header *SnapshotHeader, restore *state.Restore, d
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func restorePeeringSecrets(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
|
var req pbpeering.PeeringSecrets
|
||||||
|
if err := decoder.Decode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := restore.PeeringSecrets(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/lib/stringslice"
|
"github.com/hashicorp/consul/lib/stringslice"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
|
"github.com/hashicorp/consul/proto/prototest"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -482,6 +483,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||||||
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
||||||
Name: "baz",
|
Name: "baz",
|
||||||
},
|
},
|
||||||
|
SecretsRequest: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: "baaeea83-8419-4aa8-ac89-14e7246a3d2f",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
// Peering Trust Bundles
|
// Peering Trust Bundles
|
||||||
@ -491,6 +500,27 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||||||
RootPEMs: []string{"qux certificate bundle"},
|
RootPEMs: []string{"qux certificate bundle"},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
// Issue two more secrets writes so that there are three secrets associated with the peering:
|
||||||
|
// - Establishment: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84"
|
||||||
|
// - Pending: "0b7812d4-32d9-4e54-b1b3-4d97084982a0"
|
||||||
|
require.NoError(t, fsm.state.PeeringSecretsWrite(34, &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
EstablishmentSecret: "baaeea83-8419-4aa8-ac89-14e7246a3d2f",
|
||||||
|
PendingStreamSecret: "0b7812d4-32d9-4e54-b1b3-4d97084982a0",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
require.NoError(t, fsm.state.PeeringSecretsWrite(33, &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
// Snapshot
|
// Snapshot
|
||||||
snap, err := fsm.Snapshot()
|
snap, err := fsm.Snapshot()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -797,6 +827,29 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||||||
require.NotNil(t, prngRestored)
|
require.NotNil(t, prngRestored)
|
||||||
require.Equal(t, "baz", prngRestored.Name)
|
require.Equal(t, "baz", prngRestored.Name)
|
||||||
|
|
||||||
|
// Verify peering secrets are restored
|
||||||
|
secretsRestored, err := fsm2.state.PeeringSecretsRead(nil, "1fabcd52-1d46-49b0-b1d8-71559aee47f5")
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectSecrets := &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
||||||
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
SecretID: "389bbcdf-1c31-47d6-ae96-f2a3f4c45f84",
|
||||||
|
},
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
PendingSecretID: "0b7812d4-32d9-4e54-b1b3-4d97084982a0",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
prototest.AssertDeepEqual(t, expectSecrets, secretsRestored)
|
||||||
|
|
||||||
|
uuids := []string{"389bbcdf-1c31-47d6-ae96-f2a3f4c45f84", "0b7812d4-32d9-4e54-b1b3-4d97084982a0"}
|
||||||
|
for _, id := range uuids {
|
||||||
|
free, err := fsm2.state.ValidateProposedPeeringSecretUUID(id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// The UUIDs in the peering secret should be tracked as in use.
|
||||||
|
require.False(t, free)
|
||||||
|
}
|
||||||
|
|
||||||
// Verify peering trust bundle is restored
|
// Verify peering trust bundle is restored
|
||||||
idx, ptbRestored, err := fsm2.state.PeeringTrustBundleRead(nil, state.Query{
|
idx, ptbRestored, err := fsm2.state.PeeringTrustBundleRead(nil, state.Query{
|
||||||
Value: "qux",
|
Value: "qux",
|
||||||
|
@ -465,8 +465,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
|||||||
//
|
//
|
||||||
// To test this, we start the two peer servers (accepting and dialing), set up peering, and then shut down
|
// To test this, we start the two peer servers (accepting and dialing), set up peering, and then shut down
|
||||||
// the accepting peer. This terminates the connection without sending a Terminated message.
|
// the accepting peer. This terminates the connection without sending a Terminated message.
|
||||||
// We then restart the accepting peer (we actually spin up a new server with the same config and port) and then
|
// We then restart the accepting peer and assert that the dialing peer reestablishes the connection.
|
||||||
// assert that the dialing peer reestablishes the connection.
|
|
||||||
func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
|
func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
@ -579,20 +578,17 @@ func TestLeader_Peering_DialerReestablishesConnectionOnError(t *testing.T) {
|
|||||||
// Have to manually shut down the gRPC server otherwise it stays bound to the port.
|
// Have to manually shut down the gRPC server otherwise it stays bound to the port.
|
||||||
acceptingServer.externalGRPCServer.Stop()
|
acceptingServer.externalGRPCServer.Stop()
|
||||||
|
|
||||||
// Mimic the server restarting by starting a new server with the same config.
|
// Restart the server by re-using the previous acceptor's data directory and node id.
|
||||||
_, acceptingServerRestart := testServerWithConfig(t, func(c *Config) {
|
_, acceptingServerRestart := testServerWithConfig(t, func(c *Config) {
|
||||||
c.NodeName = "acceptingServer.dc1"
|
c.NodeName = "acceptingServer.dc1"
|
||||||
c.Datacenter = "dc1"
|
c.Datacenter = "dc1"
|
||||||
c.TLSConfig.Domain = "consul"
|
c.TLSConfig.Domain = "consul"
|
||||||
c.GRPCPort = acceptingServerPort
|
c.GRPCPort = acceptingServerPort
|
||||||
|
c.DataDir = acceptingServer.config.DataDir
|
||||||
|
c.NodeID = acceptingServer.config.NodeID
|
||||||
})
|
})
|
||||||
testrpc.WaitForLeader(t, acceptingServerRestart.RPC, "dc1")
|
|
||||||
|
|
||||||
// Re-insert the peering state, mimicking a snapshot restore.
|
testrpc.WaitForLeader(t, acceptingServerRestart.RPC, "dc1")
|
||||||
require.NoError(t, acceptingServerRestart.fsm.State().PeeringWrite(2000, &pbpeering.PeeringWriteRequest{
|
|
||||||
Peering: peering.Peering,
|
|
||||||
Secret: secrets,
|
|
||||||
}))
|
|
||||||
|
|
||||||
// The dialing peer should eventually reconnect.
|
// The dialing peer should eventually reconnect.
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
@ -141,7 +141,7 @@ func (b *PeeringBackend) ValidateProposedPeeringSecret(id string) (bool, error)
|
|||||||
return b.srv.fsm.State().ValidateProposedPeeringSecretUUID(id)
|
return b.srv.fsm.State().ValidateProposedPeeringSecretUUID(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *PeeringBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecrets) error {
|
func (b *PeeringBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error {
|
||||||
_, err := b.srv.raftApplyProtobuf(structs.PeeringSecretsWriteType, req)
|
_, err := b.srv.raftApplyProtobuf(structs.PeeringSecretsWriteType, req)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -7,13 +7,12 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/hashicorp/go-memdb"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/configentry"
|
"github.com/hashicorp/consul/agent/configentry"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/lib/maps"
|
"github.com/hashicorp/consul/lib/maps"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -175,36 +174,47 @@ func peeringSecretsReadByPeerIDTxn(tx ReadTxn, ws memdb.WatchSet, id string) (*p
|
|||||||
return secret, nil
|
return secret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) PeeringSecretsWrite(idx uint64, secret *pbpeering.PeeringSecrets) error {
|
func (s *Store) PeeringSecretsWrite(idx uint64, req *pbpeering.SecretsWriteRequest) error {
|
||||||
tx := s.db.WriteTxn(idx)
|
tx := s.db.WriteTxn(idx)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
if err := s.peeringSecretsWriteTxn(tx, secret); err != nil {
|
if err := s.peeringSecretsWriteTxn(tx, req); err != nil {
|
||||||
return fmt.Errorf("failed to write peering secret: %w", err)
|
return fmt.Errorf("failed to write peering secret: %w", err)
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, secret *pbpeering.PeeringSecrets) error {
|
func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, req *pbpeering.SecretsWriteRequest) error {
|
||||||
if secret == nil {
|
if req == nil || req.Request == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := secret.Validate(); err != nil {
|
if err := req.Validate(); err != nil {
|
||||||
return err
|
return fmt.Errorf("invalid secret write request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
peering, err := peeringReadByIDTxn(tx, nil, secret.PeerID)
|
peering, err := peeringReadByIDTxn(tx, nil, req.PeerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read peering by id: %w", err)
|
return fmt.Errorf("failed to read peering by id: %w", err)
|
||||||
}
|
}
|
||||||
if peering == nil {
|
if peering == nil {
|
||||||
return fmt.Errorf("unknown peering %q for secret", secret.PeerID)
|
return fmt.Errorf("unknown peering %q for secret", req.PeerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the peering came from a peering token no validation is done for the given secrets.
|
// If the peering came from a peering token no validation is done for the given secrets.
|
||||||
// Dialing peers do not need to validate uniqueness because the secrets were generated elsewhere.
|
// Dialing peers do not need to validate uniqueness because the secrets were generated elsewhere.
|
||||||
if peering.ShouldDial() {
|
if peering.ShouldDial() {
|
||||||
if err := tx.Insert(tablePeeringSecrets, secret); err != nil {
|
r, ok := req.Request.(*pbpeering.SecretsWriteRequest_Establish)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("invalid request type %T when persisting stream secret for dialing peer", req.Request)
|
||||||
|
}
|
||||||
|
|
||||||
|
secrets := pbpeering.PeeringSecrets{
|
||||||
|
PeerID: req.PeerID,
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
ActiveSecretID: r.Establish.ActiveStreamSecret,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := tx.Insert(tablePeeringSecrets, &secrets); err != nil {
|
||||||
return fmt.Errorf("failed inserting peering: %w", err)
|
return fmt.Errorf("failed inserting peering: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -213,21 +223,16 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, secret *pbpeering.PeeringSec
|
|||||||
// If the peering token was generated locally, validate that the newly introduced UUID is still unique.
|
// If the peering token was generated locally, validate that the newly introduced UUID is still unique.
|
||||||
// RPC handlers validate that generated IDs are available, but availability cannot be guaranteed until the state store operation.
|
// RPC handlers validate that generated IDs are available, but availability cannot be guaranteed until the state store operation.
|
||||||
var newSecretID string
|
var newSecretID string
|
||||||
switch {
|
switch r := req.Request.(type) {
|
||||||
// Establishment secrets are written when generating peering tokens, and no other secret IDs are included.
|
|
||||||
case secret.GetEstablishment() != nil:
|
|
||||||
newSecretID = secret.GetEstablishment().SecretID
|
|
||||||
// Stream secrets can be written as:
|
|
||||||
// - A new PendingSecretID from the ExchangeSecret RPC
|
|
||||||
// - An ActiveSecretID when promoting a pending secret on first use
|
|
||||||
case secret.GetStream() != nil:
|
|
||||||
if pending := secret.GetStream().GetPendingSecretID(); pending != "" {
|
|
||||||
newSecretID = pending
|
|
||||||
}
|
|
||||||
|
|
||||||
// We do not need to check the long-lived Stream.ActiveSecretID for uniqueness because:
|
// Establishment secrets are written when generating peering tokens, and no other secret IDs are included.
|
||||||
// - In the cluster that generated it the secret is always introduced as a PendingSecretID, then promoted to ActiveSecretID.
|
case *pbpeering.SecretsWriteRequest_GenerateToken:
|
||||||
// This means that the promoted secret is already known to be unique.
|
newSecretID = r.GenerateToken.EstablishmentSecret
|
||||||
|
|
||||||
|
// When exchanging an establishment secret a new pending stream secret is generated.
|
||||||
|
// Active stream secrets doesn't need to be checked for uniqueness because it is only ever promoted from pending.
|
||||||
|
case *pbpeering.SecretsWriteRequest_ExchangeSecret:
|
||||||
|
newSecretID = r.ExchangeSecret.PendingStreamSecret
|
||||||
}
|
}
|
||||||
|
|
||||||
if newSecretID != "" {
|
if newSecretID != "" {
|
||||||
@ -244,53 +249,106 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, secret *pbpeering.PeeringSec
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
existing, err := peeringSecretsReadByPeerIDTxn(tx, nil, secret.PeerID)
|
existing, err := peeringSecretsReadByPeerIDTxn(tx, nil, req.PeerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
secrets := pbpeering.PeeringSecrets{
|
||||||
|
PeerID: req.PeerID,
|
||||||
|
}
|
||||||
|
|
||||||
var toDelete []string
|
var toDelete []string
|
||||||
if existing != nil {
|
// Collect any overwritten UUIDs for deletion.
|
||||||
|
switch r := req.Request.(type) {
|
||||||
|
case *pbpeering.SecretsWriteRequest_GenerateToken:
|
||||||
|
// Store the newly-generated establishment secret, overwriting any that existed.
|
||||||
|
secrets.Establishment = &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
SecretID: r.GenerateToken.GetEstablishmentSecret(),
|
||||||
|
}
|
||||||
|
|
||||||
// Merge in existing stream secrets when persisting a new establishment secret.
|
// Merge in existing stream secrets when persisting a new establishment secret.
|
||||||
// This is to avoid invalidating stream secrets when a new peering token
|
// This is to avoid invalidating stream secrets when a new peering token
|
||||||
// is generated.
|
// is generated.
|
||||||
//
|
secrets.Stream = existing.GetStream()
|
||||||
// We purposely DO NOT do the reverse of inheriting an existing establishment secret.
|
|
||||||
// When exchanging establishment secrets for stream secrets, we invalidate the
|
|
||||||
// establishment secret by deleting it.
|
|
||||||
if secret.GetEstablishment() != nil && secret.GetStream() == nil && existing.GetStream() != nil {
|
|
||||||
secret.Stream = existing.Stream
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect any overwritten UUIDs for deletion.
|
// When a new token is generated we replace any un-used establishment secrets.
|
||||||
//
|
if existingEstablishment := existing.GetEstablishment().GetSecretID(); existingEstablishment != "" {
|
||||||
// Old establishment secret ID are always cleaned up when they don't match.
|
|
||||||
// They will either be replaced by a new one or deleted in the secret exchange RPC.
|
|
||||||
existingEstablishment := existing.GetEstablishment().GetSecretID()
|
|
||||||
if existingEstablishment != "" && existingEstablishment != secret.GetEstablishment().GetSecretID() {
|
|
||||||
toDelete = append(toDelete, existingEstablishment)
|
toDelete = append(toDelete, existingEstablishment)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Old active secret IDs are always cleaned up when they don't match.
|
case *pbpeering.SecretsWriteRequest_ExchangeSecret:
|
||||||
// They are only ever replaced when promoting a pending secret ID.
|
if existing == nil {
|
||||||
existingActive := existing.GetStream().GetActiveSecretID()
|
return fmt.Errorf("cannot exchange peering secret: no known secrets for peering")
|
||||||
if existingActive != "" && existingActive != secret.GetStream().GetActiveSecretID() {
|
}
|
||||||
|
|
||||||
|
// Store the newly-generated pending stream secret, overwriting any that existed.
|
||||||
|
secrets.Stream = &pbpeering.PeeringSecrets_Stream{
|
||||||
|
PendingSecretID: r.ExchangeSecret.GetPendingStreamSecret(),
|
||||||
|
|
||||||
|
// Avoid invalidating existing active secrets when exchanging establishment secret for pending.
|
||||||
|
ActiveSecretID: existing.GetStream().GetActiveSecretID(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// When exchanging an establishment secret we invalidate the existing establishment secret.
|
||||||
|
existingEstablishment := existing.GetEstablishment().GetSecretID()
|
||||||
|
switch {
|
||||||
|
case existingEstablishment == "":
|
||||||
|
// When there is no existing establishment secret we must not proceed because another ExchangeSecret
|
||||||
|
// RPC already invalidated it. Otherwise, this operation would overwrite the pending secret
|
||||||
|
// from the previous ExchangeSecret.
|
||||||
|
return fmt.Errorf("invalid establishment secret: peering was already established")
|
||||||
|
|
||||||
|
case existingEstablishment != r.ExchangeSecret.GetEstablishmentSecret():
|
||||||
|
// If there is an existing establishment secret but it is not the one from the request then
|
||||||
|
// we must not proceed because a newer one was generated.
|
||||||
|
return fmt.Errorf("invalid establishment secret")
|
||||||
|
|
||||||
|
default:
|
||||||
|
toDelete = append(toDelete, existingEstablishment)
|
||||||
|
}
|
||||||
|
|
||||||
|
// When exchanging an establishment secret unused pending secrets are overwritten.
|
||||||
|
if existingPending := existing.GetStream().GetPendingSecretID(); existingPending != "" {
|
||||||
|
toDelete = append(toDelete, existingPending)
|
||||||
|
}
|
||||||
|
|
||||||
|
case *pbpeering.SecretsWriteRequest_PromotePending:
|
||||||
|
if existing == nil {
|
||||||
|
return fmt.Errorf("cannot promote pending secret: no known secrets for peering")
|
||||||
|
}
|
||||||
|
if existing.GetStream().GetPendingSecretID() != r.PromotePending.GetActiveStreamSecret() {
|
||||||
|
// There is a potential race if multiple dialing clusters send an Open request with a valid
|
||||||
|
// pending secret. The secret could be validated for all concurrently at the RPC layer,
|
||||||
|
// but then the pending secret is promoted or otherwise changes for one dialer before the others.
|
||||||
|
return fmt.Errorf("invalid pending stream secret")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the newly-generated pending stream secret, overwriting any that existed.
|
||||||
|
secrets.Stream = &pbpeering.PeeringSecrets_Stream{
|
||||||
|
// Promoting a pending secret moves it to active.
|
||||||
|
PendingSecretID: "",
|
||||||
|
|
||||||
|
// Store the newly-promoted pending secret as the active secret.
|
||||||
|
ActiveSecretID: r.PromotePending.GetActiveStreamSecret(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Avoid invalidating existing establishment secrets when promoting pending secrets.
|
||||||
|
secrets.Establishment = existing.GetEstablishment()
|
||||||
|
|
||||||
|
// If there was previously an active stream secret it gets replaced in favor of the pending secret
|
||||||
|
// that is being promoted.
|
||||||
|
if existingActive := existing.GetStream().GetActiveSecretID(); existingActive != "" {
|
||||||
toDelete = append(toDelete, existingActive)
|
toDelete = append(toDelete, existingActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pending secrets can change in three ways:
|
case *pbpeering.SecretsWriteRequest_Establish:
|
||||||
// - Generating a new pending secret: Nothing to delete here since there's no old pending secret being replaced.
|
// This should never happen. Dialing peers are the only ones that can call Establish,
|
||||||
// - Re-establishing a peering, and re-generating a pending secret: should delete the old one if both are non-empty.
|
// and the peering secrets for dialing peers should have been inserted earlier in the function.
|
||||||
// - Promoting a pending secret: Nothing to delete here since the pending secret is now active and still in use.
|
return fmt.Errorf("an accepting peer should not have called Establish RPC")
|
||||||
existingPending := existing.GetStream().GetPendingSecretID()
|
|
||||||
newPending := secret.GetStream().GetPendingSecretID()
|
default:
|
||||||
if existingPending != "" &&
|
return fmt.Errorf("got unexpected request type: %T", req.Request)
|
||||||
// The value of newPending indicates whether a peering is being generated/re-established (not empty)
|
|
||||||
// or whether a pending secret is being promoted (empty).
|
|
||||||
newPending != "" &&
|
|
||||||
newPending != existingPending {
|
|
||||||
toDelete = append(toDelete, existingPending)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for _, id := range toDelete {
|
for _, id := range toDelete {
|
||||||
if err := tx.Delete(tablePeeringSecretUUIDs, id); err != nil {
|
if err := tx.Delete(tablePeeringSecretUUIDs, id); err != nil {
|
||||||
@ -298,23 +356,23 @@ func (s *Store) peeringSecretsWriteTxn(tx WriteTxn, secret *pbpeering.PeeringSec
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Insert(tablePeeringSecrets, secret); err != nil {
|
if err := tx.Insert(tablePeeringSecrets, &secrets); err != nil {
|
||||||
return fmt.Errorf("failed inserting peering: %w", err)
|
return fmt.Errorf("failed inserting peering: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) PeeringSecretsDelete(idx uint64, peerID string) error {
|
func (s *Store) PeeringSecretsDelete(idx uint64, peerID string, dialer bool) error {
|
||||||
tx := s.db.WriteTxn(idx)
|
tx := s.db.WriteTxn(idx)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
if err := peeringSecretsDeleteTxn(tx, peerID); err != nil {
|
if err := peeringSecretsDeleteTxn(tx, peerID, dialer); err != nil {
|
||||||
return fmt.Errorf("failed to write peering secret: %w", err)
|
return fmt.Errorf("failed to write peering secret: %w", err)
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func peeringSecretsDeleteTxn(tx WriteTxn, peerID string) error {
|
func peeringSecretsDeleteTxn(tx WriteTxn, peerID string, dialer bool) error {
|
||||||
secretRaw, err := tx.First(tablePeeringSecrets, indexID, peerID)
|
secretRaw, err := tx.First(tablePeeringSecrets, indexID, peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to fetch secret for peering: %w", err)
|
return fmt.Errorf("failed to fetch secret for peering: %w", err)
|
||||||
@ -326,6 +384,11 @@ func peeringSecretsDeleteTxn(tx WriteTxn, peerID string) error {
|
|||||||
return fmt.Errorf("failed to delete secret for peering: %w", err)
|
return fmt.Errorf("failed to delete secret for peering: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dialing peers do not track secrets in tablePeeringSecretUUIDs.
|
||||||
|
if dialer {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
secrets, ok := secretRaw.(*pbpeering.PeeringSecrets)
|
secrets, ok := secretRaw.(*pbpeering.PeeringSecrets)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid type %T", secretRaw)
|
return fmt.Errorf("invalid type %T", secretRaw)
|
||||||
@ -520,7 +583,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
|
|||||||
|
|
||||||
// Ensure associated secrets are cleaned up when a peering is marked for deletion.
|
// Ensure associated secrets are cleaned up when a peering is marked for deletion.
|
||||||
if req.Peering.State == pbpeering.PeeringState_DELETING {
|
if req.Peering.State == pbpeering.PeeringState_DELETING {
|
||||||
if err := peeringSecretsDeleteTxn(tx, req.Peering.ID); err != nil {
|
if err := peeringSecretsDeleteTxn(tx, req.Peering.ID, req.Peering.ShouldDial()); err != nil {
|
||||||
return fmt.Errorf("failed to delete peering secrets: %w", err)
|
return fmt.Errorf("failed to delete peering secrets: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -532,7 +595,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write any secrets generated with the peering.
|
// Write any secrets generated with the peering.
|
||||||
err = s.peeringSecretsWriteTxn(tx, req.GetSecret())
|
err = s.peeringSecretsWriteTxn(tx, req.GetSecretsRequest())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to write peering establishment secret: %w", err)
|
return fmt.Errorf("failed to write peering establishment secret: %w", err)
|
||||||
}
|
}
|
||||||
@ -1097,6 +1160,10 @@ func (s *Snapshot) PeeringTrustBundles() (memdb.ResultIterator, error) {
|
|||||||
return s.tx.Get(tablePeeringTrustBundles, indexID)
|
return s.tx.Get(tablePeeringTrustBundles, indexID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Snapshot) PeeringSecrets() (memdb.ResultIterator, error) {
|
||||||
|
return s.tx.Get(tablePeeringSecrets, indexID)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Restore) Peering(p *pbpeering.Peering) error {
|
func (r *Restore) Peering(p *pbpeering.Peering) error {
|
||||||
if err := r.tx.Insert(tablePeering, p); err != nil {
|
if err := r.tx.Insert(tablePeering, p); err != nil {
|
||||||
return fmt.Errorf("failed restoring peering: %w", err)
|
return fmt.Errorf("failed restoring peering: %w", err)
|
||||||
@ -1119,6 +1186,30 @@ func (r *Restore) PeeringTrustBundle(ptb *pbpeering.PeeringTrustBundle) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Restore) PeeringSecrets(p *pbpeering.PeeringSecrets) error {
|
||||||
|
if err := r.tx.Insert(tablePeeringSecrets, p); err != nil {
|
||||||
|
return fmt.Errorf("failed restoring peering secrets: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var uuids []string
|
||||||
|
if establishment := p.GetEstablishment().GetSecretID(); establishment != "" {
|
||||||
|
uuids = append(uuids, establishment)
|
||||||
|
}
|
||||||
|
if pending := p.GetStream().GetPendingSecretID(); pending != "" {
|
||||||
|
uuids = append(uuids, pending)
|
||||||
|
}
|
||||||
|
if active := p.GetStream().GetActiveSecretID(); active != "" {
|
||||||
|
uuids = append(uuids, active)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, id := range uuids {
|
||||||
|
if err := r.tx.Insert(tablePeeringSecretUUIDs, id); err != nil {
|
||||||
|
return fmt.Errorf("failed restoring peering secret UUIDs: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// peersForServiceTxn returns the names of all peers that a service is exported to.
|
// peersForServiceTxn returns the names of all peers that a service is exported to.
|
||||||
func peersForServiceTxn(
|
func peersForServiceTxn(
|
||||||
tx ReadTxn,
|
tx ReadTxn,
|
||||||
|
@ -58,7 +58,7 @@ func insertTestPeerings(t *testing.T, s *Store) {
|
|||||||
require.NoError(t, tx.Commit())
|
require.NoError(t, tx.Commit())
|
||||||
}
|
}
|
||||||
|
|
||||||
func insertTestPeeringSecret(t *testing.T, s *Store, secret *pbpeering.PeeringSecrets) {
|
func insertTestPeeringSecret(t *testing.T, s *Store, secret *pbpeering.PeeringSecrets, dialer bool) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
tx := s.db.WriteTxn(0)
|
tx := s.db.WriteTxn(0)
|
||||||
@ -78,9 +78,12 @@ func insertTestPeeringSecret(t *testing.T, s *Store, secret *pbpeering.PeeringSe
|
|||||||
uuids = append(uuids, active)
|
uuids = append(uuids, active)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, id := range uuids {
|
// Dialing peers do not track secret UUIDs because they don't generate them.
|
||||||
err = tx.Insert(tablePeeringSecretUUIDs, id)
|
if !dialer {
|
||||||
require.NoError(t, err)
|
for _, id := range uuids {
|
||||||
|
err = tx.Insert(tablePeeringSecretUUIDs, id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, tx.Commit())
|
require.NoError(t, tx.Commit())
|
||||||
@ -182,7 +185,7 @@ func TestStateStore_PeeringSecretsRead(t *testing.T) {
|
|||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
SecretID: testFooSecretID,
|
SecretID: testFooSecretID,
|
||||||
},
|
},
|
||||||
})
|
}, false)
|
||||||
|
|
||||||
type testcase struct {
|
type testcase struct {
|
||||||
name string
|
name string
|
||||||
@ -233,24 +236,45 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
writeSeed := func(s *Store, req *pbpeering.PeeringWriteRequest) {
|
var (
|
||||||
|
testSecretOne = testUUID()
|
||||||
|
testSecretTwo = testUUID()
|
||||||
|
testSecretThree = testUUID()
|
||||||
|
testSecretFour = testUUID()
|
||||||
|
)
|
||||||
|
|
||||||
|
type testSeed struct {
|
||||||
|
peering *pbpeering.Peering
|
||||||
|
secrets *pbpeering.PeeringSecrets
|
||||||
|
}
|
||||||
|
|
||||||
|
type testcase struct {
|
||||||
|
name string
|
||||||
|
seed *testSeed
|
||||||
|
input *pbpeering.SecretsWriteRequest
|
||||||
|
expect *pbpeering.PeeringSecrets
|
||||||
|
expectUUIDs []string
|
||||||
|
expectErr string
|
||||||
|
}
|
||||||
|
|
||||||
|
writeSeed := func(s *Store, seed *testSeed) {
|
||||||
tx := s.db.WriteTxn(1)
|
tx := s.db.WriteTxn(1)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
if req.Peering != nil {
|
if seed.peering != nil {
|
||||||
require.NoError(t, tx.Insert(tablePeering, req.Peering))
|
require.NoError(t, tx.Insert(tablePeering, seed.peering))
|
||||||
}
|
}
|
||||||
if req.Secret != nil {
|
if seed.secrets != nil {
|
||||||
require.NoError(t, tx.Insert(tablePeeringSecrets, req.Secret))
|
require.NoError(t, tx.Insert(tablePeeringSecrets, seed.secrets))
|
||||||
|
|
||||||
var toInsert []string
|
var toInsert []string
|
||||||
if establishment := req.Secret.GetEstablishment().GetSecretID(); establishment != "" {
|
if establishment := seed.secrets.GetEstablishment().GetSecretID(); establishment != "" {
|
||||||
toInsert = append(toInsert, establishment)
|
toInsert = append(toInsert, establishment)
|
||||||
}
|
}
|
||||||
if pending := req.Secret.GetStream().GetPendingSecretID(); pending != "" {
|
if pending := seed.secrets.GetStream().GetPendingSecretID(); pending != "" {
|
||||||
toInsert = append(toInsert, pending)
|
toInsert = append(toInsert, pending)
|
||||||
}
|
}
|
||||||
if active := req.Secret.GetStream().GetActiveSecretID(); active != "" {
|
if active := seed.secrets.GetStream().GetActiveSecretID(); active != "" {
|
||||||
toInsert = append(toInsert, active)
|
toInsert = append(toInsert, active)
|
||||||
}
|
}
|
||||||
for _, id := range toInsert {
|
for _, id := range toInsert {
|
||||||
@ -261,20 +285,6 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
tx.Commit()
|
tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
testSecretOne = testUUID()
|
|
||||||
testSecretTwo = testUUID()
|
|
||||||
testSecretThree = testUUID()
|
|
||||||
)
|
|
||||||
|
|
||||||
type testcase struct {
|
|
||||||
name string
|
|
||||||
seed *pbpeering.PeeringWriteRequest
|
|
||||||
input *pbpeering.PeeringSecrets
|
|
||||||
expect *pbpeering.PeeringSecrets
|
|
||||||
expectUUIDs []string
|
|
||||||
expectErr string
|
|
||||||
}
|
|
||||||
run := func(t *testing.T, tc testcase) {
|
run := func(t *testing.T, tc testcase) {
|
||||||
s := NewStateStore(nil)
|
s := NewStateStore(nil)
|
||||||
|
|
||||||
@ -291,7 +301,7 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Validate that we read what we expect
|
// Validate that we read what we expect
|
||||||
secrets, err := s.PeeringSecretsRead(nil, tc.input.PeerID)
|
secrets, err := s.PeeringSecretsRead(nil, tc.input.GetPeerID())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, secrets)
|
require.NotNil(t, secrets)
|
||||||
prototest.AssertDeepEqual(t, tc.expect, secrets)
|
prototest.AssertDeepEqual(t, tc.expect, secrets)
|
||||||
@ -301,40 +311,131 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
tcs := []testcase{
|
tcs := []testcase{
|
||||||
{
|
{
|
||||||
name: "missing peer id",
|
name: "missing peer id",
|
||||||
input: &pbpeering.PeeringSecrets{},
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{},
|
||||||
|
},
|
||||||
expectErr: "missing peer ID",
|
expectErr: "missing peer ID",
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "no secret IDs were embedded",
|
|
||||||
input: &pbpeering.PeeringSecrets{
|
|
||||||
PeerID: testFooPeerID,
|
|
||||||
},
|
|
||||||
expectErr: "no secret IDs were embedded",
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "unknown peer id",
|
name: "unknown peer id",
|
||||||
input: &pbpeering.PeeringSecrets{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
SecretID: testFooSecretID,
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testFooSecretID,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectErr: "unknown peering",
|
expectErr: "unknown peering",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "dialing peer does not track UUIDs",
|
name: "no secret IDs were embedded when generating token",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
Peering: &pbpeering.Peering{
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{},
|
||||||
|
},
|
||||||
|
expectErr: "missing secret ID",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no secret IDs were embedded when establishing peering",
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_Establish{},
|
||||||
|
},
|
||||||
|
expectErr: "missing secret ID",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no secret IDs were embedded when exchanging secret",
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{},
|
||||||
|
},
|
||||||
|
expectErr: "missing secret ID",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no secret IDs were embedded when promoting pending secret",
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{},
|
||||||
|
},
|
||||||
|
expectErr: "missing secret ID",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "dialing peer invalid request type - generate token",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testFooPeerID,
|
ID: testFooPeerID,
|
||||||
PeerServerAddresses: []string{"10.0.0.1:5300"},
|
PeerServerAddresses: []string{"10.0.0.1:5300"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
input: &pbpeering.PeeringSecrets{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
// Dialing peer must only write secrets from Establish
|
||||||
ActiveSecretID: testFooSecretID,
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testFooSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "invalid request type",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "dialing peer invalid request type - exchange secret",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
PeerServerAddresses: []string{"10.0.0.1:5300"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
// Dialing peer must only write secrets from Establish
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
PendingStreamSecret: testFooSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "invalid request type",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "dialing peer invalid request type - promote pending",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
PeerServerAddresses: []string{"10.0.0.1:5300"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
// Dialing peer must only write secrets from Establish
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{
|
||||||
|
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
|
||||||
|
ActiveStreamSecret: testFooSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "invalid request type",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "dialing peer does not track UUIDs",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
PeerServerAddresses: []string{"10.0.0.1:5300"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_Establish{
|
||||||
|
Establish: &pbpeering.SecretsWriteRequest_EstablishRequest{
|
||||||
|
ActiveStreamSecret: testFooSecretID,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expect: &pbpeering.PeeringSecrets{
|
expect: &pbpeering.PeeringSecrets{
|
||||||
@ -347,13 +448,13 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
expectUUIDs: []string{},
|
expectUUIDs: []string{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "generate new establishment secret",
|
name: "generate new establishment secret when secrets already existed",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testFooPeerID,
|
ID: testFooPeerID,
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
PendingSecretID: testSecretOne,
|
PendingSecretID: testSecretOne,
|
||||||
@ -361,10 +462,12 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
input: &pbpeering.PeeringSecrets{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
SecretID: testSecretThree,
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testSecretThree,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expect: &pbpeering.PeeringSecrets{
|
expect: &pbpeering.PeeringSecrets{
|
||||||
@ -381,24 +484,26 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
expectUUIDs: []string{testSecretOne, testSecretTwo, testSecretThree},
|
expectUUIDs: []string{testSecretOne, testSecretTwo, testSecretThree},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "replace establishment secret",
|
name: "generate new token to replace establishment secret",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testFooPeerID,
|
ID: testFooPeerID,
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
SecretID: testSecretOne,
|
SecretID: testSecretOne,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
input: &pbpeering.PeeringSecrets{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
// Two replaces One
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
SecretID: testSecretTwo,
|
// Two replaces One
|
||||||
|
EstablishmentSecret: testSecretTwo,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expect: &pbpeering.PeeringSecrets{
|
expect: &pbpeering.PeeringSecrets{
|
||||||
@ -410,46 +515,96 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
expectUUIDs: []string{testSecretTwo},
|
expectUUIDs: []string{testSecretTwo},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "generate new pending secret",
|
name: "cannot exchange secret without existing secrets",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testFooPeerID,
|
ID: testFooPeerID,
|
||||||
},
|
},
|
||||||
|
// Do not seed an establishment secret.
|
||||||
},
|
},
|
||||||
input: &pbpeering.PeeringSecrets{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
PendingSecretID: testSecretOne,
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
PendingStreamSecret: testSecretOne,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expect: &pbpeering.PeeringSecrets{
|
expectErr: "no known secrets for peering",
|
||||||
PeerID: testFooPeerID,
|
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
|
||||||
PendingSecretID: testSecretOne,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expectUUIDs: []string{testSecretOne},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "replace pending secret",
|
name: "cannot exchange secret without establishment secret",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testFooPeerID,
|
ID: testFooPeerID,
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
PendingSecretID: testSecretOne,
|
PendingSecretID: testSecretOne,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
input: &pbpeering.PeeringSecrets{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
// Two replaces One
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
PendingSecretID: testSecretTwo,
|
// Attempt to replace One with Two
|
||||||
|
PendingStreamSecret: testSecretTwo,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "peering was already established",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cannot exchange secret without valid establishment secret",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
},
|
||||||
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
SecretID: testSecretOne,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
// Given secret Three does not match One
|
||||||
|
EstablishmentSecret: testSecretThree,
|
||||||
|
PendingStreamSecret: testSecretTwo,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "invalid establishment secret",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "exchange secret to generate new pending secret",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
},
|
||||||
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
SecretID: testSecretOne,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
EstablishmentSecret: testSecretOne,
|
||||||
|
PendingStreamSecret: testSecretTwo,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expect: &pbpeering.PeeringSecrets{
|
expect: &pbpeering.PeeringSecrets{
|
||||||
@ -458,16 +613,101 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
PendingSecretID: testSecretTwo,
|
PendingSecretID: testSecretTwo,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
// Establishment secret testSecretOne is discarded when exchanging for a stream secret
|
||||||
expectUUIDs: []string{testSecretTwo},
|
expectUUIDs: []string{testSecretTwo},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "promote pending secret and delete active",
|
name: "exchange secret replaces pending stream secret",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testFooPeerID,
|
ID: testFooPeerID,
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
SecretID: testSecretFour,
|
||||||
|
},
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
ActiveSecretID: testSecretOne,
|
||||||
|
PendingSecretID: testSecretTwo,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
EstablishmentSecret: testSecretFour,
|
||||||
|
|
||||||
|
// Three replaces two
|
||||||
|
PendingStreamSecret: testSecretThree,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expect: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
// Establishment secret is discarded in favor of new pending secret.
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
// Active secret is not deleted until the new pending secret is promoted
|
||||||
|
ActiveSecretID: testSecretOne,
|
||||||
|
PendingSecretID: testSecretThree,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectUUIDs: []string{testSecretOne, testSecretThree},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cannot promote pending without existing secrets",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
},
|
||||||
|
// Do not seed a pending secret.
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{
|
||||||
|
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
|
||||||
|
ActiveStreamSecret: testSecretOne,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "no known secrets for peering",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cannot promote pending without existing pending secret",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
},
|
||||||
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
ActiveSecretID: testSecretOne,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{
|
||||||
|
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
|
||||||
|
// Attempt to replace One with Two
|
||||||
|
ActiveStreamSecret: testSecretTwo,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "invalid pending stream secret",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cannot promote pending without valid pending secret",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
},
|
||||||
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
PendingSecretID: testSecretTwo,
|
PendingSecretID: testSecretTwo,
|
||||||
@ -475,20 +715,55 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
input: &pbpeering.PeeringSecrets{
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{
|
||||||
// Two gets promoted over One
|
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
|
||||||
ActiveSecretID: testSecretTwo,
|
// Attempting to write secret Three, but pending secret is Two
|
||||||
|
ActiveStreamSecret: testSecretThree,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: "invalid pending stream secret",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "promote pending secret and delete active",
|
||||||
|
seed: &testSeed{
|
||||||
|
peering: &pbpeering.Peering{
|
||||||
|
Name: "foo",
|
||||||
|
ID: testFooPeerID,
|
||||||
|
},
|
||||||
|
secrets: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
SecretID: testSecretThree,
|
||||||
|
},
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
PendingSecretID: testSecretTwo,
|
||||||
|
ActiveSecretID: testSecretOne,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
input: &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{
|
||||||
|
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
|
||||||
|
// Two gets promoted over One
|
||||||
|
ActiveStreamSecret: testSecretTwo,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expect: &pbpeering.PeeringSecrets{
|
expect: &pbpeering.PeeringSecrets{
|
||||||
PeerID: testFooPeerID,
|
PeerID: testFooPeerID,
|
||||||
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
// Establishment secret remains valid when promoting a stream secret.
|
||||||
|
SecretID: testSecretThree,
|
||||||
|
},
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
ActiveSecretID: testSecretTwo,
|
ActiveSecretID: testSecretTwo,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectUUIDs: []string{testSecretTwo},
|
expectUUIDs: []string{testSecretTwo, testSecretThree},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range tcs {
|
for _, tc := range tcs {
|
||||||
@ -499,40 +774,67 @@ func TestStore_PeeringSecretsWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStore_PeeringSecretsDelete(t *testing.T) {
|
func TestStore_PeeringSecretsDelete(t *testing.T) {
|
||||||
s := NewStateStore(nil)
|
|
||||||
insertTestPeerings(t, s)
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
establishmentID = "b4b9cbae-4bbd-454b-b7ae-441a5c89c3b9"
|
establishmentID = "b4b9cbae-4bbd-454b-b7ae-441a5c89c3b9"
|
||||||
pendingID = "0ba06390-bd77-4c52-8397-f88c0867157d"
|
pendingID = "0ba06390-bd77-4c52-8397-f88c0867157d"
|
||||||
activeID = "0b8a3817-aca0-4c06-94b6-b0763a5cd013"
|
activeID = "0b8a3817-aca0-4c06-94b6-b0763a5cd013"
|
||||||
)
|
)
|
||||||
|
|
||||||
insertTestPeeringSecret(t, s, &pbpeering.PeeringSecrets{
|
type testCase struct {
|
||||||
PeerID: testFooPeerID,
|
dialer bool
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
secret *pbpeering.PeeringSecrets
|
||||||
SecretID: establishmentID,
|
}
|
||||||
},
|
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
|
||||||
PendingSecretID: pendingID,
|
|
||||||
ActiveSecretID: activeID,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
require.NoError(t, s.PeeringSecretsDelete(12, testFooPeerID))
|
run := func(t *testing.T, tc testCase) {
|
||||||
|
s := NewStateStore(nil)
|
||||||
|
|
||||||
// The secrets should be gone
|
insertTestPeerings(t, s)
|
||||||
secrets, err := s.PeeringSecretsRead(nil, testFooPeerID)
|
insertTestPeeringSecret(t, s, tc.secret, tc.dialer)
|
||||||
require.NoError(t, err)
|
|
||||||
require.Nil(t, secrets)
|
|
||||||
|
|
||||||
// The UUIDs should be free
|
require.NoError(t, s.PeeringSecretsDelete(12, testFooPeerID, tc.dialer))
|
||||||
uuids := []string{establishmentID, pendingID, activeID}
|
|
||||||
|
|
||||||
for _, id := range uuids {
|
// The secrets should be gone
|
||||||
free, err := s.ValidateProposedPeeringSecretUUID(id)
|
secrets, err := s.PeeringSecretsRead(nil, testFooPeerID)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, free)
|
require.Nil(t, secrets)
|
||||||
|
|
||||||
|
uuids := []string{establishmentID, pendingID, activeID}
|
||||||
|
for _, id := range uuids {
|
||||||
|
free, err := s.ValidateProposedPeeringSecretUUID(id)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, free)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tt := map[string]testCase{
|
||||||
|
"acceptor": {
|
||||||
|
dialer: false,
|
||||||
|
secret: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
||||||
|
SecretID: establishmentID,
|
||||||
|
},
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
PendingSecretID: pendingID,
|
||||||
|
ActiveSecretID: activeID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"dialer": {
|
||||||
|
dialer: true,
|
||||||
|
secret: &pbpeering.PeeringSecrets{
|
||||||
|
PeerID: testFooPeerID,
|
||||||
|
Stream: &pbpeering.PeeringSecrets_Stream{
|
||||||
|
ActiveSecretID: activeID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range tt {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -847,10 +1149,12 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||||||
Name: "baz",
|
Name: "baz",
|
||||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
SecretsRequest: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testBazPeerID,
|
PeerID: testBazPeerID,
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
SecretID: testBazSecretID,
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testBazSecretID,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -99,7 +99,7 @@ type Backend interface {
|
|||||||
GetLeaderAddress() string
|
GetLeaderAddress() string
|
||||||
|
|
||||||
ValidateProposedPeeringSecret(id string) (bool, error)
|
ValidateProposedPeeringSecret(id string) (bool, error)
|
||||||
PeeringSecretsWrite(req *pbpeering.PeeringSecrets) error
|
PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error
|
||||||
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
|
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
|
||||||
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
|
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
|
||||||
CatalogRegister(req *structs.RegisterRequest) error
|
CatalogRegister(req *structs.RegisterRequest) error
|
||||||
|
@ -25,13 +25,17 @@ func TestServer_ExchangeSecret(t *testing.T) {
|
|||||||
|
|
||||||
var secret string
|
var secret string
|
||||||
testutil.RunStep(t, "known establishment secret is accepted", func(t *testing.T) {
|
testutil.RunStep(t, "known establishment secret is accepted", func(t *testing.T) {
|
||||||
require.NoError(t, store.PeeringSecretsWrite(1, &pbpeering.PeeringSecrets{
|
// First write the establishment secret so that it can be exchanged
|
||||||
PeerID: testPeerID,
|
require.NoError(t, store.PeeringSecretsWrite(1, &pbpeering.SecretsWriteRequest{
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{SecretID: testEstablishmentSecretID},
|
PeerID: testPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
ActiveSecretID: testActiveStreamSecretID,
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
// Exchange the now-valid establishment secret for a stream secret
|
||||||
resp, err := srv.ExchangeSecret(context.Background(), &pbpeerstream.ExchangeSecretRequest{
|
resp, err := srv.ExchangeSecret(context.Background(), &pbpeerstream.ExchangeSecretRequest{
|
||||||
PeerID: testPeerID,
|
PeerID: testPeerID,
|
||||||
EstablishmentSecret: testEstablishmentSecretID,
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
@ -47,8 +51,5 @@ func TestServer_ExchangeSecret(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, secret, s.GetStream().GetPendingSecretID())
|
require.Equal(t, secret, s.GetStream().GetPendingSecretID())
|
||||||
|
|
||||||
// Active stream secret persists until pending secret is promoted during peering establishment.
|
|
||||||
require.Equal(t, testActiveStreamSecretID, s.GetStream().GetActiveSecretID())
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -77,20 +77,21 @@ func (s *Server) ExchangeSecret(ctx context.Context, req *pbpeerstream.ExchangeS
|
|||||||
return nil, grpcstatus.Errorf(codes.Internal, "failed to generate peering stream secret: %v", err)
|
return nil, grpcstatus.Errorf(codes.Internal, "failed to generate peering stream secret: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
secrets := &pbpeering.PeeringSecrets{
|
writeReq := &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: req.PeerID,
|
PeerID: req.PeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
// Overwriting any existing un-utilized pending stream secret.
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
PendingSecretID: id,
|
// Pass the given establishment secret to that it can be re-validated at the state store.
|
||||||
|
// Validating the establishment secret at the RPC is not enough because there can be
|
||||||
|
// concurrent callers with the same establishment secret.
|
||||||
|
EstablishmentSecret: req.EstablishmentSecret,
|
||||||
|
|
||||||
// If there is an active stream secret ID it is NOT invalidated here.
|
// Overwrite any existing un-utilized pending stream secret.
|
||||||
// It remains active until the pending secret ID is used and promoted to active.
|
PendingStreamSecret: id,
|
||||||
// This allows dialing clusters with the active stream secret to continue to dial successfully until they
|
},
|
||||||
// receive the new secret.
|
|
||||||
ActiveSecretID: existing.GetStream().GetActiveSecretID(),
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err = s.Backend.PeeringSecretsWrite(secrets)
|
err = s.Backend.PeeringSecretsWrite(writeReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, grpcstatus.Errorf(codes.Internal, "failed to persist peering secret: %v", err)
|
return nil, grpcstatus.Errorf(codes.Internal, "failed to persist peering secret: %v", err)
|
||||||
}
|
}
|
||||||
@ -191,14 +192,13 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
|
|||||||
}
|
}
|
||||||
authorized = true
|
authorized = true
|
||||||
|
|
||||||
promoted := &pbpeering.PeeringSecrets{
|
promoted := &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: req.PeerID,
|
PeerID: p.ID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{
|
||||||
ActiveSecretID: pending,
|
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
|
||||||
|
// Overwrite any existing un-utilized pending stream secret.
|
||||||
// The PendingSecretID is intentionally zeroed out since we want to avoid re-triggering this
|
ActiveStreamSecret: pending,
|
||||||
// promotion process with the same pending secret.
|
},
|
||||||
PendingSecretID: "",
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err = s.Backend.PeeringSecretsWrite(promoted)
|
err = s.Backend.PeeringSecretsWrite(promoted)
|
||||||
|
@ -181,9 +181,13 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
|
func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
|
||||||
|
type testSeed struct {
|
||||||
|
peering *pbpeering.Peering
|
||||||
|
secrets []*pbpeering.SecretsWriteRequest
|
||||||
|
}
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
seed *pbpeering.PeeringWriteRequest
|
seed *testSeed
|
||||||
input *pbpeerstream.ReplicationMessage
|
input *pbpeerstream.ReplicationMessage
|
||||||
wantErr error
|
wantErr error
|
||||||
}
|
}
|
||||||
@ -194,7 +198,13 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
|
|||||||
srv, store := newTestServer(t, nil)
|
srv, store := newTestServer(t, nil)
|
||||||
|
|
||||||
// Write a seed peering.
|
// Write a seed peering.
|
||||||
require.NoError(t, store.PeeringWrite(1, tc.seed))
|
if tc.seed != nil {
|
||||||
|
require.NoError(t, store.PeeringWrite(1, &pbpeering.PeeringWriteRequest{Peering: tc.seed.peering}))
|
||||||
|
|
||||||
|
for _, s := range tc.seed.secrets {
|
||||||
|
require.NoError(t, store.PeeringSecretsWrite(1, s))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Set the initial roots and CA configuration.
|
// Set the initial roots and CA configuration.
|
||||||
_, _ = writeInitialRootsAndCA(t, store)
|
_, _ = writeInitialRootsAndCA(t, store)
|
||||||
@ -223,12 +233,14 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client.Close()
|
||||||
}
|
}
|
||||||
tt := []testCase{
|
tt := []testCase{
|
||||||
{
|
{
|
||||||
name: "no secret for peering",
|
name: "no secret for peering",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: peeringWithoutSecrets,
|
ID: peeringWithoutSecrets,
|
||||||
},
|
},
|
||||||
@ -244,15 +256,19 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "unknown secret",
|
name: "unknown secret",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testPeerID,
|
ID: testPeerID,
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
secrets: []*pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testPeerID,
|
{
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
PeerID: testPeerID,
|
||||||
ActiveSecretID: testActiveStreamSecretID,
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -267,16 +283,29 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
|
|||||||
wantErr: status.Error(codes.PermissionDenied, "invalid peering stream secret"),
|
wantErr: status.Error(codes.PermissionDenied, "invalid peering stream secret"),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "known active secret",
|
name: "known pending secret",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testPeerID,
|
ID: testPeerID,
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
secrets: []*pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testPeerID,
|
{
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
PeerID: testPeerID,
|
||||||
ActiveSecretID: testActiveStreamSecretID,
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
PeerID: testPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
PendingStreamSecret: testPendingStreamSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -284,22 +313,44 @@ func TestStreamResources_Server_ActiveSecretValidation(t *testing.T) {
|
|||||||
Payload: &pbpeerstream.ReplicationMessage_Open_{
|
Payload: &pbpeerstream.ReplicationMessage_Open_{
|
||||||
Open: &pbpeerstream.ReplicationMessage_Open{
|
Open: &pbpeerstream.ReplicationMessage_Open{
|
||||||
PeerID: testPeerID,
|
PeerID: testPeerID,
|
||||||
StreamSecretID: testActiveStreamSecretID,
|
StreamSecretID: testPendingStreamSecretID,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "known pending secret",
|
name: "known active secret",
|
||||||
seed: &pbpeering.PeeringWriteRequest{
|
seed: &testSeed{
|
||||||
Peering: &pbpeering.Peering{
|
peering: &pbpeering.Peering{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
ID: testPeerID,
|
ID: testPeerID,
|
||||||
},
|
},
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
secrets: []*pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testPeerID,
|
{
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
PeerID: testPeerID,
|
||||||
PendingSecretID: testPendingStreamSecretID,
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
PeerID: testPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
PendingStreamSecret: testPendingStreamSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
PeerID: testPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_PromotePending{
|
||||||
|
PromotePending: &pbpeering.SecretsWriteRequest_PromotePendingRequest{
|
||||||
|
// Pending gets promoted to active.
|
||||||
|
ActiveStreamSecret: testPendingStreamSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1390,7 +1441,7 @@ func (b *testStreamBackend) ValidateProposedPeeringSecret(id string) (bool, erro
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.PeeringSecrets) error {
|
func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error {
|
||||||
return b.store.PeeringSecretsWrite(1, req)
|
return b.store.PeeringSecretsWrite(1, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1631,12 +1682,25 @@ func writeTestPeering(t *testing.T, store *state.Store, idx uint64, peerName, re
|
|||||||
if remotePeerID != "" {
|
if remotePeerID != "" {
|
||||||
peering.PeerServerAddresses = []string{"127.0.0.1:5300"}
|
peering.PeerServerAddresses = []string{"127.0.0.1:5300"}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, store.PeeringWrite(idx, &pbpeering.PeeringWriteRequest{
|
require.NoError(t, store.PeeringWrite(idx, &pbpeering.PeeringWriteRequest{
|
||||||
Peering: &peering,
|
Peering: &peering,
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
SecretsRequest: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: testPeerID,
|
PeerID: testPeerID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
// Simulate generating a stream secret by first generating a token then exchanging for a stream secret.
|
||||||
PendingSecretID: testPendingStreamSecretID,
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
require.NoError(t, store.PeeringSecretsWrite(idx, &pbpeering.SecretsWriteRequest{
|
||||||
|
PeerID: testPeerID,
|
||||||
|
Request: &pbpeering.SecretsWriteRequest_ExchangeSecret{
|
||||||
|
ExchangeSecret: &pbpeering.SecretsWriteRequest_ExchangeSecretRequest{
|
||||||
|
EstablishmentSecret: testEstablishmentSecretID,
|
||||||
|
PendingStreamSecret: testPendingStreamSecretID,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
|
@ -260,10 +260,12 @@ func (s *Server) GenerateToken(
|
|||||||
|
|
||||||
writeReq := &pbpeering.PeeringWriteRequest{
|
writeReq := &pbpeering.PeeringWriteRequest{
|
||||||
Peering: peering,
|
Peering: peering,
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
SecretsRequest: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: peering.ID,
|
PeerID: peering.ID,
|
||||||
Establishment: &pbpeering.PeeringSecrets_Establishment{
|
Request: &pbpeering.SecretsWriteRequest_GenerateToken{
|
||||||
SecretID: secretID,
|
GenerateToken: &pbpeering.SecretsWriteRequest_GenerateTokenRequest{
|
||||||
|
EstablishmentSecret: secretID,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -431,18 +433,20 @@ func (s *Server) Establish(
|
|||||||
return nil, dialErrors
|
return nil, dialErrors
|
||||||
}
|
}
|
||||||
|
|
||||||
// As soon as a peering is written with a list of ServerAddresses that is
|
// As soon as a peering is written with a non-empty list of ServerAddresses
|
||||||
// non-empty, the leader routine will see the peering and attempt to
|
// and an active stream secret, a leader routine will see the peering and
|
||||||
// establish a connection with the remote peer.
|
// attempt to establish a peering stream with the remote peer.
|
||||||
//
|
//
|
||||||
// This peer now has a record of both the LocalPeerID(ID) and
|
// This peer now has a record of both the LocalPeerID(ID) and
|
||||||
// RemotePeerID(PeerID) but at this point the other peer does not.
|
// RemotePeerID(PeerID) but at this point the other peer does not.
|
||||||
writeReq := &pbpeering.PeeringWriteRequest{
|
writeReq := &pbpeering.PeeringWriteRequest{
|
||||||
Peering: peering,
|
Peering: peering,
|
||||||
Secret: &pbpeering.PeeringSecrets{
|
SecretsRequest: &pbpeering.SecretsWriteRequest{
|
||||||
PeerID: peering.ID,
|
PeerID: peering.ID,
|
||||||
Stream: &pbpeering.PeeringSecrets_Stream{
|
Request: &pbpeering.SecretsWriteRequest_Establish{
|
||||||
ActiveSecretID: exchangeResp.StreamSecret,
|
Establish: &pbpeering.SecretsWriteRequest_EstablishRequest{
|
||||||
|
ActiveStreamSecret: exchangeResp.StreamSecret,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -729,10 +733,11 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete
|
|||||||
// We only need to include the name and partition for the peering to be identified.
|
// We only need to include the name and partition for the peering to be identified.
|
||||||
// All other data associated with the peering can be discarded because once marked
|
// All other data associated with the peering can be discarded because once marked
|
||||||
// for deletion the peering is effectively gone.
|
// for deletion the peering is effectively gone.
|
||||||
ID: existing.ID,
|
ID: existing.ID,
|
||||||
Name: req.Name,
|
Name: req.Name,
|
||||||
State: pbpeering.PeeringState_DELETING,
|
State: pbpeering.PeeringState_DELETING,
|
||||||
DeletedAt: structs.TimeToProto(time.Now().UTC()),
|
PeerServerAddresses: existing.PeerServerAddresses,
|
||||||
|
DeletedAt: structs.TimeToProto(time.Now().UTC()),
|
||||||
|
|
||||||
// PartitionOrEmpty is used to avoid writing "default" in OSS.
|
// PartitionOrEmpty is used to avoid writing "default" in OSS.
|
||||||
Partition: entMeta.PartitionOrEmpty(),
|
Partition: entMeta.PartitionOrEmpty(),
|
||||||
|
@ -155,17 +155,32 @@ func (p *Peering) IsActive() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Validate is a validation helper that checks whether a secret ID is embedded in the container type.
|
// Validate is a validation helper that checks whether a secret ID is embedded in the container type.
|
||||||
func (p *PeeringSecrets) Validate() error {
|
func (s *SecretsWriteRequest) Validate() error {
|
||||||
if p.GetPeerID() == "" {
|
if s.PeerID == "" {
|
||||||
return errors.New("missing peer ID")
|
return errors.New("missing peer ID")
|
||||||
}
|
}
|
||||||
if p.GetEstablishment().GetSecretID() != "" {
|
switch r := s.Request.(type) {
|
||||||
return nil
|
case *SecretsWriteRequest_GenerateToken:
|
||||||
|
if r != nil && r.GenerateToken.GetEstablishmentSecret() != "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case *SecretsWriteRequest_Establish:
|
||||||
|
if r != nil && r.Establish.GetActiveStreamSecret() != "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case *SecretsWriteRequest_ExchangeSecret:
|
||||||
|
if r != nil && r.ExchangeSecret.GetPendingStreamSecret() != "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case *SecretsWriteRequest_PromotePending:
|
||||||
|
if r != nil && r.PromotePending.GetActiveStreamSecret() != "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unexpected request type %T", s.Request)
|
||||||
}
|
}
|
||||||
if p.GetStream().GetPendingSecretID() != "" || p.GetStream().GetActiveSecretID() != "" {
|
|
||||||
return nil
|
return errors.New("missing secret ID")
|
||||||
}
|
|
||||||
return errors.New("no secret IDs were embedded")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TLSDialOption returns the gRPC DialOption to secure the transport if CAPems
|
// TLSDialOption returns the gRPC DialOption to secure the transport if CAPems
|
||||||
|
@ -7,6 +7,56 @@ import (
|
|||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *SecretsWriteRequest) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *SecretsWriteRequest) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *SecretsWriteRequest_GenerateTokenRequest) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *SecretsWriteRequest_GenerateTokenRequest) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *SecretsWriteRequest_ExchangeSecretRequest) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *SecretsWriteRequest_ExchangeSecretRequest) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *SecretsWriteRequest_PromotePendingRequest) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *SecretsWriteRequest_PromotePendingRequest) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
|
func (msg *SecretsWriteRequest_EstablishRequest) MarshalBinary() ([]byte, error) {
|
||||||
|
return proto.Marshal(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||||
|
func (msg *SecretsWriteRequest_EstablishRequest) UnmarshalBinary(b []byte) error {
|
||||||
|
return proto.Unmarshal(b, msg)
|
||||||
|
}
|
||||||
|
|
||||||
// MarshalBinary implements encoding.BinaryMarshaler
|
// MarshalBinary implements encoding.BinaryMarshaler
|
||||||
func (msg *PeeringSecrets) MarshalBinary() ([]byte, error) {
|
func (msg *PeeringSecrets) MarshalBinary() ([]byte, error) {
|
||||||
return proto.Marshal(msg)
|
return proto.Marshal(msg)
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -53,6 +53,59 @@ enum PeeringState {
|
|||||||
TERMINATED = 6;
|
TERMINATED = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SecretsWriteRequest encodes a request to write a peering secret as the result
|
||||||
|
// of some operation. Different operations, such as generating a peering token,
|
||||||
|
// lead to modifying the known secrets associated with a peering.
|
||||||
|
message SecretsWriteRequest {
|
||||||
|
// PeerID is the local UUID of the peering this request applies to.
|
||||||
|
string PeerID = 1;
|
||||||
|
|
||||||
|
oneof Request {
|
||||||
|
GenerateTokenRequest generate_token = 2;
|
||||||
|
ExchangeSecretRequest exchange_secret = 3;
|
||||||
|
PromotePendingRequest promote_pending = 4;
|
||||||
|
EstablishRequest establish = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
// GenerateTokenRequest encodes a request to persist a peering establishment
|
||||||
|
// secret. It is triggered by generating a new peering token for a peer cluster.
|
||||||
|
message GenerateTokenRequest {
|
||||||
|
// establishment_secret is the proposed secret ID to store as the establishment
|
||||||
|
// secret for this peering.
|
||||||
|
string establishment_secret = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExchangeSecretRequest encodes a request to persist a pending stream secret
|
||||||
|
// secret. It is triggered by an acceptor peer generating a long-lived stream secret
|
||||||
|
// in exchange for an establishment secret.
|
||||||
|
message ExchangeSecretRequest {
|
||||||
|
// establishment_secret is the secret to exchange for the given pending stream secret.
|
||||||
|
string establishment_secret = 1;
|
||||||
|
|
||||||
|
// pending_stream_secret is the proposed secret ID to store as the pending stream
|
||||||
|
// secret for this peering.
|
||||||
|
string pending_stream_secret = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
// PromotePendingRequest encodes a request to promote a pending stream secret
|
||||||
|
// to be an active stream secret. It is triggered when the accepting stream handler
|
||||||
|
// validates an Open request from a peer with a pending stream secret.
|
||||||
|
message PromotePendingRequest {
|
||||||
|
// active_stream_secret is the proposed secret ID to store as the active stream
|
||||||
|
// secret for this peering.
|
||||||
|
string active_stream_secret = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// EstablishRequest encodes a request to persist an active stream secret.
|
||||||
|
// It is triggered after a dialing peer exchanges their establishment secret
|
||||||
|
// for a long-lived active stream secret.
|
||||||
|
message EstablishRequest {
|
||||||
|
// active_stream_secret is the proposed secret ID to store as the active stream
|
||||||
|
// secret for this peering.
|
||||||
|
string active_stream_secret = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// PeeringSecrets defines a secret used for authenticating/authorizing peer clusters.
|
// PeeringSecrets defines a secret used for authenticating/authorizing peer clusters.
|
||||||
message PeeringSecrets {
|
message PeeringSecrets {
|
||||||
// PeerID is the local UUID of the peering this secret was generated for.
|
// PeerID is the local UUID of the peering this secret was generated for.
|
||||||
@ -195,10 +248,10 @@ message PeeringWriteRequest {
|
|||||||
// Peering is the peering to write with the request.
|
// Peering is the peering to write with the request.
|
||||||
Peering Peering = 1;
|
Peering Peering = 1;
|
||||||
|
|
||||||
// PeeringSecrets contains the optional peering secrets to persist
|
// SecretsWriteRequest contains the optional peering secrets to persist
|
||||||
// with the peering. Peering secrets are not embedded in the peering
|
// with the peering. Peering secrets are not embedded in the peering
|
||||||
// object to avoid leaking them.
|
// object to avoid leaking them.
|
||||||
PeeringSecrets Secret = 2;
|
SecretsWriteRequest SecretsRequest = 2;
|
||||||
|
|
||||||
map<string, string> Meta = 3;
|
map<string, string> Meta = 3;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user