[backport/1.8.x] connect: use stronger validation that ingress gateways have compatible protocols defined for their upstreams (#8494)

Backport of #8470 to 1.8.x
This commit is contained in:
R.B. Boyer 2020-08-13 15:26:23 -05:00 committed by GitHub
parent 010a8eb515
commit 7983023acf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 360 additions and 98 deletions

3
.changelog/8494.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
[backport/1.8.x] connect: use stronger validation that ingress gateways have compatible protocols defined for their upstreams
```

View File

@ -354,10 +354,6 @@ func (s *Store) validateProposedConfigEntryInGraph(
if err != nil {
return err
}
err = s.validateProposedIngressProtocolsInServiceGraph(tx, next, entMeta)
if err != nil {
return err
}
case structs.TerminatingGateway:
err := s.checkGatewayClash(tx, name, structs.TerminatingGateway, structs.IngressGateway, entMeta)
if err != nil {
@ -402,7 +398,11 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
) error {
// Collect all of the chains that could be affected by this change
// including our own.
checkChains := make(map[structs.ServiceID]struct{})
var (
checkChains = make(map[structs.ServiceID]struct{})
checkIngress []*structs.IngressGatewayConfigEntry
enforceIngressProtocolsMatch bool
)
if validateAllChains {
// Must be proxy-defaults/global.
@ -419,6 +419,37 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{}
}
}
_, entries, err := s.configEntriesByKindTxn(tx, nil, structs.IngressGateway, structs.WildcardEnterpriseMeta())
if err != nil {
return err
}
for _, entry := range entries {
ingress, ok := entry.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", entry)
}
checkIngress = append(checkIngress, ingress)
}
} else if kind == structs.IngressGateway {
// Checking an ingress pointing to multiple chains.
// This is the case for deleting a config entry
if next == nil {
return nil
}
ingress, ok := next.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", next)
}
checkIngress = append(checkIngress, ingress)
// When editing an ingress-gateway directly we are stricter about
// validating the protocol equivalence.
enforceIngressProtocolsMatch = true
} else {
// Must be a single chain.
@ -426,38 +457,100 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
checkChains[sid] = struct{}{}
iter, err := tx.Get(configTableName, "link", sid)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(structs.ConfigEntry)
checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{}
}
if err != nil {
return err
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(structs.ConfigEntry)
switch entry.GetKind() {
case structs.ServiceRouter, structs.ServiceSplitter, structs.ServiceResolver:
svcID := structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())
checkChains[svcID] = struct{}{}
case structs.IngressGateway:
ingress, ok := entry.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", entry)
}
checkIngress = append(checkIngress, ingress)
}
}
}
// Ensure if any ingress is affected that we fetch all of the chains needed
// to fully validate that ingress.
for _, ingress := range checkIngress {
for _, svcID := range ingress.ListRelatedServices() {
checkChains[svcID] = struct{}{}
}
}
overrides := map[structs.ConfigEntryKindName]structs.ConfigEntry{
{Kind: kind, Name: name}: next,
}
for chain, _ := range checkChains {
if err := s.testCompileDiscoveryChain(tx, nil, chain.ID, overrides, &chain.EnterpriseMeta); err != nil {
var (
svcProtocols = make(map[structs.ServiceID]string)
svcTopNodeType = make(map[structs.ServiceID]string)
)
for chain := range checkChains {
protocol, topNode, err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta)
if err != nil {
return err
}
svcProtocols[chain] = protocol
svcTopNodeType[chain] = topNode.Type
}
// Now validate all of our ingress gateways.
for _, e := range checkIngress {
for _, listener := range e.Listeners {
expectedProto := listener.Protocol
for _, service := range listener.Services {
if service.Name == structs.WildcardSpecifier {
continue
}
svcID := structs.NewServiceID(service.Name, &service.EnterpriseMeta)
svcProto := svcProtocols[svcID]
if svcProto != expectedProto {
// The only time an ingress gateway and its upstreams can
// have differing protocols is when:
//
// 1. ingress is tcp and the target is not-tcp
// AND
// 2. the disco chain has a resolver as the top node
topNodeType := svcTopNodeType[svcID]
if enforceIngressProtocolsMatch ||
(expectedProto != "tcp") ||
(expectedProto == "tcp" && topNodeType != structs.DiscoveryGraphNodeTypeResolver) {
return fmt.Errorf(
"service %q has protocol %q, which does not match defined listener protocol %q",
svcID.String(),
svcProto,
expectedProto,
)
}
}
}
}
}
return nil
}
// testCompileDiscoveryChain speculatively compiles a discovery chain with
// pending modifications to see if it would be valid. Also returns the computed
// protocol and topmost discovery chain node.
func (s *Store) testCompileDiscoveryChain(
tx *memdb.Txn,
ws memdb.WatchSet,
chainName string,
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
) error {
) (string, *structs.DiscoveryGraphNode, error) {
_, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides, entMeta)
if err != nil {
return err
return "", nil, err
}
// Note we use an arbitrary namespace and datacenter as those would not
@ -472,8 +565,12 @@ func (s *Store) testCompileDiscoveryChain(
UseInDatacenter: "dc1",
Entries: speculativeEntries,
}
_, err = discoverychain.Compile(req)
return err
chain, err := discoverychain.Compile(req)
if err != nil {
return "", nil, err
}
return chain.Protocol, chain.Nodes[chain.StartNode], nil
}
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for
@ -841,48 +938,6 @@ func (s *Store) configEntryWithOverridesTxn(
return s.configEntryTxn(tx, ws, kind, name, entMeta)
}
func (s *Store) validateProposedIngressProtocolsInServiceGraph(
tx *memdb.Txn,
next structs.ConfigEntry,
entMeta *structs.EnterpriseMeta,
) error {
// This is the case for deleting a config entry
if next == nil {
return nil
}
ingress, ok := next.(*structs.IngressGatewayConfigEntry)
if !ok {
return fmt.Errorf("type %T is not an ingress gateway config entry", next)
}
validationFn := func(svc structs.ServiceName, expectedProto string) error {
_, svcProto, err := s.protocolForService(tx, nil, svc)
if err != nil {
return err
}
if svcProto != expectedProto {
return fmt.Errorf("service %q has protocol %q, which does not match defined listener protocol %q",
svc.String(), svcProto, expectedProto)
}
return nil
}
for _, l := range ingress.Listeners {
for _, s := range l.Services {
if s.Name == structs.WildcardSpecifier {
continue
}
err := validationFn(s.ToServiceName(), l.Protocol)
if err != nil {
return err
}
}
}
return nil
}
// protocolForService returns the service graph protocol associated to the
// provided service, checking all relevant config entries.
func (s *Store) protocolForService(

View File

@ -1285,29 +1285,120 @@ func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) {
}
func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
s := testStateStore(t)
ingress := &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: "http",
Services: []structs.IngressService{
{Name: "web"},
newIngress := func(protocol, name string) *structs.IngressGatewayConfigEntry {
return &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: protocol,
Services: []structs.IngressService{
{Name: name},
},
},
},
},
}
}
t.Run("default to tcp", func(t *testing.T) {
err := s.EnsureConfigEntry(0, ingress, nil)
t.Run("http ingress fails with http upstream later changed to tcp", func(t *testing.T) {
s := testStateStore(t)
// First set the target service as http
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(0, expected, nil))
// Next configure http ingress to route to the http service
require.NoError(t, s.EnsureConfigEntry(1, newIngress("http", "web"), nil))
t.Run("via modification", func(t *testing.T) {
// Now redefine the target service as tcp
expected = &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "tcp",
}
err := s.EnsureConfigEntry(2, expected, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "tcp"`)
})
t.Run("via deletion", func(t *testing.T) {
// This will fall back to the default tcp.
err := s.DeleteConfigEntry(2, structs.ServiceDefaults, "web", nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "tcp"`)
})
})
t.Run("tcp ingress ok with tcp upstream (defaulted) later changed to http", func(t *testing.T) {
s := testStateStore(t)
// First configure tcp ingress to route to a defaulted tcp service
require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil))
// Now redefine the target service as http
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
})
t.Run("tcp ingress fails with tcp upstream (defaulted) later changed to http", func(t *testing.T) {
s := testStateStore(t)
// First configure tcp ingress to route to a defaulted tcp service
require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil))
// Now redefine the target service as http
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
t.Run("and a router defined", func(t *testing.T) {
// This part should fail.
expected2 := &structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "web",
}
err := s.EnsureConfigEntry(2, expected2, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "http"`)
})
t.Run("and a splitter defined", func(t *testing.T) {
// This part should fail.
expected2 := &structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "web",
Splits: []structs.ServiceSplit{
{Weight: 100},
},
}
err := s.EnsureConfigEntry(2, expected2, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "http"`)
})
})
t.Run("http ingress fails with tcp upstream (defaulted)", func(t *testing.T) {
s := testStateStore(t)
err := s.EnsureConfigEntry(0, newIngress("http", "web"), nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "tcp"`)
})
t.Run("with proxy-default", func(t *testing.T) {
t.Run("http ingress fails with http2 upstream (via proxy-defaults)", func(t *testing.T) {
s := testStateStore(t)
expected := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
@ -1317,51 +1408,43 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
}
require.NoError(t, s.EnsureConfigEntry(0, expected, nil))
err := s.EnsureConfigEntry(1, ingress, nil)
err := s.EnsureConfigEntry(1, newIngress("http", "web"), nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "http2"`)
})
t.Run("with service-defaults override", func(t *testing.T) {
t.Run("http ingress fails with grpc upstream (via service-defaults)", func(t *testing.T) {
s := testStateStore(t)
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "grpc",
}
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
err := s.EnsureConfigEntry(2, ingress, nil)
err := s.EnsureConfigEntry(2, newIngress("http", "web"), nil)
require.Error(t, err)
require.Contains(t, err.Error(), `has protocol "grpc"`)
})
t.Run("with service-defaults correct protocol", func(t *testing.T) {
t.Run("http ingress ok with http upstream (via service-defaults)", func(t *testing.T) {
s := testStateStore(t)
expected := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "web",
Protocol: "http",
}
require.NoError(t, s.EnsureConfigEntry(2, expected, nil))
require.NoError(t, s.EnsureConfigEntry(3, ingress, nil))
require.NoError(t, s.EnsureConfigEntry(3, newIngress("http", "web"), nil))
})
t.Run("ignores wildcard specifier", func(t *testing.T) {
ingress := &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "gateway",
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: "http",
Services: []structs.IngressService{
{Name: "*"},
},
},
},
}
require.NoError(t, s.EnsureConfigEntry(4, ingress, nil))
t.Run("http ingress ignores wildcard specifier", func(t *testing.T) {
s := testStateStore(t)
require.NoError(t, s.EnsureConfigEntry(4, newIngress("http", "*"), nil))
})
t.Run("deleting a config entry", func(t *testing.T) {
t.Run("deleting ingress config entry ok", func(t *testing.T) {
s := testStateStore(t)
require.NoError(t, s.EnsureConfigEntry(1, newIngress("tcp", "web"), nil))
require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil))
})
}

View File

@ -2,6 +2,7 @@ package structs
import (
"fmt"
"sort"
"strings"
"github.com/hashicorp/consul/acl"
@ -203,6 +204,44 @@ func validateHost(tlsEnabled bool, host string) error {
return nil
}
// ListRelatedServices implements discoveryChainConfigEntry
//
// For ingress-gateway config entries this only finds services that are
// explicitly linked in the ingress-gateway config entry. Wildcards will not
// expand to all services.
//
// This function is used during discovery chain graph validation to prevent
// erroneous sets of config entries from being created. Wildcard ingress
// filters out sets with protocol mismatch elsewhere so it isn't an issue here
// that needs fixing.
func (e *IngressGatewayConfigEntry) ListRelatedServices() []ServiceID {
found := make(map[ServiceID]struct{})
for _, listener := range e.Listeners {
for _, service := range listener.Services {
if service.Name == WildcardSpecifier {
continue
}
svcID := NewServiceID(service.Name, &service.EnterpriseMeta)
found[svcID] = struct{}{}
}
}
if len(found) == 0 {
return nil
}
out := make([]ServiceID, 0, len(found))
for svc := range found {
out = append(out, svc)
}
sort.Slice(out, func(i, j int) bool {
return out[i].EnterpriseMeta.LessThan(&out[j].EnterpriseMeta) ||
out[i].ID < out[j].ID
})
return out
}
func (e *IngressGatewayConfigEntry) CanRead(authz acl.Authorizer) bool {
var authzContext acl.AuthorizerContext
e.FillAuthzContext(&authzContext)

View File

@ -86,6 +86,88 @@ func TestIngressConfigEntry_Normalize(t *testing.T) {
}
}
func TestIngressConfigEntry_ListRelatedServices(t *testing.T) {
type testcase struct {
entry IngressGatewayConfigEntry
expectServices []ServiceID
}
cases := map[string]testcase{
"one exact": {
entry: IngressGatewayConfigEntry{
Kind: IngressGateway,
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{
{Name: "web"},
},
},
},
},
expectServices: []ServiceID{NewServiceID("web", nil)},
},
"one wild": {
entry: IngressGatewayConfigEntry{
Kind: IngressGateway,
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{
{Name: "*"},
},
},
},
},
expectServices: nil,
},
"kitchen sink": {
entry: IngressGatewayConfigEntry{
Kind: IngressGateway,
Name: "ingress-web",
Listeners: []IngressListener{
{
Port: 1111,
Protocol: "tcp",
Services: []IngressService{
{Name: "api"},
{Name: "web"},
},
},
{
Port: 2222,
Protocol: "tcp",
Services: []IngressService{
{Name: "web"},
{Name: "*"},
{Name: "db"},
{Name: "blah"},
},
},
},
},
expectServices: []ServiceID{
NewServiceID("api", nil),
NewServiceID("blah", nil),
NewServiceID("db", nil),
NewServiceID("web", nil),
},
},
}
for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
got := tc.entry.ListRelatedServices()
require.Equal(t, tc.expectServices, got)
})
}
}
func TestIngressConfigEntry_Validate(t *testing.T) {
cases := []struct {

View File

@ -785,9 +785,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
} else if cfg.Protocol == "tcp" {
startNode := chain.Nodes[chain.StartNode]
if startNode == nil {
panic("missing first node in compiled discovery chain for: " + chain.ServiceName)
return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName)
} else if startNode.Type != structs.DiscoveryGraphNodeTypeResolver {
panic(fmt.Sprintf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type))
return nil, fmt.Errorf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type)
}
targetID := startNode.Resolver.Target
target := chain.Targets[targetID]

View File

@ -170,7 +170,7 @@ func makeUpstreamRouteForDiscoveryChain(
startNode := chain.Nodes[chain.StartNode]
if startNode == nil {
panic("missing first node in compiled discovery chain for: " + chain.ServiceName)
return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName)
}
switch startNode.Type {
@ -265,7 +265,7 @@ func makeUpstreamRouteForDiscoveryChain(
routes = []*envoyroute.Route{defaultRoute}
default:
panic("unknown first node in discovery chain of type: " + startNode.Type)
return nil, fmt.Errorf("unknown first node in discovery chain of type: %s", startNode.Type)
}
host := &envoyroute.VirtualHost{