diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 534513c8ab..6d43fc8f96 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -3,7 +3,7 @@ package consul import ( "fmt" - bexpr "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/serf/serf" @@ -613,6 +613,7 @@ func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply // ExportedPeeredServices is used to query the exported services for peers. // Returns services as a map of ServiceNames by peer. +// To get exported services for a single peer, use ExportedServicesForPeer. func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply *structs.IndexedExportedServiceList) error { if done, err := m.srv.ForwardRPC("Internal.ExportedPeeredServices", args, reply); done { return err @@ -642,6 +643,59 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply }) } +// ExportedServicesForPeer returns a list of Service names that are exported for a given peer. +func (m *Internal) ExportedServicesForPeer(args *structs.ServiceDumpRequest, reply *structs.IndexedServiceList) error { + if done, err := m.srv.ForwardRPC("Internal.ExportedServicesForPeer", args, reply); done { + return err + } + + var authzCtx acl.AuthorizerContext + authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzCtx) + if err != nil { + return err + } + if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + if args.PeerName == "" { + return fmt.Errorf("must provide PeerName") + } + + return m.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, store *state.Store) error { + + idx, p, err := store.PeeringRead(ws, state.Query{ + Value: args.PeerName, + EnterpriseMeta: args.EnterpriseMeta, + }) + if err != nil { + return fmt.Errorf("error while fetching peer %q: %w", args.PeerName, err) + } + if p == nil { + reply.Index = idx + reply.Services = nil + return errNotFound + } + idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, "") + if err != nil { + return fmt.Errorf("error while listing exported services for peer %q: %w", args.PeerName, err) + } + + reply.Index = idx + reply.Services = exportedSvcs.Services + + // If MeshWrite is allowed, we assume it is an operator role and + // return all the services. Otherwise, the results are filtered. + if authz.MeshWrite(&authzCtx) != acl.Allow { + m.srv.filterACLWithAuthorizer(authz, reply) + } + + return nil + }) +} + // PeeredUpstreams returns all imported services as upstreams for any service in a given partition. // Cluster peering does not replicate intentions so all imported services are considered potential upstreams. func (m *Internal) PeeredUpstreams(args *structs.PartitionSpecificRequest, reply *structs.IndexedPeeredServiceList) error { diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index ecec960ada..698fc56818 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -3487,6 +3487,124 @@ func TestInternal_PeeredUpstreams_ACLEnforcement(t *testing.T) { } } +func TestInternal_ExportedServicesForPeer_ACLEnforcement(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + _, s := testServerWithConfig(t, testServerACLConfig) + codec := rpcClient(t, s) + + require.NoError(t, s.fsm.State().PeeringWrite(1, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + ID: testUUID(), + Name: "peer-1", + }, + })) + require.NoError(t, s.fsm.State().PeeringWrite(1, &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + ID: testUUID(), + Name: "peer-2", + }, + })) + require.NoError(t, s.fsm.State().EnsureConfigEntry(1, &structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "web", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-1"}, + }, + }, + { + Name: "db", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-2"}, + }, + }, + { + Name: "api", + Consumers: []structs.ServiceConsumer{ + {PeerName: "peer-1"}, + }, + }, + }, + })) + + type testcase struct { + name string + token string + expect structs.ServiceList + expectErr string + } + run := func(t *testing.T, tc testcase) { + var out *structs.IndexedServiceList + req := structs.ServiceDumpRequest{ + Datacenter: "dc1", + PeerName: "peer-1", + QueryOptions: structs.QueryOptions{Token: tc.token}, + } + err := msgpackrpc.CallWithCodec(codec, "Internal.ExportedServicesForPeer", &req, &out) + + if tc.expectErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectErr) + require.Nil(t, out) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expect, out.Services) + } + tcs := []testcase{ + { + name: "can read all", + token: tokenWithRules(t, codec, TestDefaultInitialManagementToken, + ` + service_prefix "" { + policy = "read" + } + `), + expect: structs.ServiceList{ + structs.NewServiceName("api", nil), + structs.NewServiceName("web", nil), + }, + }, + { + name: "filtered", + token: tokenWithRules(t, codec, TestDefaultInitialManagementToken, + ` + service "web" { policy = "read" } + service "api" { policy = "deny" } + `), + expect: structs.ServiceList{ + structs.NewServiceName("web", nil), + }, + }, + { + name: "no service rules filters all results", + token: tokenWithRules(t, codec, TestDefaultInitialManagementToken, + ``), + expect: structs.ServiceList{}, + }, + { + name: "no service rules but mesh write shows all results", + token: tokenWithRules(t, codec, TestDefaultInitialManagementToken, + `mesh = "write"`), + expect: structs.ServiceList{ + structs.NewServiceName("api", nil), + structs.NewServiceName("web", nil), + }, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + func testUUID() string { buf := make([]byte, 16) if _, err := rand.Read(buf); err != nil { diff --git a/agent/http_register.go b/agent/http_register.go index cfd1dc0861..6dd8b41c60 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -91,6 +91,7 @@ func init() { registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPHandlers).UINodes) registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPHandlers).UINodeInfo) registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPHandlers).UIServices) + registerEndpoint("/v1/internal/ui/exported-services", []string{"GET"}, (*HTTPHandlers).UIExportedServices) registerEndpoint("/v1/internal/ui/catalog-overview", []string{"GET"}, (*HTTPHandlers).UICatalogOverview) registerEndpoint("/v1/internal/ui/gateway-services-nodes/", []string{"GET"}, (*HTTPHandlers).UIGatewayServicesNodes) registerEndpoint("/v1/internal/ui/gateway-intentions/", []string{"GET"}, (*HTTPHandlers).UIGatewayIntentions) diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index a418a4017a..dde74c5853 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -780,3 +780,49 @@ func (s *HTTPHandlers) UIMetricsProxy(resp http.ResponseWriter, req *http.Reques proxy.ServeHTTP(resp, req) return nil, nil } + +// UIExportedServices is used to list the exported services to a given peer. We return a +// barebones ServiceListingSummary which only contains the name and enterprise meta of a service. +// Currently, the request and response mirror UIServices but the API may change in the future. +func (s *HTTPHandlers) UIExportedServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Parse arguments + args := structs.ServiceDumpRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + if peer := req.URL.Query().Get("peer"); peer != "" { + args.PeerName = peer + } + if err := s.parseEntMeta(req, &args.EnterpriseMeta); err != nil { + return nil, err + } + + // Make the RPC request + var out structs.IndexedServiceList + defer setMeta(resp, &out.QueryMeta) +RPC: + if err := s.agent.RPC("Internal.ExportedServicesForPeer", &args, &out); err != nil { + // Retry the request allowing stale data if no leader + if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale { + args.AllowStale = true + goto RPC + } + return nil, err + } + // Ensure at least a zero length slice + result := make([]*ServiceListingSummary, 0) + for _, svc := range out.Services { + // We synthesize a minimal summary for the frontend. + // The shape of the data may change in the future but + // currently only the service name is required. + sum := ServiceListingSummary{ + ServiceSummary: ServiceSummary{ + Name: svc.Name, + EnterpriseMeta: svc.EnterpriseMeta, + Datacenter: args.Datacenter, + }, + } + result = append(result, &sum) + } + return result, nil +} diff --git a/agent/ui_endpoint_test.go b/agent/ui_endpoint_test.go index 54543e2758..5ba9ca833d 100644 --- a/agent/ui_endpoint_test.go +++ b/agent/ui_endpoint_test.go @@ -3,6 +3,7 @@ package agent import ( "bytes" "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -669,6 +670,181 @@ func TestUIServices(t *testing.T) { }) } +func TestUIExportedServices(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := StartTestAgent(t, TestAgent{Overrides: `peering = { test_allow_peer_registrations = true }`}) + defer a.Shutdown() + + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + requests := []*structs.RegisterRequest{ + // register api service + { + Datacenter: "dc1", + Node: "node", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + Service: "api", + ID: "api-1", + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "node", + Name: "api svc check", + ServiceName: "api", + ServiceID: "api-1", + Status: api.HealthWarning, + }, + }, + }, + // register api-proxy svc + { + Datacenter: "dc1", + Node: "node", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + Service: "api-proxy", + ID: "api-proxy-1", + Tags: []string{}, + Meta: map[string]string{structs.MetaExternalSource: "k8s"}, + Port: 1234, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + }, + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "node", + Name: "api proxy listening", + ServiceName: "api-proxy", + ServiceID: "api-proxy-1", + Status: api.HealthPassing, + }, + }, + }, + // register service web + { + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + Service: "web", + ID: "web-1", + Tags: []string{}, + Meta: map[string]string{structs.MetaExternalSource: "k8s"}, + Port: 1234, + }, + Checks: []*structs.HealthCheck{ + { + Node: "bar", + Name: "web svc check", + Status: api.HealthCritical, + ServiceName: "web", + ServiceID: "web-1", + }, + }, + }, + } + + for _, args := range requests { + var out struct{} + require.NoError(t, a.RPC("Catalog.Register", args, &out)) + } + + // establish "peer1" + { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + req := &pbpeering.GenerateTokenRequest{ + PeerName: "peer1", + } + _, err := a.rpcClientPeering.GenerateToken(ctx, req) + require.NoError(t, err) + } + + { + // Register exported services + args := &structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "api", + Consumers: []structs.ServiceConsumer{ + { + PeerName: "peer1", + }, + }, + }, + }, + } + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: args, + } + var configOutput bool + require.NoError(t, a.RPC("ConfigEntry.Apply", &req, &configOutput)) + require.True(t, configOutput) + } + + t.Run("valid peer", func(t *testing.T) { + t.Parallel() + req, _ := http.NewRequest("GET", "/v1/internal/ui/exported-services?peer=peer1", nil) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + + decoder := json.NewDecoder(resp.Body) + var summary []*ServiceListingSummary + require.NoError(t, decoder.Decode(&summary)) + assertIndex(t, resp) + + require.Len(t, summary, 1) + + // internal accounting that users don't see can be blown away + for _, sum := range summary { + sum.transparentProxySet = false + sum.externalSourceSet = nil + sum.checks = nil + } + + expected := []*ServiceListingSummary{ + { + ServiceSummary: ServiceSummary{ + Kind: structs.ServiceKindTypical, + Name: "api", + Datacenter: "dc1", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + }, + } + require.Equal(t, expected, summary) + }) + + t.Run("invalid peer", func(t *testing.T) { + t.Parallel() + req, _ := http.NewRequest("GET", "/v1/internal/ui/exported-services?peer=peer2", nil) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + + decoder := json.NewDecoder(resp.Body) + var summary []*ServiceListingSummary + require.NoError(t, decoder.Decode(&summary)) + assertIndex(t, resp) + + require.Len(t, summary, 0) + }) +} + func TestUIGatewayServiceNodes_Terminating(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short")