Gateway Services Nodes UI Endpoint (#7685)

The endpoint supports queries for both Ingress Gateways and Terminating Gateways. Used to display a gateway's linked services in the UI.
This commit is contained in:
Freddy 2020-05-11 11:35:17 -06:00 committed by GitHub
parent 136549205c
commit b3ec383d04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1125 additions and 17 deletions

View File

@ -26,10 +26,10 @@ func TestCatalogServiceList(t *testing.T) {
reply := args.Get(2).(*structs.IndexedServiceList)
reply.Services = structs.ServiceList{
structs.ServiceInfo{
structs.ServiceName{
Name: "foo",
},
structs.ServiceInfo{
structs.ServiceName{
Name: "bar",
},
}

View File

@ -1171,6 +1171,23 @@ func (f *aclFilter) allowNode(node string, ent *acl.AuthorizerContext) bool {
return f.authorizer.NodeRead(node, ent) == acl.Allow
}
// allowNode is used to determine if the gateway and service are accessible for an ACL
func (f *aclFilter) allowGateway(gs *structs.GatewayService) bool {
var authzContext acl.AuthorizerContext
// Need read on service and gateway. Gateway may have different EnterpriseMeta so we fill authzContext twice
gs.Gateway.FillAuthzContext(&authzContext)
if !f.allowService(gs.Gateway.ID, &authzContext) {
return false
}
gs.Service.FillAuthzContext(&authzContext)
if !f.allowService(gs.Service.ID, &authzContext) {
return false
}
return true
}
// allowService is used to determine if a service is accessible for an ACL.
func (f *aclFilter) allowService(service string, ent *acl.AuthorizerContext) bool {
if service == "" {
@ -1438,6 +1455,33 @@ func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) {
*dump = nd
}
// filterServiceDump is used to filter nodes based on ACL rules.
func (f *aclFilter) filterServiceDump(services *structs.ServiceDump) {
svcs := *services
var authzContext acl.AuthorizerContext
for i := 0; i < len(svcs); i++ {
service := svcs[i]
if f.allowGateway(service.GatewayService) {
// ServiceDump might only have gateway config and no node information
if service.Node == nil {
continue
}
service.Service.FillAuthzContext(&authzContext)
if f.allowNode(service.Node.Node, &authzContext) {
continue
}
}
f.logger.Debug("dropping service from result due to ACLs", "service", service.GatewayService.Service)
svcs = append(svcs[:i], svcs[i+1:]...)
i--
}
*services = svcs
}
// filterNodes is used to filter through all parts of a node list and remove
// elements the provided ACL token cannot access.
func (f *aclFilter) filterNodes(nodes *structs.Nodes) {
@ -1749,6 +1793,9 @@ func (r *ACLResolver) filterACLWithAuthorizer(authorizer acl.Authorizer, subj in
case *structs.IndexedNodeDump:
filt.filterNodeDump(&v.Dump)
case *structs.IndexedServiceDump:
filt.filterServiceDump(&v.Dump)
case *structs.IndexedNodes:
filt.filterNodes(&v.Nodes)

View File

@ -127,6 +127,84 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
})
}
// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config
func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error {
if done, err := m.srv.forward("Internal.GatewayServiceDump", args, args, reply); done {
return err
}
// Verify the arguments
if args.ServiceName == "" {
return fmt.Errorf("Must provide gateway name")
}
var authzContext acl.AuthorizerContext
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
// We need read access to the gateway we're trying to find services for, so check that first.
if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
err = m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var maxIdx uint64
idx, gatewayServices, err := state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if idx > maxIdx {
maxIdx = idx
}
// Loop over the gateway <-> serviceName mappings and fetch all service instances for each
var result structs.ServiceDump
for _, gs := range gatewayServices {
idx, instances, err := state.CheckServiceNodes(ws, gs.Service.ID, &gs.Service.EnterpriseMeta)
if err != nil {
return err
}
if idx > maxIdx {
maxIdx = idx
}
for _, n := range instances {
svc := structs.ServiceInfo{
Node: n.Node,
Service: n.Service,
Checks: n.Checks,
GatewayService: gs,
}
result = append(result, &svc)
}
// Ensure we store the gateway <-> service mapping even if there are no instances of the service
if len(instances) == 0 {
svc := structs.ServiceInfo{
GatewayService: gs,
}
result = append(result, &svc)
}
}
reply.Index, reply.Dump = maxIdx, result
if err := m.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
return err
}
// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
// call to fire an event. The primary use case is to enable user events being
// triggered in a remote DC.

View File

@ -6,13 +6,13 @@ import (
"strings"
"testing"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -1146,3 +1146,668 @@ service "gateway" {
assert.Equal(r, expect, resp.Services)
})
}
func TestInternal_GatewayServiceDump_Terminating(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// Register gateway and two service instances that will be associated with it
{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "terminating-gateway",
Service: "terminating-gateway",
Kind: structs.ServiceKindTerminatingGateway,
Port: 443,
},
Check: &structs.HealthCheck{
Name: "terminating connect",
Status: api.HealthPassing,
ServiceID: "terminating-gateway",
},
}
var out struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "db-warning",
Status: api.HealthWarning,
ServiceID: "db",
},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "baz",
Address: "127.0.0.3",
Service: &structs.NodeService{
ID: "db2",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "db2-passing",
Status: api.HealthPassing,
ServiceID: "db2",
},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
}
// Register terminating-gateway config entry, linking it to db, api, and redis (dne)
{
args := &structs.TerminatingGatewayConfigEntry{
Name: "terminating-gateway",
Kind: structs.TerminatingGateway,
Services: []structs.LinkedService{
{
Name: "db",
},
{
Name: "redis",
CAFile: "/etc/certs/ca.pem",
CertFile: "/etc/certs/cert.pem",
KeyFile: "/etc/certs/key.pem",
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &configOutput))
require.True(t, configOutput)
}
var out structs.IndexedServiceDump
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "terminating-gateway",
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out))
dump := out.Dump
// Reset raft indices to facilitate assertion
for i := 0; i < len(dump); i++ {
svc := dump[i]
if svc.Node != nil {
svc.Node.RaftIndex = structs.RaftIndex{}
}
if svc.Service != nil {
svc.Service.RaftIndex = structs.RaftIndex{}
}
if len(svc.Checks) > 0 && svc.Checks[0] != nil {
svc.Checks[0].RaftIndex = structs.RaftIndex{}
}
if svc.GatewayService != nil {
svc.GatewayService.RaftIndex = structs.RaftIndex{}
}
}
expect := structs.ServiceDump{
{
Node: &structs.Node{
Node: "baz",
Address: "127.0.0.3",
Datacenter: "dc1",
},
Service: &structs.NodeService{
ID: "db2",
Service: "db",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
Checks: structs.HealthChecks{
{
Node: "baz",
CheckID: types.CheckID("db2-passing"),
Name: "db2-passing",
Status: "passing",
ServiceID: "db2",
ServiceName: "db",
},
},
GatewayService: &structs.GatewayService{
Gateway: structs.ServiceID{ID: "terminating-gateway"},
Service: structs.ServiceID{ID: "db"},
GatewayKind: "terminating-gateway",
},
},
{
Node: &structs.Node{
Node: "bar",
Address: "127.0.0.2",
Datacenter: "dc1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
Checks: structs.HealthChecks{
{
Node: "bar",
CheckID: types.CheckID("db-warning"),
Name: "db-warning",
Status: "warning",
ServiceID: "db",
ServiceName: "db",
},
},
GatewayService: &structs.GatewayService{
Gateway: structs.ServiceID{ID: "terminating-gateway"},
Service: structs.ServiceID{ID: "db"},
GatewayKind: "terminating-gateway",
},
},
{
// Only GatewayService should be returned when linked service isn't registered
GatewayService: &structs.GatewayService{
Gateway: structs.ServiceID{ID: "terminating-gateway"},
Service: structs.ServiceID{ID: "redis"},
GatewayKind: "terminating-gateway",
CAFile: "/etc/certs/ca.pem",
CertFile: "/etc/certs/cert.pem",
KeyFile: "/etc/certs/key.pem",
},
},
}
assert.ElementsMatch(t, expect, dump)
}
func TestInternal_GatewayServiceDump_Terminating_ACL(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root"))
// Create the ACL.
token, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `
service "db" { policy = "read" }
service "terminating-gateway" { policy = "read" }
node_prefix "" { policy = "read" }`)
require.NoError(t, err)
// Register gateway and two service instances that will be associated with it
{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "terminating-gateway",
Service: "terminating-gateway",
Kind: structs.ServiceKindTerminatingGateway,
Port: 443,
},
Check: &structs.HealthCheck{
Name: "terminating connect",
Status: api.HealthPassing,
ServiceID: "terminating-gateway",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "db-warning",
Status: api.HealthWarning,
ServiceID: "db",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "baz",
Address: "127.0.0.3",
Service: &structs.NodeService{
ID: "api",
Service: "api",
},
Check: &structs.HealthCheck{
Name: "api-passing",
Status: api.HealthPassing,
ServiceID: "api",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
}
// Register terminating-gateway config entry, linking it to db and api
{
args := &structs.TerminatingGatewayConfigEntry{
Name: "terminating-gateway",
Kind: structs.TerminatingGateway,
Services: []structs.LinkedService{
{Name: "db"},
{Name: "api"},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out))
require.True(t, out)
}
var out structs.IndexedServiceDump
// Not passing a token with service:read on Gateway leads to PermissionDenied
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "terminating-gateway",
}
err = msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out)
require.Error(t, err, acl.ErrPermissionDenied)
// Passing a token without service:read on api leads to it getting filtered out
req = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "terminating-gateway",
QueryOptions: structs.QueryOptions{Token: token.SecretID},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out))
nodes := out.Dump
require.Len(t, nodes, 1)
require.Equal(t, nodes[0].Node.Node, "bar")
require.Equal(t, nodes[0].Service.Service, "db")
require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning)
}
func TestInternal_GatewayServiceDump_Ingress(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// Register gateway and service instance that will be associated with it
{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "ingress-gateway",
Service: "ingress-gateway",
Kind: structs.ServiceKindIngressGateway,
Port: 8443,
},
Check: &structs.HealthCheck{
Name: "ingress connect",
Status: api.HealthPassing,
ServiceID: "ingress-gateway",
},
}
var regOutput struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &regOutput))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "db-warning",
Status: api.HealthWarning,
ServiceID: "db",
},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &regOutput))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "baz",
Address: "127.0.0.3",
Service: &structs.NodeService{
ID: "db2",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "db2-passing",
Status: api.HealthPassing,
ServiceID: "db2",
},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &regOutput))
// Register ingress-gateway config entry, linking it to db and redis (dne)
args := &structs.IngressGatewayConfigEntry{
Name: "ingress-gateway",
Kind: structs.IngressGateway,
Listeners: []structs.IngressListener{
{
Port: 8888,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "db",
},
},
},
{
Port: 8080,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "web",
},
},
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &configOutput))
require.True(t, configOutput)
}
var out structs.IndexedServiceDump
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "ingress-gateway",
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out))
dump := out.Dump
// Reset raft indices to facilitate assertion
for i := 0; i < len(dump); i++ {
svc := dump[i]
if svc.Node != nil {
svc.Node.RaftIndex = structs.RaftIndex{}
}
if svc.Service != nil {
svc.Service.RaftIndex = structs.RaftIndex{}
}
if len(svc.Checks) > 0 && svc.Checks[0] != nil {
svc.Checks[0].RaftIndex = structs.RaftIndex{}
}
if svc.GatewayService != nil {
svc.GatewayService.RaftIndex = structs.RaftIndex{}
}
}
expect := structs.ServiceDump{
{
Node: &structs.Node{
Node: "bar",
Address: "127.0.0.2",
Datacenter: "dc1",
},
Service: &structs.NodeService{
Kind: "",
ID: "db",
Service: "db",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
Checks: structs.HealthChecks{
{
Node: "bar",
CheckID: types.CheckID("db-warning"),
Name: "db-warning",
Status: "warning",
ServiceID: "db",
ServiceName: "db",
},
},
GatewayService: &structs.GatewayService{
Gateway: structs.ServiceID{ID: "ingress-gateway"},
Service: structs.ServiceID{ID: "db"},
GatewayKind: "ingress-gateway",
Port: 8888,
},
},
{
Node: &structs.Node{
Node: "baz",
Address: "127.0.0.3",
Datacenter: "dc1",
},
Service: &structs.NodeService{
ID: "db2",
Service: "db",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
Checks: structs.HealthChecks{
{
Node: "baz",
CheckID: types.CheckID("db2-passing"),
Name: "db2-passing",
Status: "passing",
ServiceID: "db2",
ServiceName: "db",
},
},
GatewayService: &structs.GatewayService{
Gateway: structs.ServiceID{ID: "ingress-gateway"},
Service: structs.ServiceID{ID: "db"},
GatewayKind: "ingress-gateway",
Port: 8888,
},
},
{
// Only GatewayService should be returned when upstream isn't registered
GatewayService: &structs.GatewayService{
Gateway: structs.ServiceID{ID: "ingress-gateway"},
Service: structs.ServiceID{ID: "web"},
GatewayKind: "ingress-gateway",
Port: 8080,
},
},
}
assert.ElementsMatch(t, expect, dump)
}
func TestInternal_GatewayServiceDump_Ingress_ACL(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root"))
// Create the ACL.
token, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", `
service "db" { policy = "read" }
service "ingress-gateway" { policy = "read" }
node_prefix "" { policy = "read" }`)
require.NoError(t, err)
// Register gateway and two service instances that will be associated with it
{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "ingress-gateway",
Service: "ingress-gateway",
Kind: structs.ServiceKindIngressGateway,
},
Check: &structs.HealthCheck{
Name: "ingress connect",
Status: api.HealthPassing,
ServiceID: "ingress-gateway",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "db-warning",
Status: api.HealthWarning,
ServiceID: "db",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "baz",
Address: "127.0.0.3",
Service: &structs.NodeService{
ID: "api",
Service: "api",
},
Check: &structs.HealthCheck{
Name: "api-passing",
Status: api.HealthPassing,
ServiceID: "api",
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
}
// Register ingress-gateway config entry, linking it to db and api
{
args := &structs.IngressGatewayConfigEntry{
Name: "ingress-gateway",
Kind: structs.IngressGateway,
Listeners: []structs.IngressListener{
{
Port: 8888,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "db",
},
},
},
{
Port: 8080,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "web",
},
},
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
WriteRequest: structs.WriteRequest{Token: "root"},
}
var out bool
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out))
require.True(t, out)
}
var out structs.IndexedServiceDump
// Not passing a token with service:read on Gateway leads to PermissionDenied
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "ingress-gateway",
}
err = msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out)
require.Error(t, err, acl.ErrPermissionDenied)
// Passing a token without service:read on api leads to it getting filtered out
req = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "ingress-gateway",
QueryOptions: structs.QueryOptions{Token: token.SecretID},
}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServiceDump", &req, &out))
nodes := out.Dump
require.Len(t, nodes, 1)
require.Equal(t, nodes[0].Node.Node, "bar")
require.Equal(t, nodes[0].Service.Service, "db")
require.Equal(t, nodes[0].Checks[0].Status, api.HealthWarning)
}

View File

@ -882,7 +882,7 @@ func (s *Store) serviceListTxn(tx *memdb.Txn, ws memdb.WatchSet, entMeta *struct
results := make(structs.ServiceList, 0, len(unique))
for sid, _ := range unique {
results = append(results, structs.ServiceInfo{Name: sid.ID, EnterpriseMeta: sid.EnterpriseMeta})
results = append(results, structs.ServiceName{Name: sid.ID, EnterpriseMeta: sid.EnterpriseMeta})
}
return idx, results, nil

View File

@ -2,11 +2,6 @@ package state
import (
"fmt"
"reflect"
"sort"
"strings"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
@ -16,6 +11,10 @@ import (
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"reflect"
"sort"
"strings"
"testing"
)
func makeRandomNodeID(t *testing.T) types.NodeID {

View File

@ -94,6 +94,7 @@ func init() {
registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPServer).UINodes)
registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPServer).UINodeInfo)
registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPServer).UIServices)
registerEndpoint("/v1/internal/ui/gateway-services-nodes/", []string{"GET"}, (*HTTPServer).UIGatewayServicesNodes)
registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPServer).ACLAuthorize)
registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).KVSEndpoint)
registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPServer).OperatorRaftConfiguration)

View File

@ -1554,6 +1554,20 @@ func (nodes CheckServiceNodes) Shuffle() {
}
}
func (nodes CheckServiceNodes) ToServiceDump() ServiceDump {
var ret ServiceDump
for i := range nodes {
svc := ServiceInfo{
Node: nodes[i].Node,
Service: nodes[i].Service,
Checks: nodes[i].Checks,
GatewayService: nil,
}
ret = append(ret, &svc)
}
return ret
}
// ShallowClone duplicates the slice and underlying array.
func (nodes CheckServiceNodes) ShallowClone() CheckServiceNodes {
dup := make(CheckServiceNodes, len(nodes))
@ -1617,6 +1631,15 @@ type NodeInfo struct {
// as it is rather expensive to generate.
type NodeDump []*NodeInfo
type ServiceInfo struct {
Node *Node
Service *NodeService
Checks HealthChecks
GatewayService *GatewayService
}
type ServiceDump []*ServiceInfo
type CheckID struct {
ID types.CheckID
EnterpriseMeta
@ -1702,16 +1725,16 @@ type IndexedServices struct {
QueryMeta
}
type ServiceInfo struct {
type ServiceName struct {
Name string
EnterpriseMeta
}
func (si *ServiceInfo) ToServiceID() ServiceID {
func (si *ServiceName) ToServiceID() ServiceID {
return ServiceID{ID: si.Name, EnterpriseMeta: si.EnterpriseMeta}
}
type ServiceList []ServiceInfo
type ServiceList []ServiceName
type IndexedServiceList struct {
Services ServiceList
@ -1755,6 +1778,11 @@ type IndexedNodeDump struct {
QueryMeta
}
type IndexedServiceDump struct {
Dump ServiceDump
QueryMeta
}
type IndexedGatewayServices struct {
Services GatewayServices
QueryMeta

View File

@ -15,6 +15,10 @@ import (
// to extract this.
const metaExternalSource = "external-source"
type GatewayConfig struct {
ListenerPort int
}
// ServiceSummary is used to summarize a service
type ServiceSummary struct {
Kind structs.ServiceKind `json:",omitempty"`
@ -29,6 +33,7 @@ type ServiceSummary struct {
ChecksCritical int
ExternalSources []string
externalSourceSet map[string]struct{} // internal to track uniqueness
GatewayConfig GatewayConfig `json:",omitempty"`
structs.EnterpriseMeta
}
@ -155,10 +160,45 @@ RPC:
}
// Generate the summary
return summarizeServices(out.Nodes), nil
// TODO (gateways) (freddy) Have Internal.ServiceDump return ServiceDump instead. Need to add bexpr filtering for type.
return summarizeServices(out.Nodes.ToServiceDump()), nil
}
func summarizeServices(dump structs.CheckServiceNodes) []*ServiceSummary {
// UIGatewayServices is used to query all the nodes for services associated with a gateway along with their gateway config
func (s *HTTPServer) UIGatewayServicesNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Parse arguments
args := structs.ServiceSpecificRequest{}
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Pull out the service name
args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/internal/ui/gateway-services-nodes/")
if args.ServiceName == "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing gateway name")
return nil, nil
}
// Make the RPC request
var out structs.IndexedServiceDump
defer setMeta(resp, &out.QueryMeta)
RPC:
if err := s.agent.RPC("Internal.GatewayServiceDump", &args, &out); err != nil {
// Retry the request allowing stale data if no leader
if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale {
args.AllowStale = true
goto RPC
}
return nil, err
}
return summarizeServices(out.Dump), nil
}
func summarizeServices(dump structs.ServiceDump) []*ServiceSummary {
// Collect the summary information
var services []structs.ServiceID
summary := make(map[structs.ServiceID]*ServiceSummary)
@ -179,8 +219,19 @@ func summarizeServices(dump structs.CheckServiceNodes) []*ServiceSummary {
}
for _, csn := range dump {
if csn.GatewayService != nil {
sum := getService(csn.GatewayService.Service)
sum.GatewayConfig.ListenerPort = csn.GatewayService.Port
}
// Will happen in cases where we only have the GatewayServices mapping
if csn.Service == nil {
continue
}
sid := structs.NewServiceID(csn.Service.Service, &csn.Service.EnterpriseMeta)
sum := getService(sid)
svc := csn.Service
sum := getService(structs.NewServiceID(svc.Service, &svc.EnterpriseMeta))
sum.Nodes = append(sum.Nodes, csn.Node.Node)
sum.Kind = svc.Kind
sum.InstanceCount += 1
@ -240,9 +291,10 @@ func summarizeServices(dump structs.CheckServiceNodes) []*ServiceSummary {
})
output := make([]*ServiceSummary, len(summary))
for idx, service := range services {
// Sort the nodes
// Sort the nodes and tags
sum := summary[service]
sort.Strings(sum.Nodes)
sort.Strings(sum.Tags)
output[idx] = sum
}
return output

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/testrpc"
cleanhttp "github.com/hashicorp/go-cleanhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -415,3 +416,240 @@ func TestUiServices(t *testing.T) {
require.ElementsMatch(t, expected, summary)
})
}
func TestUIGatewayServiceNodes_Terminating(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
// Register terminating gateway and a service that will be associated with it
{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "terminating-gateway",
Service: "terminating-gateway",
Kind: structs.ServiceKindTerminatingGateway,
Port: 443,
},
Check: &structs.HealthCheck{
Name: "terminating connect",
Status: api.HealthPassing,
ServiceID: "terminating-gateway",
},
}
var regOutput struct{}
require.NoError(t, a.RPC("Catalog.Register", &arg, &regOutput))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
},
Check: &structs.HealthCheck{
Name: "db-warning",
Status: api.HealthWarning,
ServiceID: "db",
},
}
require.NoError(t, a.RPC("Catalog.Register", &arg, &regOutput))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "baz",
Address: "127.0.0.3",
Service: &structs.NodeService{
ID: "db2",
Service: "db",
Tags: []string{"backup"},
},
Check: &structs.HealthCheck{
Name: "db2-passing",
Status: api.HealthPassing,
ServiceID: "db2",
},
}
require.NoError(t, a.RPC("Catalog.Register", &arg, &regOutput))
// Register terminating-gateway config entry, linking it to db and redis (does not exist)
args := &structs.TerminatingGatewayConfigEntry{
Name: "terminating-gateway",
Kind: structs.TerminatingGateway,
Services: []structs.LinkedService{
{
Name: "db",
},
{
Name: "redis",
CAFile: "/etc/certs/ca.pem",
CertFile: "/etc/certs/cert.pem",
KeyFile: "/etc/certs/key.pem",
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
}
var configOutput bool
require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput))
require.True(t, configOutput)
}
// Request
req, _ := http.NewRequest("GET", "/v1/internal/ui/gateway-services-nodes/terminating-gateway", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.UIGatewayServicesNodes(resp, req)
assert.Nil(t, err)
assertIndex(t, resp)
dump := obj.([]*ServiceSummary)
expect := []*ServiceSummary{
{
Name: "redis",
},
{
Name: "db",
Tags: []string{"backup", "primary"},
Nodes: []string{"bar", "baz"},
InstanceCount: 2,
ChecksPassing: 1,
ChecksWarning: 1,
ChecksCritical: 0,
},
}
assert.ElementsMatch(t, expect, dump)
}
func TestUIGatewayServiceNodes_Ingress(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
// Register ingress gateway and a service that will be associated with it
{
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "ingress-gateway",
Service: "ingress-gateway",
Kind: structs.ServiceKindIngressGateway,
Port: 8443,
},
Check: &structs.HealthCheck{
Name: "ingress connect",
Status: api.HealthPassing,
ServiceID: "ingress-gateway",
},
}
var regOutput struct{}
require.NoError(t, a.RPC("Catalog.Register", &arg, &regOutput))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
},
Check: &structs.HealthCheck{
Name: "db-warning",
Status: api.HealthWarning,
ServiceID: "db",
},
}
require.NoError(t, a.RPC("Catalog.Register", &arg, &regOutput))
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "baz",
Address: "127.0.0.3",
Service: &structs.NodeService{
ID: "db2",
Service: "db",
Tags: []string{"backup"},
},
Check: &structs.HealthCheck{
Name: "db2-passing",
Status: api.HealthPassing,
ServiceID: "db2",
},
}
require.NoError(t, a.RPC("Catalog.Register", &arg, &regOutput))
// Register ingress-gateway config entry, linking it to db and redis (does not exist)
args := &structs.IngressGatewayConfigEntry{
Name: "ingress-gateway",
Kind: structs.IngressGateway,
Listeners: []structs.IngressListener{
{
Port: 8888,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "db",
},
},
},
{
Port: 8080,
Protocol: "tcp",
Services: []structs.IngressService{
{
Name: "web",
},
},
},
},
}
req := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: args,
}
var configOutput bool
require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput))
require.True(t, configOutput)
}
// Request
req, _ := http.NewRequest("GET", "/v1/internal/ui/gateway-services-nodes/ingress-gateway", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.UIGatewayServicesNodes(resp, req)
assert.Nil(t, err)
assertIndex(t, resp)
dump := obj.([]*ServiceSummary)
expect := []*ServiceSummary{
{
Name: "web",
GatewayConfig: GatewayConfig{ListenerPort: 8080},
},
{
Name: "db",
Tags: []string{"backup", "primary"},
Nodes: []string{"bar", "baz"},
InstanceCount: 2,
ChecksPassing: 1,
ChecksWarning: 1,
ChecksCritical: 0,
GatewayConfig: GatewayConfig{ListenerPort: 8888},
},
}
assert.ElementsMatch(t, expect, dump)
}