mirror of https://github.com/status-im/consul.git
Add protocol to the topology endpoint response (#8868)
This commit is contained in:
parent
2ec7d09381
commit
13df5d5bf8
|
@ -173,7 +173,7 @@ func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *
|
|||
defaultAllow = authz.IntentionDefaultAllow(nil)
|
||||
}
|
||||
|
||||
index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, defaultAllow, &args.EnterpriseMeta)
|
||||
index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, args.ServiceKind, defaultAllow, &args.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1639,6 +1639,7 @@ func TestInternal_ServiceTopology(t *testing.T) {
|
|||
var out structs.IndexedServiceTopology
|
||||
require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
require.False(r, out.FilteredByACLs)
|
||||
require.Equal(r, "http", out.ServiceTopology.MetricsProtocol)
|
||||
|
||||
// bar/web, bar/web-proxy, baz/web, baz/web-proxy
|
||||
require.Len(r, out.ServiceTopology.Upstreams, 4)
|
||||
|
@ -1664,6 +1665,7 @@ func TestInternal_ServiceTopology(t *testing.T) {
|
|||
var out structs.IndexedServiceTopology
|
||||
require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
require.False(r, out.FilteredByACLs)
|
||||
require.Equal(r, "http", out.ServiceTopology.MetricsProtocol)
|
||||
|
||||
// foo/api, foo/api-proxy
|
||||
require.Len(r, out.ServiceTopology.Downstreams, 2)
|
||||
|
@ -1699,6 +1701,7 @@ func TestInternal_ServiceTopology(t *testing.T) {
|
|||
var out structs.IndexedServiceTopology
|
||||
require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
require.False(r, out.FilteredByACLs)
|
||||
require.Equal(r, "http", out.ServiceTopology.MetricsProtocol)
|
||||
|
||||
require.Len(r, out.ServiceTopology.Upstreams, 0)
|
||||
|
||||
|
@ -1756,6 +1759,7 @@ service "web" { policy = "read" }
|
|||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
|
||||
require.True(t, out.FilteredByACLs)
|
||||
require.Equal(t, "http", out.ServiceTopology.MetricsProtocol)
|
||||
|
||||
// The web-proxy upstream gets filtered out from both bar and baz
|
||||
require.Len(t, out.ServiceTopology.Upstreams, 2)
|
||||
|
@ -1775,6 +1779,7 @@ service "web" { policy = "read" }
|
|||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
|
||||
|
||||
require.True(t, out.FilteredByACLs)
|
||||
require.Equal(t, "http", out.ServiceTopology.MetricsProtocol)
|
||||
|
||||
// The redis upstream gets filtered out but the api and proxy downstream are returned
|
||||
require.Len(t, out.ServiceTopology.Upstreams, 0)
|
||||
|
|
|
@ -2907,6 +2907,36 @@ func serviceGatewayNodes(tx *txn, ws memdb.WatchSet, service string, kind struct
|
|||
return maxIdx, ret, nil
|
||||
}
|
||||
|
||||
// metricsProtocolForIngressGateway determines the protocol that should be used when fetching metrics for an ingress gateway
|
||||
// Since ingress gateways may have listeners with different protocols, favor capturing all traffic by only returning HTTP
|
||||
// when all listeners are HTTP-like.
|
||||
func metricsProtocolForIngressGateway(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, string, error) {
|
||||
idx, conf, err := configEntryTxn(tx, ws, structs.IngressGateway, sn.Name, &sn.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return 0, "", fmt.Errorf("failed to get ingress-gateway config entry for %q: %v", sn.String(), err)
|
||||
}
|
||||
if conf == nil {
|
||||
return 0, "", nil
|
||||
}
|
||||
entry, ok := conf.(*structs.IngressGatewayConfigEntry)
|
||||
if !ok {
|
||||
return 0, "", fmt.Errorf("unexpected config entry type: %T", conf)
|
||||
}
|
||||
counts := make(map[string]int)
|
||||
for _, l := range entry.Listeners {
|
||||
if structs.IsProtocolHTTPLike(l.Protocol) {
|
||||
counts["http"] += 1
|
||||
} else {
|
||||
counts["tcp"] += 1
|
||||
}
|
||||
}
|
||||
protocol := "tcp"
|
||||
if counts["tcp"] == 0 && counts["http"] > 0 {
|
||||
protocol = "http"
|
||||
}
|
||||
return idx, protocol, nil
|
||||
}
|
||||
|
||||
// checkProtocolMatch filters out any GatewayService entries added from a wildcard with a protocol
|
||||
// that doesn't match the one configured in their discovery chain.
|
||||
func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayService) (uint64, bool, error) {
|
||||
|
@ -2925,6 +2955,7 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi
|
|||
func (s *Store) ServiceTopology(
|
||||
ws memdb.WatchSet,
|
||||
dc, service string,
|
||||
kind structs.ServiceKind,
|
||||
defaultAllow acl.EnforcementDecision,
|
||||
entMeta *structs.EnterpriseMeta,
|
||||
) (uint64, *structs.ServiceTopology, error) {
|
||||
|
@ -2932,10 +2963,30 @@ func (s *Store) ServiceTopology(
|
|||
defer tx.Abort()
|
||||
|
||||
var (
|
||||
maxIdx uint64
|
||||
sn = structs.NewServiceName(service, entMeta)
|
||||
maxIdx uint64
|
||||
protocol string
|
||||
err error
|
||||
|
||||
sn = structs.NewServiceName(service, entMeta)
|
||||
)
|
||||
|
||||
switch kind {
|
||||
case structs.ServiceKindIngressGateway:
|
||||
maxIdx, protocol, err = metricsProtocolForIngressGateway(tx, ws, sn)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to fetch protocol for service %s: %v", sn.String(), err)
|
||||
}
|
||||
|
||||
case structs.ServiceKindTypical:
|
||||
maxIdx, protocol, err = protocolForService(tx, ws, sn)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed to fetch protocol for service %s: %v", sn.String(), err)
|
||||
}
|
||||
|
||||
default:
|
||||
return 0, nil, fmt.Errorf("unsupported kind %q", kind)
|
||||
}
|
||||
|
||||
idx, upstreamNames, err := upstreamsFromRegistrationTxn(tx, ws, sn)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
|
@ -2998,6 +3049,7 @@ func (s *Store) ServiceTopology(
|
|||
}
|
||||
|
||||
resp := &structs.ServiceTopology{
|
||||
MetricsProtocol: protocol,
|
||||
Upstreams: upstreams,
|
||||
Downstreams: downstreams,
|
||||
UpstreamDecisions: upstreamDecisions,
|
||||
|
|
|
@ -7182,3 +7182,182 @@ func TestCatalog_DownstreamsForService_Updates(t *testing.T) {
|
|||
require.Equal(t, uint64(6), idx)
|
||||
require.ElementsMatch(t, expect, names)
|
||||
}
|
||||
|
||||
func TestProtocolForIngressGateway(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
idx uint64
|
||||
entries []structs.ConfigEntry
|
||||
expect string
|
||||
}{
|
||||
{
|
||||
name: "all http like",
|
||||
idx: uint64(5),
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "h1-svc",
|
||||
Protocol: "http",
|
||||
},
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "h2-svc",
|
||||
Protocol: "http2",
|
||||
},
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "g-svc",
|
||||
Protocol: "grpc",
|
||||
},
|
||||
&structs.IngressGatewayConfigEntry{
|
||||
Kind: structs.IngressGateway,
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "h1-svc",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Port: 2222,
|
||||
Protocol: "http2",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "h2-svc",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Port: 3333,
|
||||
Protocol: "grpc",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "g-svc",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: "http",
|
||||
},
|
||||
{
|
||||
name: "all tcp",
|
||||
idx: uint64(6),
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.IngressGatewayConfigEntry{
|
||||
Kind: structs.IngressGateway,
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "tcp",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "zip",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Port: 2222,
|
||||
Protocol: "tcp",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "zop",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Port: 3333,
|
||||
Protocol: "tcp",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "zap",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: "tcp",
|
||||
},
|
||||
{
|
||||
name: "mix of both",
|
||||
idx: uint64(7),
|
||||
entries: []structs.ConfigEntry{
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "h1-svc",
|
||||
Protocol: "http",
|
||||
},
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "g-svc",
|
||||
Protocol: "grpc",
|
||||
},
|
||||
&structs.IngressGatewayConfigEntry{
|
||||
Kind: structs.IngressGateway,
|
||||
Name: "ingress",
|
||||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "h1-svc",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Port: 2222,
|
||||
Protocol: "tcp",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "zop",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Port: 3333,
|
||||
Protocol: "grpc",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "g-svc",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expect: "tcp",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
for _, entry := range tc.entries {
|
||||
require.NoError(t, entry.Normalize())
|
||||
require.NoError(t, entry.Validate())
|
||||
|
||||
require.NoError(t, s.EnsureConfigEntry(tc.idx, entry, structs.DefaultEnterpriseMeta()))
|
||||
}
|
||||
|
||||
tx := s.db.ReadTxn()
|
||||
defer tx.Abort()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
sn := structs.NewServiceName("ingress", structs.DefaultEnterpriseMeta())
|
||||
|
||||
idx, protocol, err := metricsProtocolForIngressGateway(tx, ws, sn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.idx, idx)
|
||||
require.Equal(t, tc.expect, protocol)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -519,6 +519,7 @@ type ServiceSpecificRequest struct {
|
|||
Datacenter string
|
||||
NodeMetaFilters map[string]string
|
||||
ServiceName string
|
||||
ServiceKind ServiceKind
|
||||
// DEPRECATED (singular-service-tag) - remove this when backwards RPC compat
|
||||
// with 1.2.x is not required.
|
||||
ServiceTag string
|
||||
|
@ -1893,6 +1894,9 @@ type ServiceTopology struct {
|
|||
|
||||
UpstreamDecisions map[string]IntentionDecisionSummary
|
||||
DownstreamDecisions map[string]IntentionDecisionSummary
|
||||
|
||||
// MetricsProtocol is the protocol of the service being queried
|
||||
MetricsProtocol string
|
||||
}
|
||||
|
||||
// IndexedConfigEntries has its own encoding logic which differs from
|
||||
|
|
|
@ -64,6 +64,7 @@ type ServiceTopologySummary struct {
|
|||
}
|
||||
|
||||
type ServiceTopology struct {
|
||||
Protocol string
|
||||
Upstreams []*ServiceTopologySummary
|
||||
Downstreams []*ServiceTopologySummary
|
||||
FilteredByACLs bool
|
||||
|
@ -281,6 +282,23 @@ func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Req
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
kind, ok := req.URL.Query()["kind"]
|
||||
if !ok {
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
fmt.Fprint(resp, "Missing service kind")
|
||||
return nil, nil
|
||||
}
|
||||
args.ServiceKind = structs.ServiceKind(kind[0])
|
||||
|
||||
switch args.ServiceKind {
|
||||
case structs.ServiceKindTypical, structs.ServiceKindIngressGateway:
|
||||
// allowed
|
||||
default:
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
fmt.Fprintf(resp, "Unsupported service kind %q", args.ServiceKind)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Make the RPC request
|
||||
var out structs.IndexedServiceTopology
|
||||
defer setMeta(resp, &out.QueryMeta)
|
||||
|
@ -324,6 +342,7 @@ RPC:
|
|||
}
|
||||
|
||||
topo := ServiceTopology{
|
||||
Protocol: out.ServiceTopology.MetricsProtocol,
|
||||
Upstreams: upstreamResp,
|
||||
Downstreams: downstreamResp,
|
||||
FilteredByACLs: out.FilteredByACLs,
|
||||
|
|
|
@ -12,12 +12,11 @@ import (
|
|||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
cleanhttp "github.com/hashicorp/go-cleanhttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -940,7 +939,7 @@ func TestUIServiceTopology(t *testing.T) {
|
|||
a := NewTestAgent(t, "")
|
||||
defer a.Shutdown()
|
||||
|
||||
// Register api -> web -> redis
|
||||
// Register ingress -> api -> web -> redis
|
||||
{
|
||||
registrations := map[string]*structs.RegisterRequest{
|
||||
"Node edge": {
|
||||
|
@ -1247,6 +1246,14 @@ func TestUIServiceTopology(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "api",
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceIntentionsConfigEntry{
|
||||
|
@ -1302,7 +1309,7 @@ func TestUIServiceTopology(t *testing.T) {
|
|||
Listeners: []structs.IngressListener{
|
||||
{
|
||||
Port: 1111,
|
||||
Protocol: "http",
|
||||
Protocol: "tcp",
|
||||
Services: []structs.IngressService{
|
||||
{
|
||||
Name: "api",
|
||||
|
@ -1320,16 +1327,35 @@ func TestUIServiceTopology(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
t.Run("request without kind", func(t *testing.T) {
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.UIServiceTopology(resp, req)
|
||||
require.Nil(t, err)
|
||||
require.Nil(t, obj)
|
||||
require.Equal(t, "Missing service kind", resp.Body.String())
|
||||
})
|
||||
|
||||
t.Run("request with unsupported kind", func(t *testing.T) {
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress?kind=not-a-kind", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.UIServiceTopology(resp, req)
|
||||
require.Nil(t, err)
|
||||
require.Nil(t, obj)
|
||||
require.Equal(t, `Unsupported service kind "not-a-kind"`, resp.Body.String())
|
||||
})
|
||||
|
||||
t.Run("ingress", func(t *testing.T) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Request topology for ingress
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress", nil)
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/ingress?kind=ingress-gateway", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.UIServiceTopology(resp, req)
|
||||
assert.Nil(r, err)
|
||||
require.NoError(r, checkIndex(resp))
|
||||
|
||||
expect := ServiceTopology{
|
||||
Protocol: "tcp",
|
||||
Upstreams: []*ServiceTopologySummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
|
@ -1362,13 +1388,14 @@ func TestUIServiceTopology(t *testing.T) {
|
|||
t.Run("api", func(t *testing.T) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Request topology for api
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/api", nil)
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/api?kind=", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.UIServiceTopology(resp, req)
|
||||
assert.Nil(r, err)
|
||||
require.NoError(r, checkIndex(resp))
|
||||
|
||||
expect := ServiceTopology{
|
||||
Protocol: "tcp",
|
||||
Downstreams: []*ServiceTopologySummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
|
@ -1425,13 +1452,14 @@ func TestUIServiceTopology(t *testing.T) {
|
|||
t.Run("web", func(t *testing.T) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Request topology for web
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/web", nil)
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/web?kind=", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.UIServiceTopology(resp, req)
|
||||
assert.Nil(r, err)
|
||||
require.NoError(r, checkIndex(resp))
|
||||
|
||||
expect := ServiceTopology{
|
||||
Protocol: "http",
|
||||
Upstreams: []*ServiceTopologySummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
|
@ -1486,13 +1514,14 @@ func TestUIServiceTopology(t *testing.T) {
|
|||
t.Run("redis", func(t *testing.T) {
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
// Request topology for redis
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/redis", nil)
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/redis?kind=", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.UIServiceTopology(resp, req)
|
||||
assert.Nil(r, err)
|
||||
require.NoError(r, checkIndex(resp))
|
||||
|
||||
expect := ServiceTopology{
|
||||
Protocol: "http",
|
||||
Downstreams: []*ServiceTopologySummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
|
|
Loading…
Reference in New Issue