feat(peering): validate server name conflicts on establish

This commit is contained in:
DanStough 2022-09-09 17:09:32 -04:00 committed by Dan Stough
parent 60cee76746
commit 2a2debee64
7 changed files with 66 additions and 12 deletions

3
.changelog/14563.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
peering: Validate peering tokens for server name conflicts
```

View File

@ -1370,6 +1370,11 @@ func (s *Server) WANMembers() []serf.Member {
return s.serfWAN.Members() return s.serfWAN.Members()
} }
// GetPeeringBackend is a test helper.
func (s *Server) GetPeeringBackend() peering.Backend {
return s.peeringBackend
}
// RemoveFailedNode is used to remove a failed node from the cluster. // RemoveFailedNode is used to remove a failed node from the cluster.
func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *acl.EnterpriseMeta) error { func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *acl.EnterpriseMeta) error {
var removeFn func(*serf.Serf, string) error var removeFn func(*serf.Serf, string) error

View File

@ -267,8 +267,8 @@ func TestHTTP_Peering_Establish(t *testing.T) {
}) })
t.Run("Success", func(t *testing.T) { t.Run("Success", func(t *testing.T) {
a2 := NewTestAgent(t, "") a2 := NewTestAgent(t, `datacenter = "dc2"`)
testrpc.WaitForTestAgent(t, a2.RPC, "dc1") testrpc.WaitForTestAgent(t, a2.RPC, "dc2")
bodyBytes, err := json.Marshal(&pbpeering.GenerateTokenRequest{ bodyBytes, err := json.Marshal(&pbpeering.GenerateTokenRequest{
PeerName: "foo", PeerName: "foo",

View File

@ -374,7 +374,7 @@ func (s *Server) Establish(
return nil, err return nil, err
} }
if err := s.validatePeeringInPartition(tok.PeerID, entMeta.PartitionOrEmpty()); err != nil { if err := s.validatePeeringLocality(tok, entMeta.PartitionOrEmpty()); err != nil {
return nil, err return nil, err
} }
@ -463,15 +463,23 @@ func (s *Server) Establish(
return resp, nil return resp, nil
} }
// validatePeeringInPartition makes sure that we don't create a peering in the same partition. We validate by looking at // validatePeeringLocality makes sure that we don't create a peering in the cluster/partition it was generated.
// the remotePeerID from the PeeringToken and looking up for a peering in the partition. If there is one and the // We validate by looking at the remote PeerID from the PeeringToken and looking up that peering in the partition.
// request partition is the same, then we are attempting to peer within the partition, which we shouldn't. // If there is one and the request partition is the same, then we are attempting to peer within the partition, which we shouldn't.
func (s *Server) validatePeeringInPartition(remotePeerID, partition string) error { // We also perform a check to verify if the ServerName of the PeeringToken overlaps with our own, we do not process it
_, peering, err := s.Backend.Store().PeeringReadByID(nil, remotePeerID) // unless we've been able to find the peering in the store, i.e. this peering is between two local partitions.
func (s *Server) validatePeeringLocality(token *structs.PeeringToken, partition string) error {
_, peering, err := s.Backend.Store().PeeringReadByID(nil, token.PeerID)
if err != nil { if err != nil {
return fmt.Errorf("cannot read peering by ID: %w", err) return fmt.Errorf("cannot read peering by ID: %w", err)
} }
// If the token has the same server name as this cluster, but we can't find the peering
// in our store, it indicates a naming conflict.
if s.Backend.GetServerName() == token.ServerName && peering == nil {
return fmt.Errorf("conflict - peering token's server name matches the current cluster's server name, %q, but there is no record in the database", s.Backend.GetServerName())
}
if peering != nil && acl.EqualPartitions(peering.GetPartition(), partition) { if peering != nil && acl.EqualPartitions(peering.GetPartition(), partition) {
return fmt.Errorf("cannot create a peering within the same partition (ENT) or cluster (OSS)") return fmt.Errorf("cannot create a peering within the same partition (ENT) or cluster (OSS)")
} }

View File

@ -345,8 +345,8 @@ func TestPeeringService_Establish_Validation(t *testing.T) {
} }
} }
// We define a valid peering by a peering that does not occur over the same server addresses // Loopback peering within the same cluster/partion should throw an error
func TestPeeringService_Establish_validPeeringInPartition(t *testing.T) { func TestPeeringService_Establish_invalidPeeringInSamePartition(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this // TODO(peering): see note on newTestServer, refactor to not use this
s := newTestServer(t, nil) s := newTestServer(t, nil)
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t)) client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
@ -369,12 +369,48 @@ func TestPeeringService_Establish_validPeeringInPartition(t *testing.T) {
require.Nil(t, respE) require.Nil(t, respE)
} }
// When tokens have the same name as the dialing cluster but are unknown by ID, we
// should be throwing an error to note the server name conflict.
func TestPeeringService_Establish_serverNameConflict(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this
s := newTestServer(t, nil)
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
// Manufacture token to have the same server name but a PeerID not in the store.
id, err := uuid.GenerateUUID()
require.NoError(t, err, "could not generate uuid")
peeringToken := structs.PeeringToken{
ServerAddresses: []string{"1.2.3.4:8502"},
ServerName: s.Server.GetPeeringBackend().GetServerName(),
EstablishmentSecret: "foo",
PeerID: id,
}
jsonToken, err := json.Marshal(peeringToken)
require.NoError(t, err, "could not marshal peering token")
base64Token := base64.StdEncoding.EncodeToString(jsonToken)
establishReq := &pbpeering.EstablishRequest{
PeerName: "peerTwo",
PeeringToken: base64Token,
}
respE, errE := client.Establish(ctx, establishReq)
require.Error(t, errE)
require.Contains(t, errE.Error(), "conflict - peering token's server name matches the current cluster's server name")
require.Nil(t, respE)
}
func TestPeeringService_Establish(t *testing.T) { func TestPeeringService_Establish(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this // TODO(peering): see note on newTestServer, refactor to not use this
s1 := newTestServer(t, nil) s1 := newTestServer(t, nil)
client1 := pbpeering.NewPeeringServiceClient(s1.ClientConn(t)) client1 := pbpeering.NewPeeringServiceClient(s1.ClientConn(t))
s2 := newTestServer(t, func(conf *consul.Config) { s2 := newTestServer(t, func(conf *consul.Config) {
conf.Datacenter = "dc2"
conf.GRPCPort = 5301 conf.GRPCPort = 5301
}) })
client2 := pbpeering.NewPeeringServiceClient(s2.ClientConn(t)) client2 := pbpeering.NewPeeringServiceClient(s2.ClientConn(t))
@ -1070,6 +1106,7 @@ func TestPeeringService_validatePeer(t *testing.T) {
s2 := newTestServer(t, func(conf *consul.Config) { s2 := newTestServer(t, func(conf *consul.Config) {
conf.GRPCPort = 5301 conf.GRPCPort = 5301
conf.Datacenter = "dc2"
}) })
client2 := pbpeering.NewPeeringServiceClient(s2.ClientConn(t)) client2 := pbpeering.NewPeeringServiceClient(s2.ClientConn(t))

View File

@ -51,6 +51,7 @@ func TestAPI_Peering_ACLDeny(t *testing.T) {
serverConfig.ACL.Enabled = true serverConfig.ACL.Enabled = true
serverConfig.ACL.DefaultPolicy = "deny" serverConfig.ACL.DefaultPolicy = "deny"
serverConfig.Ports.GRPC = 5301 serverConfig.Ports.GRPC = 5301
serverConfig.Datacenter = "dc2"
}) })
defer s2.Stop() defer s2.Stop()

View File

@ -32,11 +32,11 @@ func TestEstablishCommand(t *testing.T) {
acceptor := agent.NewTestAgent(t, ``) acceptor := agent.NewTestAgent(t, ``)
t.Cleanup(func() { _ = acceptor.Shutdown() }) t.Cleanup(func() { _ = acceptor.Shutdown() })
dialer := agent.NewTestAgent(t, ``) dialer := agent.NewTestAgent(t, `datacenter = "dc2"`)
t.Cleanup(func() { _ = dialer.Shutdown() }) t.Cleanup(func() { _ = dialer.Shutdown() })
testrpc.WaitForTestAgent(t, acceptor.RPC, "dc1") testrpc.WaitForTestAgent(t, acceptor.RPC, "dc1")
testrpc.WaitForTestAgent(t, dialer.RPC, "dc1") testrpc.WaitForTestAgent(t, dialer.RPC, "dc2")
acceptingClient := acceptor.Client() acceptingClient := acceptor.Client()
dialingClient := dialer.Client() dialingClient := dialer.Client()