diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index d98f2ce71a..48fd18505f 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -173,7 +173,7 @@ func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply * defaultAllow = authz.IntentionDefaultAllow(nil) } - index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, defaultAllow, &args.EnterpriseMeta) + index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, args.ServiceKind, defaultAllow, &args.EnterpriseMeta) if err != nil { return err } diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index 15d57e8bdc..2fac2eb28e 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -1639,6 +1639,7 @@ func TestInternal_ServiceTopology(t *testing.T) { var out structs.IndexedServiceTopology require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) require.False(r, out.FilteredByACLs) + require.Equal(r, "http", out.ServiceTopology.MetricsProtocol) // bar/web, bar/web-proxy, baz/web, baz/web-proxy require.Len(r, out.ServiceTopology.Upstreams, 4) @@ -1664,6 +1665,7 @@ func TestInternal_ServiceTopology(t *testing.T) { var out structs.IndexedServiceTopology require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) require.False(r, out.FilteredByACLs) + require.Equal(r, "http", out.ServiceTopology.MetricsProtocol) // foo/api, foo/api-proxy require.Len(r, out.ServiceTopology.Downstreams, 2) @@ -1699,6 +1701,7 @@ func TestInternal_ServiceTopology(t *testing.T) { var out structs.IndexedServiceTopology require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) require.False(r, out.FilteredByACLs) + require.Equal(r, "http", out.ServiceTopology.MetricsProtocol) require.Len(r, out.ServiceTopology.Upstreams, 0) @@ -1756,6 +1759,7 @@ service "web" { policy = "read" } require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) require.True(t, out.FilteredByACLs) + require.Equal(t, "http", out.ServiceTopology.MetricsProtocol) // The web-proxy upstream gets filtered out from both bar and baz require.Len(t, out.ServiceTopology.Upstreams, 2) @@ -1775,6 +1779,7 @@ service "web" { policy = "read" } require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) require.True(t, out.FilteredByACLs) + require.Equal(t, "http", out.ServiceTopology.MetricsProtocol) // The redis upstream gets filtered out but the api and proxy downstream are returned require.Len(t, out.ServiceTopology.Upstreams, 0) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 7e0fafb736..8230f100db 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2907,6 +2907,36 @@ func serviceGatewayNodes(tx *txn, ws memdb.WatchSet, service string, kind struct return maxIdx, ret, nil } +// metricsProtocolForIngressGateway determines the protocol that should be used when fetching metrics for an ingress gateway +// Since ingress gateways may have listeners with different protocols, favor capturing all traffic by only returning HTTP +// when all listeners are HTTP-like. +func metricsProtocolForIngressGateway(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, string, error) { + idx, conf, err := configEntryTxn(tx, ws, structs.IngressGateway, sn.Name, &sn.EnterpriseMeta) + if err != nil { + return 0, "", fmt.Errorf("failed to get ingress-gateway config entry for %q: %v", sn.String(), err) + } + if conf == nil { + return 0, "", nil + } + entry, ok := conf.(*structs.IngressGatewayConfigEntry) + if !ok { + return 0, "", fmt.Errorf("unexpected config entry type: %T", conf) + } + counts := make(map[string]int) + for _, l := range entry.Listeners { + if structs.IsProtocolHTTPLike(l.Protocol) { + counts["http"] += 1 + } else { + counts["tcp"] += 1 + } + } + protocol := "tcp" + if counts["tcp"] == 0 && counts["http"] > 0 { + protocol = "http" + } + return idx, protocol, nil +} + // checkProtocolMatch filters out any GatewayService entries added from a wildcard with a protocol // that doesn't match the one configured in their discovery chain. func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayService) (uint64, bool, error) { @@ -2925,6 +2955,7 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi func (s *Store) ServiceTopology( ws memdb.WatchSet, dc, service string, + kind structs.ServiceKind, defaultAllow acl.EnforcementDecision, entMeta *structs.EnterpriseMeta, ) (uint64, *structs.ServiceTopology, error) { @@ -2932,10 +2963,30 @@ func (s *Store) ServiceTopology( defer tx.Abort() var ( - maxIdx uint64 - sn = structs.NewServiceName(service, entMeta) + maxIdx uint64 + protocol string + err error + + sn = structs.NewServiceName(service, entMeta) ) + switch kind { + case structs.ServiceKindIngressGateway: + maxIdx, protocol, err = metricsProtocolForIngressGateway(tx, ws, sn) + if err != nil { + return 0, nil, fmt.Errorf("failed to fetch protocol for service %s: %v", sn.String(), err) + } + + case structs.ServiceKindTypical: + maxIdx, protocol, err = protocolForService(tx, ws, sn) + if err != nil { + return 0, nil, fmt.Errorf("failed to fetch protocol for service %s: %v", sn.String(), err) + } + + default: + return 0, nil, fmt.Errorf("unsupported kind %q", kind) + } + idx, upstreamNames, err := upstreamsFromRegistrationTxn(tx, ws, sn) if err != nil { return 0, nil, err @@ -2998,6 +3049,7 @@ func (s *Store) ServiceTopology( } resp := &structs.ServiceTopology{ + MetricsProtocol: protocol, Upstreams: upstreams, Downstreams: downstreams, UpstreamDecisions: upstreamDecisions, diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 6ef5070ab4..352a3f7ee3 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -7182,3 +7182,182 @@ func TestCatalog_DownstreamsForService_Updates(t *testing.T) { require.Equal(t, uint64(6), idx) require.ElementsMatch(t, expect, names) } + +func TestProtocolForIngressGateway(t *testing.T) { + tt := []struct { + name string + idx uint64 + entries []structs.ConfigEntry + expect string + }{ + { + name: "all http like", + idx: uint64(5), + entries: []structs.ConfigEntry{ + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "h1-svc", + Protocol: "http", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "h2-svc", + Protocol: "http2", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "g-svc", + Protocol: "grpc", + }, + &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "h1-svc", + }, + }, + }, + { + Port: 2222, + Protocol: "http2", + Services: []structs.IngressService{ + { + Name: "h2-svc", + }, + }, + }, + { + Port: 3333, + Protocol: "grpc", + Services: []structs.IngressService{ + { + Name: "g-svc", + }, + }, + }, + }, + }, + }, + expect: "http", + }, + { + name: "all tcp", + idx: uint64(6), + entries: []structs.ConfigEntry{ + &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "zip", + }, + }, + }, + { + Port: 2222, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "zop", + }, + }, + }, + { + Port: 3333, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "zap", + }, + }, + }, + }, + }, + }, + expect: "tcp", + }, + { + name: "mix of both", + idx: uint64(7), + entries: []structs.ConfigEntry{ + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "h1-svc", + Protocol: "http", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "g-svc", + Protocol: "grpc", + }, + &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 1111, + Protocol: "http", + Services: []structs.IngressService{ + { + Name: "h1-svc", + }, + }, + }, + { + Port: 2222, + Protocol: "tcp", + Services: []structs.IngressService{ + { + Name: "zop", + }, + }, + }, + { + Port: 3333, + Protocol: "grpc", + Services: []structs.IngressService{ + { + Name: "g-svc", + }, + }, + }, + }, + }, + }, + expect: "tcp", + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s := testStateStore(t) + + for _, entry := range tc.entries { + require.NoError(t, entry.Normalize()) + require.NoError(t, entry.Validate()) + + require.NoError(t, s.EnsureConfigEntry(tc.idx, entry, structs.DefaultEnterpriseMeta())) + } + + tx := s.db.ReadTxn() + defer tx.Abort() + + ws := memdb.NewWatchSet() + sn := structs.NewServiceName("ingress", structs.DefaultEnterpriseMeta()) + + idx, protocol, err := metricsProtocolForIngressGateway(tx, ws, sn) + require.NoError(t, err) + require.Equal(t, tc.idx, idx) + require.Equal(t, tc.expect, protocol) + }) + } +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 0a5d7107a6..4797bc4a8a 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -519,6 +519,7 @@ type ServiceSpecificRequest struct { Datacenter string NodeMetaFilters map[string]string ServiceName string + ServiceKind ServiceKind // DEPRECATED (singular-service-tag) - remove this when backwards RPC compat // with 1.2.x is not required. ServiceTag string @@ -1893,6 +1894,9 @@ type ServiceTopology struct { UpstreamDecisions map[string]IntentionDecisionSummary DownstreamDecisions map[string]IntentionDecisionSummary + + // MetricsProtocol is the protocol of the service being queried + MetricsProtocol string } // IndexedConfigEntries has its own encoding logic which differs from diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index 2625d1f45c..0b2e820b62 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -64,6 +64,7 @@ type ServiceTopologySummary struct { } type ServiceTopology struct { + Protocol string Upstreams []*ServiceTopologySummary Downstreams []*ServiceTopologySummary FilteredByACLs bool @@ -281,6 +282,23 @@ func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Req return nil, nil } + kind, ok := req.URL.Query()["kind"] + if !ok { + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprint(resp, "Missing service kind") + return nil, nil + } + args.ServiceKind = structs.ServiceKind(kind[0]) + + switch args.ServiceKind { + case structs.ServiceKindTypical, structs.ServiceKindIngressGateway: + // allowed + default: + resp.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(resp, "Unsupported service kind %q", args.ServiceKind) + return nil, nil + } + // Make the RPC request var out structs.IndexedServiceTopology defer setMeta(resp, &out.QueryMeta) @@ -324,6 +342,7 @@ RPC: } topo := ServiceTopology{ + Protocol: out.ServiceTopology.MetricsProtocol, Upstreams: upstreamResp, Downstreams: downstreamResp, FilteredByACLs: out.FilteredByACLs, diff --git a/agent/ui_endpoint_test.go b/agent/ui_endpoint_test.go index 01705a23bc..9cdbe02f8c 100644 --- a/agent/ui_endpoint_test.go +++ b/agent/ui_endpoint_test.go @@ -12,12 +12,11 @@ import ( "sync/atomic" "testing" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" cleanhttp "github.com/hashicorp/go-cleanhttp" "github.com/stretchr/testify/assert" @@ -940,7 +939,7 @@ func TestUIServiceTopology(t *testing.T) { a := NewTestAgent(t, "") defer a.Shutdown() - // Register api -> web -> redis + // Register ingress -> api -> web -> redis { registrations := map[string]*structs.RegisterRequest{ "Node edge": { @@ -1247,6 +1246,14 @@ func TestUIServiceTopology(t *testing.T) { }, }, }, + { + Datacenter: "dc1", + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "api", + Protocol: "tcp", + }, + }, { Datacenter: "dc1", Entry: &structs.ServiceIntentionsConfigEntry{ @@ -1302,7 +1309,7 @@ func TestUIServiceTopology(t *testing.T) { Listeners: []structs.IngressListener{ { Port: 1111, - Protocol: "http", + Protocol: "tcp", Services: []structs.IngressService{ { Name: "api", @@ -1320,16 +1327,35 @@ func TestUIServiceTopology(t *testing.T) { } } + t.Run("request without kind", func(t *testing.T) { + req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.UIServiceTopology(resp, req) + require.Nil(t, err) + require.Nil(t, obj) + require.Equal(t, "Missing service kind", resp.Body.String()) + }) + + t.Run("request with unsupported kind", func(t *testing.T) { + req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress?kind=not-a-kind", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.UIServiceTopology(resp, req) + require.Nil(t, err) + require.Nil(t, obj) + require.Equal(t, `Unsupported service kind "not-a-kind"`, resp.Body.String()) + }) + t.Run("ingress", func(t *testing.T) { retry.Run(t, func(r *retry.R) { // Request topology for ingress - req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress", nil) + req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress?kind=ingress-gateway", nil) resp := httptest.NewRecorder() obj, err := a.srv.UIServiceTopology(resp, req) assert.Nil(r, err) require.NoError(r, checkIndex(resp)) expect := ServiceTopology{ + Protocol: "tcp", Upstreams: []*ServiceTopologySummary{ { ServiceSummary: ServiceSummary{ @@ -1362,13 +1388,14 @@ func TestUIServiceTopology(t *testing.T) { t.Run("api", func(t *testing.T) { retry.Run(t, func(r *retry.R) { // Request topology for api - req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/api", nil) + req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/api?kind=", nil) resp := httptest.NewRecorder() obj, err := a.srv.UIServiceTopology(resp, req) assert.Nil(r, err) require.NoError(r, checkIndex(resp)) expect := ServiceTopology{ + Protocol: "tcp", Downstreams: []*ServiceTopologySummary{ { ServiceSummary: ServiceSummary{ @@ -1425,13 +1452,14 @@ func TestUIServiceTopology(t *testing.T) { t.Run("web", func(t *testing.T) { retry.Run(t, func(r *retry.R) { // Request topology for web - req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/web", nil) + req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/web?kind=", nil) resp := httptest.NewRecorder() obj, err := a.srv.UIServiceTopology(resp, req) assert.Nil(r, err) require.NoError(r, checkIndex(resp)) expect := ServiceTopology{ + Protocol: "http", Upstreams: []*ServiceTopologySummary{ { ServiceSummary: ServiceSummary{ @@ -1486,13 +1514,14 @@ func TestUIServiceTopology(t *testing.T) { t.Run("redis", func(t *testing.T) { retry.Run(t, func(r *retry.R) { // Request topology for redis - req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/redis", nil) + req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/redis?kind=", nil) resp := httptest.NewRecorder() obj, err := a.srv.UIServiceTopology(resp, req) assert.Nil(r, err) require.NoError(r, checkIndex(resp)) expect := ServiceTopology{ + Protocol: "http", Downstreams: []*ServiceTopologySummary{ { ServiceSummary: ServiceSummary{