From e3cd4a853978848d7e22dad5ad69c53300218e5e Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Wed, 12 Aug 2020 11:19:20 -0500 Subject: [PATCH] connect: use stronger validation that ingress gateways have compatible protocols defined for their upstreams (#8470) Fixes #8466 Since Consul 1.8.0 there was a bug in how ingress gateway protocol compatibility was enforced. At the point in time that an ingress-gateway config entry was modified the discovery chain for each upstream was checked to ensure the ingress gateway protocol matched. Unfortunately future modifications of other config entries were not validated against existing ingress-gateway definitions, such as: 1. create tcp ingress-gateway pointing to 'api' (ok) 2. create service-defaults for 'api' setting protocol=http (worked, but not ok) 3. create service-splitter or service-router for 'api' (worked, but caused an agent panic) If you were to do these in a different order, it would fail without a crash: 1. create service-defaults for 'api' setting protocol=http (ok) 2. create service-splitter or service-router for 'api' (ok) 3. create tcp ingress-gateway pointing to 'api' (fail with message about protocol mismatch) This PR introduces the missing validation. The two new behaviors are: 1. create tcp ingress-gateway pointing to 'api' (ok) 2. (NEW) create service-defaults for 'api' setting protocol=http ("ok" for back compat) 3. (NEW) create service-splitter or service-router for 'api' (fail with message about protocol mismatch) In consideration for any existing users that may inadvertently be falling into item (2) above, that is now officially a valid configuration to be in. For anyone falling into item (3) above while you cannot use the API to manufacture that scenario anymore, anyone that has old (now bad) data will still be able to have the agent use them just enough to generate a new agent/proxycfg error message rather than a panic. 
Unfortunately we just don't have enough information to properly fix the config entries. --- agent/consul/state/config_entry.go | 162 +++++++++++++------- agent/consul/state/config_entry_test.go | 155 ++++++++++++++----- agent/structs/config_entry_gateways.go | 39 +++++ agent/structs/config_entry_gateways_test.go | 82 ++++++++++ agent/xds/listeners.go | 4 +- agent/xds/routes.go | 4 +- 6 files changed, 353 insertions(+), 93 deletions(-) diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 2fd3133227..2f3b9ff0a3 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -337,10 +337,6 @@ func (s *Store) validateProposedConfigEntryInGraph( if err != nil { return err } - err = validateProposedIngressProtocolsInServiceGraph(tx, next, entMeta) - if err != nil { - return err - } case structs.TerminatingGateway: err := checkGatewayClash(tx, name, structs.TerminatingGateway, structs.IngressGateway, entMeta) if err != nil { @@ -384,7 +380,11 @@ func (s *Store) validateProposedConfigEntryInServiceGraph( ) error { // Collect all of the chains that could be affected by this change // including our own. - checkChains := make(map[structs.ServiceID]struct{}) + var ( + checkChains = make(map[structs.ServiceID]struct{}) + checkIngress []*structs.IngressGatewayConfigEntry + enforceIngressProtocolsMatch bool + ) if validateAllChains { // Must be proxy-defaults/global. 
@@ -401,6 +401,37 @@ func (s *Store) validateProposedConfigEntryInServiceGraph( checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{} } } + + _, entries, err := configEntriesByKindTxn(tx, nil, structs.IngressGateway, structs.WildcardEnterpriseMeta()) + if err != nil { + return err + } + for _, entry := range entries { + ingress, ok := entry.(*structs.IngressGatewayConfigEntry) + if !ok { + return fmt.Errorf("type %T is not an ingress gateway config entry", entry) + } + checkIngress = append(checkIngress, ingress) + } + + } else if kind == structs.IngressGateway { + // Checking an ingress pointing to multiple chains. + + // This is the case for deleting a config entry + if next == nil { + return nil + } + + ingress, ok := next.(*structs.IngressGatewayConfigEntry) + if !ok { + return fmt.Errorf("type %T is not an ingress gateway config entry", next) + } + checkIngress = append(checkIngress, ingress) + + // When editing an ingress-gateway directly we are stricter about + // validating the protocol equivalence. + enforceIngressProtocolsMatch = true + } else { // Must be a single chain. @@ -413,7 +444,25 @@ func (s *Store) validateProposedConfigEntryInServiceGraph( } for raw := iter.Next(); raw != nil; raw = iter.Next() { entry := raw.(structs.ConfigEntry) - checkChains[structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta())] = struct{}{} + switch entry.GetKind() { + case structs.ServiceRouter, structs.ServiceSplitter, structs.ServiceResolver: + svcID := structs.NewServiceID(entry.GetName(), entry.GetEnterpriseMeta()) + checkChains[svcID] = struct{}{} + case structs.IngressGateway: + ingress, ok := entry.(*structs.IngressGatewayConfigEntry) + if !ok { + return fmt.Errorf("type %T is not an ingress gateway config entry", entry) + } + checkIngress = append(checkIngress, ingress) + } + } + } + + // Ensure if any ingress is affected that we fetch all of the chains needed + // to fully validate that ingress. 
+ for _, ingress := range checkIngress { + for _, svcID := range ingress.ListRelatedServices() { + checkChains[svcID] = struct{}{} } } @@ -421,24 +470,69 @@ func (s *Store) validateProposedConfigEntryInServiceGraph( {Kind: kind, Name: name}: next, } + var ( + svcProtocols = make(map[structs.ServiceID]string) + svcTopNodeType = make(map[structs.ServiceID]string) + ) for chain := range checkChains { - if err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta); err != nil { + protocol, topNode, err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta) + if err != nil { return err } + svcProtocols[chain] = protocol + svcTopNodeType[chain] = topNode.Type + } + + // Now validate all of our ingress gateways. + for _, e := range checkIngress { + for _, listener := range e.Listeners { + expectedProto := listener.Protocol + for _, service := range listener.Services { + if service.Name == structs.WildcardSpecifier { + continue + } + svcID := structs.NewServiceID(service.Name, &service.EnterpriseMeta) + + svcProto := svcProtocols[svcID] + + if svcProto != expectedProto { + // The only time an ingress gateway and its upstreams can + // have differing protocols is when: + // + // 1. ingress is tcp and the target is not-tcp + // AND + // 2. the disco chain has a resolver as the top node + topNodeType := svcTopNodeType[svcID] + if enforceIngressProtocolsMatch || + (expectedProto != "tcp") || + (expectedProto == "tcp" && topNodeType != structs.DiscoveryGraphNodeTypeResolver) { + return fmt.Errorf( + "service %q has protocol %q, which does not match defined listener protocol %q", + svcID.String(), + svcProto, + expectedProto, + ) + } + } + } + } } return nil } +// testCompileDiscoveryChain speculatively compiles a discovery chain with +// pending modifications to see if it would be valid. Also returns the computed +// protocol and topmost discovery chain node. 
func (s *Store) testCompileDiscoveryChain( tx *txn, chainName string, overrides map[structs.ConfigEntryKindName]structs.ConfigEntry, entMeta *structs.EnterpriseMeta, -) error { +) (string, *structs.DiscoveryGraphNode, error) { _, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides, entMeta) if err != nil { - return err + return "", nil, err } // Note we use an arbitrary namespace and datacenter as those would not @@ -453,8 +547,12 @@ func (s *Store) testCompileDiscoveryChain( UseInDatacenter: "dc1", Entries: speculativeEntries, } - _, err = discoverychain.Compile(req) - return err + chain, err := discoverychain.Compile(req) + if err != nil { + return "", nil, err + } + + return chain.Protocol, chain.Nodes[chain.StartNode], nil } // ReadDiscoveryChainConfigEntries will query for the full discovery chain for @@ -822,48 +920,6 @@ func configEntryWithOverridesTxn( return configEntryTxn(tx, ws, kind, name, entMeta) } -func validateProposedIngressProtocolsInServiceGraph( - tx *txn, - next structs.ConfigEntry, - entMeta *structs.EnterpriseMeta, -) error { - // This is the case for deleting a config entry - if next == nil { - return nil - } - ingress, ok := next.(*structs.IngressGatewayConfigEntry) - if !ok { - return fmt.Errorf("type %T is not an ingress gateway config entry", next) - } - - validationFn := func(svc structs.ServiceName, expectedProto string) error { - _, svcProto, err := protocolForService(tx, nil, svc) - if err != nil { - return err - } - - if svcProto != expectedProto { - return fmt.Errorf("service %q has protocol %q, which does not match defined listener protocol %q", - svc.String(), svcProto, expectedProto) - } - - return nil - } - - for _, l := range ingress.Listeners { - for _, s := range l.Services { - if s.Name == structs.WildcardSpecifier { - continue - } - err := validationFn(s.ToServiceName(), l.Protocol) - if err != nil { - return err - } - } - } - return nil -} - // protocolForService returns the 
service graph protocol associated to the // provided service, checking all relevant config entries. func protocolForService( diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index 38f7cc28de..4cee1a6c40 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1285,29 +1285,120 @@ func TestStore_ValidateGatewayNamesCannotBeShared(t *testing.T) { } func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) { - s := testStateStore(t) - - ingress := &structs.IngressGatewayConfigEntry{ - Kind: structs.IngressGateway, - Name: "gateway", - Listeners: []structs.IngressListener{ - { - Port: 8080, - Protocol: "http", - Services: []structs.IngressService{ - {Name: "web"}, + newIngress := func(protocol, name string) *structs.IngressGatewayConfigEntry { + return &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "gateway", + Listeners: []structs.IngressListener{ + { + Port: 8080, + Protocol: protocol, + Services: []structs.IngressService{ + {Name: name}, + }, }, }, - }, + } } - t.Run("default to tcp", func(t *testing.T) { - err := s.EnsureConfigEntry(0, ingress, nil) + t.Run("http ingress fails with http upstream later changed to tcp", func(t *testing.T) { + s := testStateStore(t) + + // First set the target service as http + expected := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + } + require.NoError(t, s.EnsureConfigEntry(0, expected, nil)) + + // Next configure http ingress to route to the http service + require.NoError(t, s.EnsureConfigEntry(1, newIngress("http", "web"), nil)) + + t.Run("via modification", func(t *testing.T) { + // Now redefine the target service as tcp + expected = &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "tcp", + } + + err := s.EnsureConfigEntry(2, expected, nil) + require.Error(t, err) + require.Contains(t, err.Error(), 
`has protocol "tcp"`) + }) + t.Run("via deletion", func(t *testing.T) { + // This will fall back to the default tcp. + err := s.DeleteConfigEntry(2, structs.ServiceDefaults, "web", nil) + require.Error(t, err) + require.Contains(t, err.Error(), `has protocol "tcp"`) + }) + }) + + t.Run("tcp ingress ok with tcp upstream (defaulted) later changed to http", func(t *testing.T) { + s := testStateStore(t) + + // First configure tcp ingress to route to a defaulted tcp service + require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil)) + + // Now redefine the target service as http + expected := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + } + require.NoError(t, s.EnsureConfigEntry(1, expected, nil)) + }) + + t.Run("tcp ingress fails with tcp upstream (defaulted) later changed to http", func(t *testing.T) { + s := testStateStore(t) + + // First configure tcp ingress to route to a defaulted tcp service + require.NoError(t, s.EnsureConfigEntry(0, newIngress("tcp", "web"), nil)) + + // Now redefine the target service as http + expected := &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "web", + Protocol: "http", + } + require.NoError(t, s.EnsureConfigEntry(1, expected, nil)) + + t.Run("and a router defined", func(t *testing.T) { + // This part should fail. + expected2 := &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "web", + } + err := s.EnsureConfigEntry(2, expected2, nil) + require.Error(t, err) + require.Contains(t, err.Error(), `has protocol "http"`) + }) + + t.Run("and a splitter defined", func(t *testing.T) { + // This part should fail. 
+ expected2 := &structs.ServiceSplitterConfigEntry{ + Kind: structs.ServiceSplitter, + Name: "web", + Splits: []structs.ServiceSplit{ + {Weight: 100}, + }, + } + err := s.EnsureConfigEntry(2, expected2, nil) + require.Error(t, err) + require.Contains(t, err.Error(), `has protocol "http"`) + }) + }) + + t.Run("http ingress fails with tcp upstream (defaulted)", func(t *testing.T) { + s := testStateStore(t) + err := s.EnsureConfigEntry(0, newIngress("http", "web"), nil) require.Error(t, err) require.Contains(t, err.Error(), `has protocol "tcp"`) }) - t.Run("with proxy-default", func(t *testing.T) { + t.Run("http ingress fails with http2 upstream (via proxy-defaults)", func(t *testing.T) { + s := testStateStore(t) expected := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: "global", @@ -1317,51 +1408,43 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) { } require.NoError(t, s.EnsureConfigEntry(0, expected, nil)) - err := s.EnsureConfigEntry(1, ingress, nil) + err := s.EnsureConfigEntry(1, newIngress("http", "web"), nil) require.Error(t, err) require.Contains(t, err.Error(), `has protocol "http2"`) }) - t.Run("with service-defaults override", func(t *testing.T) { + t.Run("http ingress fails with grpc upstream (via service-defaults)", func(t *testing.T) { + s := testStateStore(t) expected := &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: "web", Protocol: "grpc", } require.NoError(t, s.EnsureConfigEntry(1, expected, nil)) - err := s.EnsureConfigEntry(2, ingress, nil) + err := s.EnsureConfigEntry(2, newIngress("http", "web"), nil) require.Error(t, err) require.Contains(t, err.Error(), `has protocol "grpc"`) }) - t.Run("with service-defaults correct protocol", func(t *testing.T) { + t.Run("http ingress ok with http upstream (via service-defaults)", func(t *testing.T) { + s := testStateStore(t) expected := &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: "web", Protocol: "http", } 
require.NoError(t, s.EnsureConfigEntry(2, expected, nil)) - require.NoError(t, s.EnsureConfigEntry(3, ingress, nil)) + require.NoError(t, s.EnsureConfigEntry(3, newIngress("http", "web"), nil)) }) - t.Run("ignores wildcard specifier", func(t *testing.T) { - ingress := &structs.IngressGatewayConfigEntry{ - Kind: structs.IngressGateway, - Name: "gateway", - Listeners: []structs.IngressListener{ - { - Port: 8080, - Protocol: "http", - Services: []structs.IngressService{ - {Name: "*"}, - }, - }, - }, - } - require.NoError(t, s.EnsureConfigEntry(4, ingress, nil)) + t.Run("http ingress ignores wildcard specifier", func(t *testing.T) { + s := testStateStore(t) + require.NoError(t, s.EnsureConfigEntry(4, newIngress("http", "*"), nil)) }) - t.Run("deleting a config entry", func(t *testing.T) { + t.Run("deleting ingress config entry ok", func(t *testing.T) { + s := testStateStore(t) + require.NoError(t, s.EnsureConfigEntry(1, newIngress("tcp", "web"), nil)) require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil)) }) } diff --git a/agent/structs/config_entry_gateways.go b/agent/structs/config_entry_gateways.go index d246755f40..27a025d527 100644 --- a/agent/structs/config_entry_gateways.go +++ b/agent/structs/config_entry_gateways.go @@ -2,6 +2,7 @@ package structs import ( "fmt" + "sort" "strings" "github.com/hashicorp/consul/acl" @@ -203,6 +204,44 @@ func validateHost(tlsEnabled bool, host string) error { return nil } +// ListRelatedServices implements discoveryChainConfigEntry +// +// For ingress-gateway config entries this only finds services that are +// explicitly linked in the ingress-gateway config entry. Wildcards will not +// expand to all services. +// +// This function is used during discovery chain graph validation to prevent +// erroneous sets of config entries from being created. Wildcard ingress +// filters out sets with protocol mismatch elsewhere so it isn't an issue here +// that needs fixing. 
+func (e *IngressGatewayConfigEntry) ListRelatedServices() []ServiceID { + found := make(map[ServiceID]struct{}) + + for _, listener := range e.Listeners { + for _, service := range listener.Services { + if service.Name == WildcardSpecifier { + continue + } + svcID := NewServiceID(service.Name, &service.EnterpriseMeta) + found[svcID] = struct{}{} + } + } + + if len(found) == 0 { + return nil + } + + out := make([]ServiceID, 0, len(found)) + for svc := range found { + out = append(out, svc) + } + sort.Slice(out, func(i, j int) bool { + return out[i].EnterpriseMeta.LessThan(&out[j].EnterpriseMeta) || + out[i].ID < out[j].ID + }) + return out +} + func (e *IngressGatewayConfigEntry) CanRead(authz acl.Authorizer) bool { var authzContext acl.AuthorizerContext e.FillAuthzContext(&authzContext) diff --git a/agent/structs/config_entry_gateways_test.go b/agent/structs/config_entry_gateways_test.go index e86d233024..cf341f4b3c 100644 --- a/agent/structs/config_entry_gateways_test.go +++ b/agent/structs/config_entry_gateways_test.go @@ -86,6 +86,88 @@ func TestIngressConfigEntry_Normalize(t *testing.T) { } } +func TestIngressConfigEntry_ListRelatedServices(t *testing.T) { + type testcase struct { + entry IngressGatewayConfigEntry + expectServices []ServiceID + } + + cases := map[string]testcase{ + "one exact": { + entry: IngressGatewayConfigEntry{ + Kind: IngressGateway, + Name: "ingress-web", + Listeners: []IngressListener{ + { + Port: 1111, + Protocol: "tcp", + Services: []IngressService{ + {Name: "web"}, + }, + }, + }, + }, + expectServices: []ServiceID{NewServiceID("web", nil)}, + }, + "one wild": { + entry: IngressGatewayConfigEntry{ + Kind: IngressGateway, + Name: "ingress-web", + Listeners: []IngressListener{ + { + Port: 1111, + Protocol: "tcp", + Services: []IngressService{ + {Name: "*"}, + }, + }, + }, + }, + expectServices: nil, + }, + "kitchen sink": { + entry: IngressGatewayConfigEntry{ + Kind: IngressGateway, + Name: "ingress-web", + Listeners: []IngressListener{ 
+ { + Port: 1111, + Protocol: "tcp", + Services: []IngressService{ + {Name: "api"}, + {Name: "web"}, + }, + }, + { + Port: 2222, + Protocol: "tcp", + Services: []IngressService{ + {Name: "web"}, + {Name: "*"}, + {Name: "db"}, + {Name: "blah"}, + }, + }, + }, + }, + expectServices: []ServiceID{ + NewServiceID("api", nil), + NewServiceID("blah", nil), + NewServiceID("db", nil), + NewServiceID("web", nil), + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + got := tc.entry.ListRelatedServices() + require.Equal(t, tc.expectServices, got) + }) + } +} + func TestIngressConfigEntry_Validate(t *testing.T) { cases := []struct { diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index d6c46582bb..1a834db775 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -780,9 +780,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain( } else if cfg.Protocol == "tcp" { startNode := chain.Nodes[chain.StartNode] if startNode == nil { - panic("missing first node in compiled discovery chain for: " + chain.ServiceName) + return nil, fmt.Errorf("missing first node in compiled discovery chain for: %s", chain.ServiceName) } else if startNode.Type != structs.DiscoveryGraphNodeTypeResolver { - panic(fmt.Sprintf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type)) + return nil, fmt.Errorf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type) } targetID := startNode.Resolver.Target target := chain.Targets[targetID] diff --git a/agent/xds/routes.go b/agent/xds/routes.go index e16620ae8f..9d8dbc2b00 100644 --- a/agent/xds/routes.go +++ b/agent/xds/routes.go @@ -170,7 +170,7 @@ func makeUpstreamRouteForDiscoveryChain( startNode := chain.Nodes[chain.StartNode] if startNode == nil { - panic("missing first node in compiled discovery chain for: " + chain.ServiceName) + return nil, fmt.Errorf("missing first node in compiled discovery chain for: 
%s", chain.ServiceName) } switch startNode.Type { @@ -265,7 +265,7 @@ func makeUpstreamRouteForDiscoveryChain( routes = []*envoyroute.Route{defaultRoute} default: - panic("unknown first node in discovery chain of type: " + startNode.Type) + return nil, fmt.Errorf("unknown first node in discovery chain of type: %s", startNode.Type) } host := &envoyroute.VirtualHost{