From 08759e46ed41ef3f0a7da1408691f290926dd47d Mon Sep 17 00:00:00 2001 From: freddygv Date: Sat, 13 Mar 2021 21:44:24 -0700 Subject: [PATCH] Add RPC endpoint for intention upstreams --- agent/consul/catalog_endpoint.go | 2 +- agent/consul/helper_test.go | 135 +++++++++++++++++++++++++ agent/consul/internal_endpoint.go | 43 ++++++++ agent/consul/internal_endpoint_test.go | 121 ++++++++++++++++++++++ 4 files changed, 300 insertions(+), 1 deletion(-) diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 4641db2d98..9ee9fa9c04 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -397,7 +397,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - index, services, err := state.ServiceList(ws, &args.EnterpriseMeta) + index, services, err := state.ServiceList(ws, nil, &args.EnterpriseMeta) if err != nil { return err } diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 1a0919527c..a80bb97eaf 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -910,3 +910,138 @@ func registerTestTopologyEntries(t *testing.T, codec rpc.ClientCodec, token stri require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out)) } } + +func registetIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token string) { + t.Helper() + + // api and api-proxy on node foo + // web and web-proxy on node foo + // redis and redis-proxy on node foo + // * -> * (deny) intention + // web -> api (allow) + registrations := map[string]*structs.RegisterRequest{ + "Node foo": { + Datacenter: "dc1", + Node: "foo", + ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"), + Address: "127.0.0.2", + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service api on foo": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "api", + Service: "api", + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service api-proxy": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Port: 8443, + Address: "198.18.1.2", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service web on foo": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "web", + Service: "web", + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service web-proxy on foo": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Port: 8080, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service redis on foo": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "redis", + Service: "redis", + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service redis-proxy on foo": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "redis-proxy", + Service: "redis-proxy", + Port: 1234, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "redis", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + } + registerTestCatalogEntriesMap(t, codec, registrations) + + // Add intentions: deny all and web -> api + entries := []structs.ConfigEntryRequest{ + { + Datacenter: "dc1", + Entry: &structs.ServiceIntentionsConfigEntry{ + Kind: structs.ServiceIntentions, + Name: "api", + Sources: []*structs.SourceIntention{ + { + Name: "web", + Action: structs.IntentionActionAllow, + }, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + { + Datacenter: "dc1", + Entry: &structs.ServiceIntentionsConfigEntry{ + Kind: structs.ServiceIntentions, + Name: "*", + Sources: []*structs.SourceIntention{ + { + Name: "*", + Action: structs.IntentionActionDeny, + }, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + } + for _, req := range entries { + var out bool + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &req, &out)) + } +} diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 48fd18505f..f138e486e7 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -188,6 +188,49 @@ func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply * }) } +// IntentionUpstreams returns the upstreams or downstreams of a service. Upstreams and downstreams are inferred from intentions. +// If intentions allow a connection from the target to some candidate service, the candidate service is considered +// an upstream of the target. +func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error { + // Exit early if Connect hasn't been enabled. + if !m.srv.config.ConnectEnabled { + return ErrConnectNotEnabled + } + if args.ServiceName == "" { + return fmt.Errorf("Must provide a service name") + } + if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreams", args, args, reply); done { + return err + } + + authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) + if err != nil { + return err + } + if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + + return m.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + defaultDecision := acl.Allow + if authz != nil { + defaultDecision = authz.IntentionDefaultAllow(nil) + } + + sn := structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta) + index, services, err := state.IntentionTopology(ws, sn, false, defaultDecision) + if err != nil { + return err + } + + reply.Index, reply.Services = index, services + return m.srv.filterACLWithAuthorizer(authz, reply) + }) +} + // GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done { diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index dfc86d6f26..5117a0e05c 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -1885,3 +1885,124 @@ service "web" { policy = "read" } require.True(t, acl.IsErrPermissionDenied(err)) }) } + +func TestInternal_IntentionUpstreams(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + codec := rpcClient(t, s1) + defer codec.Close() + + // Services: + // api and api-proxy on node foo + // web and web-proxy on node foo + // + // Intentions + // * -> * (deny) intention + // web -> api (allow) + registetIntentionUpstreamEntries(t, codec, "") + + t.Run("web", func(t *testing.T) { + retry.Run(t, func(r *retry.R) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + var out structs.IndexedServiceList + require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.IntentionUpstreams", &args, &out)) + + // foo/api + require.Len(r, out.Services, 1) + + expectUp := structs.ServiceList{ + structs.NewServiceName("api", structs.DefaultEnterpriseMeta()), + } + require.Equal(r, expectUp, out.Services) + }) + }) +} + +func TestInternal_IntentionUpstreams_ACL(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = TestDefaultMasterToken + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + codec := rpcClient(t, s1) + defer codec.Close() + + // Services: + // api and api-proxy on node foo + // web and web-proxy on node foo + // + // Intentions + // * -> * (deny) intention + // web -> api (allow) + registetIntentionUpstreamEntries(t, codec, TestDefaultMasterToken) + + t.Run("valid token", func(t *testing.T) { + // Token grants read to read api service + userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", ` +service_prefix "api" { policy = "read" } +`) + require.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + QueryOptions: structs.QueryOptions{Token: userToken.SecretID}, + } + var out structs.IndexedServiceList + require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.IntentionUpstreams", &args, &out)) + + // foo/api + require.Len(r, out.Services, 1) + + expectUp := structs.ServiceList{ + structs.NewServiceName("api", structs.DefaultEnterpriseMeta()), + } + require.Equal(r, expectUp, out.Services) + }) + }) + + t.Run("invalid token filters results", func(t *testing.T) { + // Token grants read to read an unrelated service, mongo + userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", ` +service_prefix "mongo" { policy = "read" } +`) + require.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + QueryOptions: structs.QueryOptions{Token: userToken.SecretID}, + } + var out structs.IndexedServiceList + require.NoError(r, msgpackrpc.CallWithCodec(codec, "Internal.IntentionUpstreams", &args, &out)) + + // Token can't read api service + require.Empty(r, out.Services) + }) + }) +}