mirror of https://github.com/status-im/consul.git
Add new internal endpoint to list exported services to a peer
This commit is contained in:
parent
440a161b3a
commit
2203cdc4db
|
@ -3,7 +3,7 @@ package consul
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
bexpr "github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
@ -613,6 +613,7 @@ func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply
|
|||
|
||||
// ExportedPeeredServices is used to query the exported services for peers.
|
||||
// Returns services as a map of ServiceNames by peer.
|
||||
// To get exported services for a single peer, use ExportedServicesForPeer.
|
||||
func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply *structs.IndexedExportedServiceList) error {
|
||||
if done, err := m.srv.ForwardRPC("Internal.ExportedPeeredServices", args, reply); done {
|
||||
return err
|
||||
|
@ -642,6 +643,59 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply
|
|||
})
|
||||
}
|
||||
|
||||
// ExportedServicesForPeer returns a list of Service names that are exported for a given peer.
|
||||
func (m *Internal) ExportedServicesForPeer(args *structs.ServiceDumpRequest, reply *structs.IndexedServiceList) error {
|
||||
if done, err := m.srv.ForwardRPC("Internal.ExportedServicesForPeer", args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
var authzCtx acl.AuthorizerContext
|
||||
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if args.PeerName == "" {
|
||||
return fmt.Errorf("must provide PeerName")
|
||||
}
|
||||
|
||||
return m.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, store *state.Store) error {
|
||||
|
||||
idx, p, err := store.PeeringRead(ws, state.Query{
|
||||
Value: args.PeerName,
|
||||
EnterpriseMeta: args.EnterpriseMeta,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while fetching peer %q: %w", args.PeerName, err)
|
||||
}
|
||||
if p == nil {
|
||||
reply.Index = idx
|
||||
reply.Services = nil
|
||||
return errNotFound
|
||||
}
|
||||
idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while listing exported services for peer %q: %w", args.PeerName, err)
|
||||
}
|
||||
|
||||
reply.Index = idx
|
||||
reply.Services = exportedSvcs.Services
|
||||
|
||||
// If MeshWrite is allowed, we assume it is an operator role and
|
||||
// return all the services. Otherwise, the results are filtered.
|
||||
if authz.MeshWrite(&authzCtx) != acl.Allow {
|
||||
m.srv.filterACLWithAuthorizer(authz, reply)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// PeeredUpstreams returns all imported services as upstreams for any service in a given partition.
|
||||
// Cluster peering does not replicate intentions so all imported services are considered potential upstreams.
|
||||
func (m *Internal) PeeredUpstreams(args *structs.PartitionSpecificRequest, reply *structs.IndexedPeeredServiceList) error {
|
||||
|
|
|
@ -3487,6 +3487,124 @@ func TestInternal_PeeredUpstreams_ACLEnforcement(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestInternal_ExportedServicesForPeer_ACLEnforcement(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
_, s := testServerWithConfig(t, testServerACLConfig)
|
||||
codec := rpcClient(t, s)
|
||||
|
||||
require.NoError(t, s.fsm.State().PeeringWrite(1, &pbpeering.PeeringWriteRequest{
|
||||
Peering: &pbpeering.Peering{
|
||||
ID: testUUID(),
|
||||
Name: "peer-1",
|
||||
},
|
||||
}))
|
||||
require.NoError(t, s.fsm.State().PeeringWrite(1, &pbpeering.PeeringWriteRequest{
|
||||
Peering: &pbpeering.Peering{
|
||||
ID: testUUID(),
|
||||
Name: "peer-2",
|
||||
},
|
||||
}))
|
||||
require.NoError(t, s.fsm.State().EnsureConfigEntry(1, &structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: "web",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{PeerName: "peer-1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "db",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{PeerName: "peer-2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "api",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{PeerName: "peer-1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
type testcase struct {
|
||||
name string
|
||||
token string
|
||||
expect structs.ServiceList
|
||||
expectErr string
|
||||
}
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
var out *structs.IndexedServiceList
|
||||
req := structs.ServiceDumpRequest{
|
||||
Datacenter: "dc1",
|
||||
PeerName: "peer-1",
|
||||
QueryOptions: structs.QueryOptions{Token: tc.token},
|
||||
}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Internal.ExportedServicesForPeer", &req, &out)
|
||||
|
||||
if tc.expectErr != "" {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), tc.expectErr)
|
||||
require.Nil(t, out)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, out.Services)
|
||||
}
|
||||
tcs := []testcase{
|
||||
{
|
||||
name: "can read all",
|
||||
token: tokenWithRules(t, codec, TestDefaultInitialManagementToken,
|
||||
`
|
||||
service_prefix "" {
|
||||
policy = "read"
|
||||
}
|
||||
`),
|
||||
expect: structs.ServiceList{
|
||||
structs.NewServiceName("api", nil),
|
||||
structs.NewServiceName("web", nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "filtered",
|
||||
token: tokenWithRules(t, codec, TestDefaultInitialManagementToken,
|
||||
`
|
||||
service "web" { policy = "read" }
|
||||
service "api" { policy = "deny" }
|
||||
`),
|
||||
expect: structs.ServiceList{
|
||||
structs.NewServiceName("web", nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no service rules filters all results",
|
||||
token: tokenWithRules(t, codec, TestDefaultInitialManagementToken,
|
||||
``),
|
||||
expect: structs.ServiceList{},
|
||||
},
|
||||
{
|
||||
name: "no service rules but mesh write shows all results",
|
||||
token: tokenWithRules(t, codec, TestDefaultInitialManagementToken,
|
||||
`mesh = "write"`),
|
||||
expect: structs.ServiceList{
|
||||
structs.NewServiceName("api", nil),
|
||||
structs.NewServiceName("web", nil),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testUUID() string {
|
||||
buf := make([]byte, 16)
|
||||
if _, err := rand.Read(buf); err != nil {
|
||||
|
|
|
@ -91,6 +91,7 @@ func init() {
|
|||
registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPHandlers).UINodes)
|
||||
registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPHandlers).UINodeInfo)
|
||||
registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPHandlers).UIServices)
|
||||
registerEndpoint("/v1/internal/ui/exported-services", []string{"GET"}, (*HTTPHandlers).UIExportedServices)
|
||||
registerEndpoint("/v1/internal/ui/catalog-overview", []string{"GET"}, (*HTTPHandlers).UICatalogOverview)
|
||||
registerEndpoint("/v1/internal/ui/gateway-services-nodes/", []string{"GET"}, (*HTTPHandlers).UIGatewayServicesNodes)
|
||||
registerEndpoint("/v1/internal/ui/gateway-intentions/", []string{"GET"}, (*HTTPHandlers).UIGatewayIntentions)
|
||||
|
|
|
@ -780,3 +780,49 @@ func (s *HTTPHandlers) UIMetricsProxy(resp http.ResponseWriter, req *http.Reques
|
|||
proxy.ServeHTTP(resp, req)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// UIExportedServices is used to list the exported services to a given peer. We return a
|
||||
// barebones ServiceListingSummary which only contains the name and enterprise meta of a service.
|
||||
// Currently, the request and response mirror UIServices but the API may change in the future.
|
||||
func (s *HTTPHandlers) UIExportedServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
// Parse arguments
|
||||
args := structs.ServiceDumpRequest{}
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
if peer := req.URL.Query().Get("peer"); peer != "" {
|
||||
args.PeerName = peer
|
||||
}
|
||||
if err := s.parseEntMeta(req, &args.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Make the RPC request
|
||||
var out structs.IndexedServiceList
|
||||
defer setMeta(resp, &out.QueryMeta)
|
||||
RPC:
|
||||
if err := s.agent.RPC("Internal.ExportedServicesForPeer", &args, &out); err != nil {
|
||||
// Retry the request allowing stale data if no leader
|
||||
if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale {
|
||||
args.AllowStale = true
|
||||
goto RPC
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// Ensure at least a zero length slice
|
||||
result := make([]*ServiceListingSummary, 0)
|
||||
for _, svc := range out.Services {
|
||||
// We synthesize a minimal summary for the frontend.
|
||||
// The shape of the data may change in the future but
|
||||
// currently only the service name is required.
|
||||
sum := ServiceListingSummary{
|
||||
ServiceSummary: ServiceSummary{
|
||||
Name: svc.Name,
|
||||
EnterpriseMeta: svc.EnterpriseMeta,
|
||||
Datacenter: args.Datacenter,
|
||||
},
|
||||
}
|
||||
result = append(result, &sum)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package agent
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -669,6 +670,181 @@ func TestUIServices(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestUIExportedServices(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
a := StartTestAgent(t, TestAgent{Overrides: `peering = { test_allow_peer_registrations = true }`})
|
||||
defer a.Shutdown()
|
||||
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
|
||||
requests := []*structs.RegisterRequest{
|
||||
// register api service
|
||||
{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
Service: "api",
|
||||
ID: "api-1",
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "node",
|
||||
Name: "api svc check",
|
||||
ServiceName: "api",
|
||||
ServiceID: "api-1",
|
||||
Status: api.HealthWarning,
|
||||
},
|
||||
},
|
||||
},
|
||||
// register api-proxy svc
|
||||
{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
SkipNodeUpdate: true,
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
Service: "api-proxy",
|
||||
ID: "api-proxy-1",
|
||||
Tags: []string{},
|
||||
Meta: map[string]string{structs.MetaExternalSource: "k8s"},
|
||||
Port: 1234,
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "api",
|
||||
},
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "node",
|
||||
Name: "api proxy listening",
|
||||
ServiceName: "api-proxy",
|
||||
ServiceID: "api-proxy-1",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
},
|
||||
// register service web
|
||||
{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.2",
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
Service: "web",
|
||||
ID: "web-1",
|
||||
Tags: []string{},
|
||||
Meta: map[string]string{structs.MetaExternalSource: "k8s"},
|
||||
Port: 1234,
|
||||
},
|
||||
Checks: []*structs.HealthCheck{
|
||||
{
|
||||
Node: "bar",
|
||||
Name: "web svc check",
|
||||
Status: api.HealthCritical,
|
||||
ServiceName: "web",
|
||||
ServiceID: "web-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, args := range requests {
|
||||
var out struct{}
|
||||
require.NoError(t, a.RPC("Catalog.Register", args, &out))
|
||||
}
|
||||
|
||||
// establish "peer1"
|
||||
{
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
req := &pbpeering.GenerateTokenRequest{
|
||||
PeerName: "peer1",
|
||||
}
|
||||
_, err := a.rpcClientPeering.GenerateToken(ctx, req)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
{
|
||||
// Register exported services
|
||||
args := &structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: "api",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "peer1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
req := structs.ConfigEntryRequest{
|
||||
Op: structs.ConfigEntryUpsert,
|
||||
Datacenter: "dc1",
|
||||
Entry: args,
|
||||
}
|
||||
var configOutput bool
|
||||
require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput))
|
||||
require.True(t, configOutput)
|
||||
}
|
||||
|
||||
t.Run("valid peer", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/exported-services?peer=peer1", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
a.srv.h.ServeHTTP(resp, req)
|
||||
require.Equal(t, http.StatusOK, resp.Code)
|
||||
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
var summary []*ServiceListingSummary
|
||||
require.NoError(t, decoder.Decode(&summary))
|
||||
assertIndex(t, resp)
|
||||
|
||||
require.Len(t, summary, 1)
|
||||
|
||||
// internal accounting that users don't see can be blown away
|
||||
for _, sum := range summary {
|
||||
sum.transparentProxySet = false
|
||||
sum.externalSourceSet = nil
|
||||
sum.checks = nil
|
||||
}
|
||||
|
||||
expected := []*ServiceListingSummary{
|
||||
{
|
||||
ServiceSummary: ServiceSummary{
|
||||
Kind: structs.ServiceKindTypical,
|
||||
Name: "api",
|
||||
Datacenter: "dc1",
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Equal(t, expected, summary)
|
||||
})
|
||||
|
||||
t.Run("invalid peer", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
req, _ := http.NewRequest("GET", "/v1/internal/ui/exported-services?peer=peer2", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
a.srv.h.ServeHTTP(resp, req)
|
||||
require.Equal(t, http.StatusOK, resp.Code)
|
||||
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
var summary []*ServiceListingSummary
|
||||
require.NoError(t, decoder.Decode(&summary))
|
||||
assertIndex(t, resp)
|
||||
|
||||
require.Len(t, summary, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func TestUIGatewayServiceNodes_Terminating(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
Loading…
Reference in New Issue