diff --git a/.changelog/8494.txt b/.changelog/8494.txt new file mode 100644 index 0000000000..ef9fb907bc --- /dev/null +++ b/.changelog/8494.txt @@ -0,0 +1,3 @@ +```release-note:bug +[backport/1.8.x] connect: use stronger validation that ingress gateways have compatible protocols defined for their upstreams +``` diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 1fbaea78be..7e947d749a 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -354,10 +354,6 @@ func (s *Store) validateProposedConfigEntryInGraph( if err != nil { return err } - err = s.validateProposedIngressProtocolsInServiceGraph(tx, next, entMeta) - if err != nil { - return err - } case structs.TerminatingGateway: err := s.checkGatewayClash(tx, name, structs.TerminatingGateway, structs.IngressGateway, entMeta) if err != nil { @@ -402,7 +398,11 @@ func (s *Store) validateProposedConfigEntryInServiceGraph( ) error { // Collect all of the chains that could be affected by this change // including our own. - checkChains := make(map[structs.ServiceID]struct{}) + var ( + checkChains = make(map[structs.ServiceID]struct{}) + checkIngress []*structs.IngressGatewayConfigEntry + enforceIngressProtocolsMatch bool + ) if validateAllChains { // Must be proxy-defaults/global. @@ -419,6 +419,37 @@ func (s *Store) validateProposedConfigEntryInServiceGraph( checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{} } } + + _, entries, err := s.configEntriesByKindTxn(tx, nil, structs.IngressGateway, structs.WildcardEnterpriseMeta()) + if err != nil { + return err + } + for _, entry := range entries { + ingress, ok := entry.(*structs.IngressGatewayConfigEntry) + if !ok { + return fmt.Errorf("type %T is not an ingress gateway config entry", entry) + } + checkIngress = append(checkIngress, ingress) + } + + } else if kind == structs.IngressGateway { + // Checking an ingress pointing to multiple chains. + + // This is the case for deleting a config entry + if next == nil { + return nil + } + + ingress, ok := next.(*structs.IngressGatewayConfigEntry) + if !ok { + return fmt.Errorf("type %T is not an ingress gateway config entry", next) + } + checkIngress = append(checkIngress, ingress) + + // When editing an ingress-gateway directly we are stricter about + // validating the protocol equivalence. + enforceIngressProtocolsMatch = true + } else { // Must be a single chain. @@ -426,38 +457,100 @@ func (s *Store) validateProposedConfigEntryInServiceGraph( checkChains[sid] = struct{}{} iter, err := tx.Get(configTableName, "link", sid) - for raw := iter.Next(); raw != nil; raw = iter.Next() { - entry := raw.(structs.ConfigEntry) - checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{} - } if err != nil { return err } + for raw := iter.Next(); raw != nil; raw = iter.Next() { + entry := raw.(structs.ConfigEntry) + switch entry.GetKind() { + case structs.ServiceRouter, structs.ServiceSplitter, structs.ServiceResolver: + svcID := structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta()) + checkChains[svcID] = struct{}{} + case structs.IngressGateway: + ingress, ok := entry.(*structs.IngressGatewayConfigEntry) + if !ok { + return fmt.Errorf("type %T is not an ingress gateway config entry", entry) + } + checkIngress = append(checkIngress, ingress) + } + } + } + + // Ensure if any ingress is affected that we fetch all of the chains needed + // to fully validate that ingress. + for _, ingress := range checkIngress { + for _, svcID := range ingress.ListRelatedServices() { + checkChains[svcID] = struct{}{} + } } overrides := map[structs.ConfigEntryKindName]structs.ConfigEntry{ {Kind: kind, Name: name}: next, } - for chain, _ := range checkChains { - if err := s.testCompileDiscoveryChain(tx, nil, chain.ID, overrides, &chain.EnterpriseMeta); err != nil { + var ( + svcProtocols = make(map[structs.ServiceID]string) + svcTopNodeType = make(map[structs.ServiceID]string) + ) + for chain := range checkChains { + protocol, topNode, err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta) + if err != nil { return err } + svcProtocols[chain] = protocol + svcTopNodeType[chain] = topNode.Type + } + + // Now validate all of our ingress gateways. + for _, e := range checkIngress { + for _, listener := range e.Listeners { + expectedProto := listener.Protocol + for _, service := range listener.Services { + if service.Name == structs.WildcardSpecifier { + continue + } + svcID := structs.NewServiceID(service.Name, &service.EnterpriseMeta) + + svcProto := svcProtocols[svcID] + + if svcProto != expectedProto { + // The only time an ingress gateway and its upstreams can + // have differing protocols is when: + // + // 1. ingress is tcp and the target is not-tcp + // AND + // 2. the disco chain has a resolver as the top node + topNodeType := svcTopNodeType[svcID] + if enforceIngressProtocolsMatch || + (expectedProto != "tcp") || + (expectedProto == "tcp" && topNodeType != structs.DiscoveryGraphNodeTypeResolver) { + return fmt.Errorf( + "service %q has protocol %q, which does not match defined listener protocol %q", + svcID.String(), + svcProto, + expectedProto, + ) + } + } + } + } } return nil } +// testCompileDiscoveryChain speculatively compiles a discovery chain with +// pending modifications to see if it would be valid. Also returns the computed +// protocol and topmost discovery chain node. func (s *Store) testCompileDiscoveryChain( tx *memdb.Txn, - ws memdb.WatchSet, chainName string, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, entMeta *structs.EnterpriseMeta, -) error { +) (string, *structs.DiscoveryGraphNode, error) { _, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides, entMeta) if err != nil { - return err + return "", nil, err } // Note we use an arbitrary namespace and datacenter as those would not @@ -472,8 +565,12 @@ func (s *Store) testCompileDiscoveryChain( UseInDatacenter: "dc1", Entries: speculativeEntries, } - _, err = discoverychain.Compile(req) - return err + chain, err := discoverychain.Compile(req) + if err != nil { + return "", nil, err + } + + return chain.Protocol, chain.Nodes[chain.StartNode], nil } // ReadDiscoveryChainConfigEntries will query for the full discovery chain for @@ -841,48 +938,6 @@ func (s *Store) configEntryWithOverridesTxn( return s.configEntryTxn(tx, ws, kind, name, entMeta) } -func (s *Store) validateProposedIngressProtocolsInServiceGraph( - tx *memdb.Txn, - next structs.ConfigEntry, - entMeta *structs.EnterpriseMeta, -) error { - // This is the case for deleting a config entry - if next == nil { - return nil - } - ingress, ok := next.(*structs.IngressGatewayConfigEntry) - if !ok { - return fmt.Errorf("type %T is not an ingress gateway config entry", next) - } - - validationFn := func(svc structs.ServiceName, expectedProto string) error { - _, svcProto, err := s.protocolForService(tx, nil, svc) - if err != nil { - return err - } - - if svcProto != expectedProto { - return fmt.Errorf("service %q has protocol %q, which does not match defined listener protocol %q", - svc.String(), svcProto, expectedProto) - } - - return nil - } - - for _, l := range ingress.Listeners { - for _, s := range l.Services { - if s.Name == structs.WildcardSpecifier { - continue - } - err := validationFn(s.ToServiceName(), l.Protocol) - if err != nil { - return err - } - } - } - return nil -} - // protocolForService returns the service graph protocol associated to the // provided service, checking all relevant config entries. func (s *Store) protocolForService( diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index 6747991bb4..dcacf2f1a9 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1285,29 +1285,120 @@ func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) { } func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) { - s := testStateStore(t) - - ingress := &structs.IngressGatewayConfigEntry{ - Kind: structs.IngressGateway, - Name: "gateway", - Listeners: []structs.IngressListener{ - { - Port: 8080, - Protocol: "http", - Services: []structs.IngressService{ - {Name: "web"}, + newIngress := func(protocol, name string) *structs.IngressGatewayConfigEntry { + return &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "gateway", + Listeners: []structs.IngressListener{ + { + Port: 8080, + Protocol: protocol, + Services: []structs.IngressService{ + {Name: name}, + }, }, }, - }, + } } - t.Run("default to tcp", func(t *testing.T) { - err := s.EnsureConfigEntry(0, ingress, nil) + t.Run("http ingress fails with http upstream later changed to tcp", func(t *testing.T) { + s := testStateStore(t) + + // First set the target service as http + expected := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + } + require.NoError(t, s.EnsureConfigEntry(0, expected, nil)) + + // Next configure http ingress to route to the http service + require.NoError(t, s.EnsureConfigEntry(1, newIngress("http", "web"), nil)) + + t.Run("via modification", func(t *testing.T) { + // Now redefine the target service as tcp + expected = &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "tcp", + } + + err := s.EnsureConfigEntry(2, expected, nil) + require.Error(t, err) + require.Contains(t, err.Error(), `has protocol "tcp"`) + }) + t.Run("via deletion", func(t *testing.T) { + // This will fall back to the default tcp. + err := s.DeleteConfigEntry(2, structs.ServiceDefaults, "web", nil) + require.Error(t, err) + require.Contains(t, err.Error(), `has protocol "tcp"`) + }) + }) + + t.Run("tcp ingress ok with tcp upstream (defaulted) later changed to http", func(t *testing.T) { + s := testStateStore(t) + + // First configure tcp ingress to route to a defaulted tcp service + require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil)) + + // Now redefine the target service as http + expected := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + } + require.NoError(t, s.EnsureConfigEntry(1, expected, nil)) + }) + + t.Run("tcp ingress fails with tcp upstream (defaulted) later changed to http", func(t *testing.T) { + s := testStateStore(t) + + // First configure tcp ingress to route to a defaulted tcp service + require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil)) + + // Now redefine the target service as http + expected := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + } + require.NoError(t, s.EnsureConfigEntry(1, expected, nil)) + + t.Run("and a router defined", func(t *testing.T) { + // This part should fail. + expected2 := &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "web", + } + err := s.EnsureConfigEntry(2, expected2, nil) + require.Error(t, err) + require.Contains(t, err.Error(), `has protocol "http"`) + }) + + t.Run("and a splitter defined", func(t *testing.T) { + // This part should fail. + expected2 := &structs.ServiceSplitterConfigEntry{ + Kind: structs.ServiceSplitter, + Name: "web", + Splits: []structs.ServiceSplit{ + {Weight: 100}, + }, + } + err := s.EnsureConfigEntry(2, expected2, nil) + require.Error(t, err) + require.Contains(t, err.Error(), `has protocol "http"`) + }) + }) + + t.Run("http ingress fails with tcp upstream (defaulted)", func(t *testing.T) { + s := testStateStore(t) + err := s.EnsureConfigEntry(0, newIngress("http", "web"), nil) require.Error(t, err) require.Contains(t, err.Error(), `has protocol "tcp"`) }) - t.Run("with proxy-default", func(t *testing.T) { + t.Run("http ingress fails with http2 upstream (via proxy-defaults)", func(t *testing.T) { + s := testStateStore(t) expected := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", @@ -1317,51 +1408,43 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) { } require.NoError(t, s.EnsureConfigEntry(0, expected, nil)) - err := s.EnsureConfigEntry(1, ingress, nil) + err := s.EnsureConfigEntry(1, newIngress("http", "web"), nil) require.Error(t, err) require.Contains(t, err.Error(), `has protocol "http2"`) }) - t.Run("with service-defaults override", func(t *testing.T) { + t.Run("http ingress fails with grpc upstream (via service-defaults)", func(t *testing.T) { + s := testStateStore(t) expected := &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: "web", Protocol: "grpc", } require.NoError(t, s.EnsureConfigEntry(1, expected, nil)) - err := s.EnsureConfigEntry(2, ingress, nil) + err := s.EnsureConfigEntry(2, newIngress("http", "web"), nil) require.Error(t, err) require.Contains(t, err.Error(), `has protocol "grpc"`) }) - t.Run("with service-defaults correct protocol", func(t *testing.T) { + t.Run("http ingress ok with http upstream (via service-defaults)", func(t *testing.T) { + s := testStateStore(t) expected := &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: "web", Protocol: "http", } require.NoError(t, s.EnsureConfigEntry(2, expected, nil)) - require.NoError(t, s.EnsureConfigEntry(3, ingress, nil)) + require.NoError(t, s.EnsureConfigEntry(3, newIngress("http", "web"), nil)) }) - t.Run("ignores wildcard specifier", func(t *testing.T) { - ingress := &structs.IngressGatewayConfigEntry{ - Kind: structs.IngressGateway, - Name: "gateway", - Listeners: []structs.IngressListener{ - { - Port: 8080, - Protocol: "http", - Services: []structs.IngressService{ - {Name: "*"}, - }, - }, - }, - } - require.NoError(t, s.EnsureConfigEntry(4, ingress, nil)) + t.Run("http ingress ignores wildcard specifier", func(t *testing.T) { + s := testStateStore(t) + require.NoError(t, s.EnsureConfigEntry(4, newIngress("http", "*"), nil)) }) - t.Run("deleting a config entry", func(t *testing.T) { + t.Run("deleting ingress config entry ok", func(t *testing.T) { + s := testStateStore(t) + require.NoError(t, s.EnsureConfigEntry(1, newIngress("tcp", "web"), nil)) require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil)) }) } diff --git a/agent/structs/config_entry_gateways.go b/agent/structs/config_entry_gateways.go index d246755f40..27a025d527 100644 --- a/agent/structs/config_entry_gateways.go +++ b/agent/structs/config_entry_gateways.go @@ -2,6 +2,7 @@ package structs import ( "fmt" + "sort" "strings" "github.com/hashicorp/consul/acl" @@ -203,6 +204,44 @@ func validateHost(tlsEnabled bool, host string) error { return nil } +// ListRelatedServices implements discoveryChainConfigEntry +// +// For ingress-gateway config entries this only finds services that are +// explicitly linked in the ingress-gateway config entry. Wildcards will not +// expand to all services. +// +// This function is used during discovery chain graph validation to prevent +// erroneous sets of config entries from being created. Wildcard ingress +// filters out sets with protocol mismatch elsewhere so it isn't an issue here +// that needs fixing. +func (e *IngressGatewayConfigEntry) ListRelatedServices() []ServiceID { + found := make(map[ServiceID]struct{}) + + for _, listener := range e.Listeners { + for _, service := range listener.Services { + if service.Name == WildcardSpecifier { + continue + } + svcID := NewServiceID(service.Name, &service.EnterpriseMeta) + found[svcID] = struct{}{} + } + } + + if len(found) == 0 { + return nil + } + + out := make([]ServiceID, 0, len(found)) + for svc := range found { + out = append(out, svc) + } + sort.Slice(out, func(i, j int) bool { + return out[i].EnterpriseMeta.LessThan(&out[j].EnterpriseMeta) || + out[i].ID < out[j].ID + }) + return out +} + func (e *IngressGatewayConfigEntry) CanRead(authz acl.Authorizer) bool { var authzContext acl.AuthorizerContext e.FillAuthzContext(&authzContext) diff --git a/agent/structs/config_entry_gateways_test.go b/agent/structs/config_entry_gateways_test.go index e86d233024..cf341f4b3c 100644 --- a/agent/structs/config_entry_gateways_test.go +++ b/agent/structs/config_entry_gateways_test.go @@ -86,6 +86,88 @@ func TestIngressConfigEntry_Normalize(t *testing.T) { } } +func TestIngressConfigEntry_ListRelatedServices(t *testing.T) { + type testcase struct { + entry IngressGatewayConfigEntry + expectServices []ServiceID + } + + cases := map[string]testcase{ + "one exact": { + entry: IngressGatewayConfigEntry{ + Kind: IngressGateway, + Name: "ingress-web", + Listeners: []IngressListener{ + { + Port: 1111, + Protocol: "tcp", + Services: []IngressService{ + {Name: "web"}, + }, + }, + }, + }, + expectServices: []ServiceID{NewServiceID("web", nil)}, + }, + "one wild": { + entry: IngressGatewayConfigEntry{ + Kind: IngressGateway, + Name: "ingress-web", + Listeners: []IngressListener{ + { + Port: 1111, + Protocol: "tcp", + Services: []IngressService{ + {Name: "*"}, + }, + }, + }, + }, + expectServices: nil, + }, + "kitchen sink": { + entry: IngressGatewayConfigEntry{ + Kind: IngressGateway, + Name: "ingress-web", + Listeners: []IngressListener{ + { + Port: 1111, + Protocol: "tcp", + Services: []IngressService{ + {Name: "api"}, + {Name: "web"}, + }, + }, + { + Port: 2222, + Protocol: "tcp", + Services: []IngressService{ + {Name: "web"}, + {Name: "*"}, + {Name: "db"}, + {Name: "blah"}, + }, + }, + }, + }, + expectServices: []ServiceID{ + NewServiceID("api", nil), + NewServiceID("blah", nil), + NewServiceID("db", nil), + NewServiceID("web", nil), + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + got := tc.entry.ListRelatedServices() + require.Equal(t, tc.expectServices, got) + }) + } +} + func TestIngressConfigEntry_Validate(t *testing.T) { cases := []struct { diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 28a67bcd37..e13193ad6c 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -785,9 +785,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain( } else if cfg.Protocol == "tcp" { startNode := chain.Nodes[chain.StartNode] if startNode == nil { - panic("missing first node in compiled discovery chain for: " + chain.ServiceName) + return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName) } else if startNode.Type != structs.DiscoveryGraphNodeTypeResolver { - panic(fmt.Sprintf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type)) + return nil, fmt.Errorf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type) } targetID := startNode.Resolver.Target target := chain.Targets[targetID] diff --git a/agent/xds/routes.go b/agent/xds/routes.go index ff5891d172..e4b49dbfc9 100644 --- a/agent/xds/routes.go +++ b/agent/xds/routes.go @@ -170,7 +170,7 @@ func makeUpstreamRouteForDiscoveryChain( startNode := chain.Nodes[chain.StartNode] if startNode == nil { - panic("missing first node in compiled discovery chain for: " + chain.ServiceName) + return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName) } switch startNode.Type { @@ -265,7 +265,7 @@ func makeUpstreamRouteForDiscoveryChain( routes = []*envoyroute.Route{defaultRoute} default: - panic("unknown first node in discovery chain of type: " + startNode.Type) + return nil, fmt.Errorf("unknown first node in discovery chain of type: %s", startNode.Type) } host := &envoyroute.VirtualHost{