peering: use ShouldDial to validate peer role (#13823)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
alex 2022-07-22 15:56:25 -07:00 committed by GitHub
parent a1e6d69454
commit 279d458e6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 138 additions and 13 deletions

View File

@ -194,13 +194,22 @@ func (s *Server) GenerateToken(
}
}
peeringOrNil, err := s.getExistingPeering(req.PeerName, req.Partition)
if err != nil {
return nil, err
}
// validate that this peer name is not being used as a dialer already
if err = validatePeer(peeringOrNil, false); err != nil {
return nil, err
}
canRetry := true
RETRY_ONCE:
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
if err != nil {
return nil, err
}
writeReq := pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
ID: id,
@ -290,17 +299,32 @@ func (s *Server) Establish(
defer metrics.MeasureSince([]string{"peering", "establish"}, time.Now())
peeringOrNil, err := s.getExistingPeering(req.PeerName, req.Partition)
if err != nil {
return nil, err
}
// validate that this peer name is not being used as an acceptor already
if err = validatePeer(peeringOrNil, true); err != nil {
return nil, err
}
var id string
if peeringOrNil != nil {
id = peeringOrNil.ID
} else {
id, err = lib.GenerateUUID(s.Backend.CheckPeeringUUID)
if err != nil {
return nil, err
}
}
// convert ServiceAddress values to strings
serverAddrs := make([]string, len(tok.ServerAddresses))
for i, addr := range tok.ServerAddresses {
serverAddrs[i] = addr
}
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
if err != nil {
return nil, err
}
// as soon as a peering is written with a list of ServerAddresses that is
// non-empty, the leader routine will see the peering and attempt to
// establish a connection with the remote peer.
@ -622,16 +646,12 @@ func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.Tr
}
func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (string, error) {
q := state.Query{
Value: strings.ToLower(peerName),
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition),
}
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
peeringOrNil, err := s.getExistingPeering(peerName, partition)
if err != nil {
return "", err
}
if peering != nil {
return peering.ID, nil
if peeringOrNil != nil {
return peeringOrNil.ID, nil
}
id, err := lib.GenerateUUID(s.Backend.CheckPeeringUUID)
@ -641,6 +661,36 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin
return id, nil
}
func (s *Server) getExistingPeering(peerName, partition string) (*pbpeering.Peering, error) {
q := state.Query{
Value: strings.ToLower(peerName),
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition),
}
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
if err != nil {
return nil, err
}
return peering, nil
}
// validatePeer enforces the following rule for an existing peering:
// - if a peering already exists, it can only be used as an acceptor or dialer
//
// We define a DIALER as a peering that has server addresses (or a peering that is created via the Establish endpoint)
// Conversely, we define an ACCEPTOR as a peering that is created via the GenerateToken endpoint
func validatePeer(peering *pbpeering.Peering, allowedToDial bool) error {
if peering != nil && peering.ShouldDial() != allowedToDial {
if allowedToDial {
return fmt.Errorf("cannot create peering with name: %q; there is an existing peering expecting to be dialed", peering.Name)
} else {
return fmt.Errorf("cannot create peering with name: %q; there is already an established peering", peering.Name)
}
}
return nil
}
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
var copyP pbpeering.Peering
proto.Merge(&copyP, p)

View File

@ -531,6 +531,81 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs)
}
func TestPeeringService_validatePeer(t *testing.T) {
dir := testutil.TempDir(t, "consul")
signer, _, _ := tlsutil.GeneratePrivateKey()
ca, _, _ := tlsutil.GenerateCA(tlsutil.CAOpts{Signer: signer})
cafile := path.Join(dir, "cacert.pem")
require.NoError(t, ioutil.WriteFile(cafile, []byte(ca), 0600))
s := newTestServer(t, func(c *consul.Config) {
c.SerfLANConfig.MemberlistConfig.AdvertiseAddr = "127.0.0.1"
c.TLSConfig.GRPC.CAFile = cafile
c.DataDir = dir
})
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
testutil.RunStep(t, "generate a token", func(t *testing.T) {
req := pbpeering.GenerateTokenRequest{PeerName: "peerB"}
resp, err := client.GenerateToken(ctx, &req)
require.NoError(t, err)
require.NotEmpty(t, resp)
})
testutil.RunStep(t, "generate a token with the same name", func(t *testing.T) {
req := pbpeering.GenerateTokenRequest{PeerName: "peerB"}
resp, err := client.GenerateToken(ctx, &req)
require.NoError(t, err)
require.NotEmpty(t, resp)
})
validToken := peering.TestPeeringToken("83474a06-cca4-4ff4-99a4-4152929c8160")
validTokenJSON, _ := json.Marshal(&validToken)
validTokenB64 := base64.StdEncoding.EncodeToString(validTokenJSON)
testutil.RunStep(t, "send an establish request for a different peer name", func(t *testing.T) {
resp, err := client.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "peer1-usw1",
PeeringToken: validTokenB64,
})
require.NoError(t, err)
require.NotEmpty(t, resp)
})
testutil.RunStep(t, "send an establish request for a different peer name again", func(t *testing.T) {
resp, err := client.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "peer1-usw1",
PeeringToken: validTokenB64,
})
require.NoError(t, err)
require.NotEmpty(t, resp)
})
testutil.RunStep(t, "attempt to generate token with the same name used as dialer", func(t *testing.T) {
req := pbpeering.GenerateTokenRequest{PeerName: "peer1-usw1"}
resp, err := client.GenerateToken(ctx, &req)
require.Error(t, err)
require.Contains(t, err.Error(),
"cannot create peering with name: \"peer1-usw1\"; there is already an established peering")
require.Nil(t, resp)
})
testutil.RunStep(t, "attempt to establish the with the same name used as acceptor", func(t *testing.T) {
resp, err := client.Establish(ctx, &pbpeering.EstablishRequest{
PeerName: "peerB",
PeeringToken: validTokenB64,
})
require.Error(t, err)
require.Contains(t, err.Error(),
"cannot create peering with name: \"peerB\"; there is an existing peering expecting to be dialed")
require.Nil(t, resp)
})
}
// Test RPC endpoint responses when peering is disabled. They should all return an error.
func TestPeeringService_PeeringDisabled(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this