diff --git a/agent/cache-types/gateway_services.go b/agent/cache-types/gateway_services.go index 96d7575baf..6e3ca380d4 100644 --- a/agent/cache-types/gateway_services.go +++ b/agent/cache-types/gateway_services.go @@ -42,7 +42,7 @@ func (g *GatewayServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac // Fetch var reply structs.IndexedGatewayServices - if err := g.RPC.RPC("Internal.GatewayServices", reqReal, &reply); err != nil { + if err := g.RPC.RPC("Catalog.GatewayServices", reqReal, &reply); err != nil { return result, err } diff --git a/agent/cache-types/gateway_services_test.go b/agent/cache-types/gateway_services_test.go index 4ac30f621d..f625add869 100644 --- a/agent/cache-types/gateway_services_test.go +++ b/agent/cache-types/gateway_services_test.go @@ -17,7 +17,7 @@ func TestGatewayServices(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedGatewayServices - rpc.On("RPC", "Internal.GatewayServices", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", "Catalog.GatewayServices", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { req := args.Get(1).(*structs.ServiceSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 8335ff8dae..b9521fe16c 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -12,13 +12,15 @@ import ( "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/types" bexpr "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" ) // Catalog endpoint is used to manipulate the service catalog type Catalog struct { - srv *Server + srv *Server + logger hclog.Logger } // nodePreApply does the verification of a node before it is applied to Raft. @@ -590,3 +592,67 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru return nil }) } + +func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error { + if done, err := c.srv.forward("Catalog.GatewayServices", args, args, reply); done { + return err + } + + var authzContext acl.AuthorizerContext + authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext) + if err != nil { + return err + } + + if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + + if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow { + return acl.ErrPermissionDenied + } + + return c.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + var index uint64 + var services structs.GatewayServices + + supportedGateways := []string{structs.IngressGateway, structs.TerminatingGateway} + var found bool + for _, kind := range supportedGateways { + // We only use this call to validate the RPC call, don't add the watch set + _, entry, err := state.ConfigEntry(nil, kind, args.ServiceName, &args.EnterpriseMeta) + if err != nil { + return err + } + if entry != nil { + found = true + } + } + + // We log a warning here to indicate that there is a potential + // misconfiguration. We explicitly do NOT return an error because this + // can occur in the course of normal operation by deleting a + // configuration entry or starting the proxy before registering the + // config entry. + if !found { + c.logger.Warn("no terminating-gateway or ingress-gateway associated with this gateway", + "gateway", args.ServiceName, + ) + } + + index, services, err = state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta) + if err != nil { + return err + } + + if err := c.srv.filterACL(args.Token, &services); err != nil { + return err + } + + reply.Index, reply.Services = index, services + return nil + }) +} diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index 1e24478f8c..904be9d38c 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -2742,3 +2742,495 @@ func TestCatalog_NodeServices_FilterACL(t *testing.T) { require.True(t, ok) require.Equal(t, "foo", svc.ID) } + +func TestCatalog_GatewayServices_TerminatingGateway(t *testing.T) { + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + { + var out struct{} + + // Register a service "api" + args := structs.TestRegisterRequest(t) + args.Service.Service = "api" + args.Check = &structs.HealthCheck{ + Name: "api", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register a service "db" + args = structs.TestRegisterRequest(t) + args.Service.Service = "db" + args.Check = &structs.HealthCheck{ + Name: "db", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register a service "redis" + args = structs.TestRegisterRequest(t) + args.Service.Service = "redis" + args.Check = &structs.HealthCheck{ + Name: "redis", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register a gateway + args = &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindTerminatingGateway, + Service: "gateway", + Port: 443, + }, + Check: &structs.HealthCheck{ + Name: "gateway", + Status: api.HealthPassing, + ServiceID: "gateway", + }, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + entryArgs := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: &structs.TerminatingGatewayConfigEntry{ + Kind: "terminating-gateway", + Name: "gateway", + Services: []structs.LinkedService{ + { + Name: "api", + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + }, + { + Name: "db", + }, + { + Name: "*", + CAFile: "ca.crt", + CertFile: "client.crt", + KeyFile: "client.key", + SNI: "my-alt-domain", + }, + }, + }, + } + var entryResp bool + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) + } + + retry.Run(t, func(r *retry.R) { + // List should return all three services + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "gateway", + } + var resp structs.IndexedGatewayServices + assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp)) + assert.Len(r, resp.Services, 3) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceID("api", nil), + Gateway: structs.NewServiceID("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + SNI: "my-domain", + }, + { + Service: structs.NewServiceID("db", nil), + Gateway: structs.NewServiceID("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "", + CertFile: "", + KeyFile: "", + }, + { + Service: structs.NewServiceID("redis", nil), + Gateway: structs.NewServiceID("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + CAFile: "ca.crt", + CertFile: "client.crt", + KeyFile: "client.key", + SNI: "my-alt-domain", + FromWildcard: true, + }, + } + + // Ignore raft index for equality + for _, s := range resp.Services { + s.RaftIndex = structs.RaftIndex{} + } + assert.Equal(r, expect, resp.Services) + }) +} + +func TestCatalog_GatewayServices_BothGateways(t *testing.T) { + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + { + var out struct{} + + // Register a service "api" + args := structs.TestRegisterRequest(t) + args.Service.Service = "api" + args.Check = &structs.HealthCheck{ + Name: "api", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register a terminating gateway + args = &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindTerminatingGateway, + Service: "gateway", + Port: 443, + }, + Check: &structs.HealthCheck{ + Name: "gateway", + Status: api.HealthPassing, + ServiceID: "gateway", + }, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + entryArgs := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: &structs.TerminatingGatewayConfigEntry{ + Kind: "terminating-gateway", + Name: "gateway", + Services: []structs.LinkedService{ + { + Name: "api", + }, + }, + }, + } + var entryResp bool + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) + + // Register a service "db" + args = structs.TestRegisterRequest(t) + args.Service.Service = "db" + args.Check = &structs.HealthCheck{ + Name: "db", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register an ingress gateway + args = &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.2", + Service: &structs.NodeService{ + Kind: structs.ServiceKindTerminatingGateway, + Service: "ingress", + Port: 444, + }, + Check: &structs.HealthCheck{ + Name: "ingress", + Status: api.HealthPassing, + ServiceID: "ingress", + }, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + entryArgs = &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: &structs.IngressGatewayConfigEntry{ + Kind: "ingress-gateway", + Name: "ingress", + Listeners: []structs.IngressListener{ + { + Port: 8888, + Services: []structs.IngressService{ + {Name: "db"}, + }, + }, + }, + }, + } + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) + } + + retry.Run(t, func(r *retry.R) { + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "gateway", + } + var resp structs.IndexedGatewayServices + assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp)) + assert.Len(r, resp.Services, 1) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceID("api", nil), + Gateway: structs.NewServiceID("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + }, + } + + // Ignore raft index for equality + for _, s := range resp.Services { + s.RaftIndex = structs.RaftIndex{} + } + assert.Equal(r, expect, resp.Services) + + req.ServiceName = "ingress" + assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp)) + assert.Len(r, resp.Services, 1) + + expect = structs.GatewayServices{ + { + Service: structs.NewServiceID("db", nil), + Gateway: structs.NewServiceID("ingress", nil), + GatewayKind: structs.ServiceKindIngressGateway, + Protocol: "tcp", + Port: 8888, + }, + } + + // Ignore raft index for equality + for _, s := range resp.Services { + s.RaftIndex = structs.RaftIndex{} + } + assert.Equal(r, expect, resp.Services) + }) + + // Test a non-gateway service being requested + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "api", + } + var resp structs.IndexedGatewayServices + err := msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp) + assert.NoError(t, err) + assert.Empty(t, resp.Services) + // Ensure that the index is not zero so that a blocking query still gets the + // latest GatewayServices index + assert.NotEqual(t, 0, resp.Index) +} + +func TestCatalog_GatewayServices_ACLFiltering(t *testing.T) { + t.Parallel() + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root")) + + { + var out struct{} + + // Register a service "api" + args := structs.TestRegisterRequest(t) + args.Service.Service = "api" + args.Check = &structs.HealthCheck{ + Name: "api", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + args.Token = "root" + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register a service "db" + args = structs.TestRegisterRequest(t) + args.Service.Service = "db" + args.Check = &structs.HealthCheck{ + Name: "db", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + args.Token = "root" + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register a service "redis" + args = structs.TestRegisterRequest(t) + args.Service.Service = "redis" + args.Check = &structs.HealthCheck{ + Name: "redis", + Status: api.HealthPassing, + ServiceID: args.Service.Service, + } + args.Token = "root" + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + // Register a gateway + args = &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Kind: structs.ServiceKindTerminatingGateway, + Service: "gateway", + Port: 443, + }, + Check: &structs.HealthCheck{ + Name: "gateway", + Status: api.HealthPassing, + ServiceID: "gateway", + }, + } + args.Token = "root" + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) + + entryArgs := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: &structs.TerminatingGatewayConfigEntry{ + Kind: "terminating-gateway", + Name: "gateway", + Services: []structs.LinkedService{ + { + Name: "api", + CAFile: "api/ca.crt", + CertFile: "api/client.crt", + KeyFile: "api/client.key", + }, + { + Name: "db", + }, + { + Name: "db_replica", + }, + { + Name: "*", + CAFile: "ca.crt", + CertFile: "client.crt", + KeyFile: "client.key", + }, + }, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + + var entryResp bool + assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) + } + + rules := ` +service_prefix "db" { + policy = "read" +} +` + svcToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules) + require.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + // List should return an empty list, since we do not have read on the gateway + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "gateway", + QueryOptions: structs.QueryOptions{Token: svcToken.SecretID}, + } + var resp structs.IndexedGatewayServices + err := msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp) + require.True(r, acl.IsErrPermissionDenied(err)) + }) + + rules = ` +service "gateway" { + policy = "read" +} +` + gwToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules) + require.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + // List should return an empty list, since we do not have read on db + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "gateway", + QueryOptions: structs.QueryOptions{Token: gwToken.SecretID}, + } + var resp structs.IndexedGatewayServices + assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp)) + assert.Len(r, resp.Services, 0) + }) + + rules = ` +service_prefix "db" { + policy = "read" +} +service "gateway" { + policy = "read" +} +` + validToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules) + require.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + // List should return db entry since we have read on db and gateway + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "gateway", + QueryOptions: structs.QueryOptions{Token: validToken.SecretID}, + } + var resp structs.IndexedGatewayServices + assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp)) + assert.Len(r, resp.Services, 2) + + expect := structs.GatewayServices{ + { + Service: structs.NewServiceID("db", nil), + Gateway: structs.NewServiceID("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + }, + { + Service: structs.NewServiceID("db_replica", nil), + Gateway: structs.NewServiceID("gateway", nil), + GatewayKind: structs.ServiceKindTerminatingGateway, + }, + } + + // Ignore raft index for equality + for _, s := range resp.Services { + s.RaftIndex = structs.RaftIndex{} + } + assert.Equal(r, expect, resp.Services) + }) +} diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 0a9ea16f2c..10f259837c 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -374,67 +374,3 @@ func (m *Internal) aclAccessorID(secretID string) string { } return ident.ID() } - -func (m *Internal) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error { - if done, err := m.srv.forward("Internal.GatewayServices", args, args, reply); done { - return err - } - - var authzContext acl.AuthorizerContext - authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext) - if err != nil { - return err - } - - if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { - return err - } - - if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow { - return acl.ErrPermissionDenied - } - - return m.srv.blockingQuery( - &args.QueryOptions, - &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.Store) error { - var index uint64 - var services structs.GatewayServices - - supportedGateways := []string{structs.IngressGateway, structs.TerminatingGateway} - var found bool - for _, kind := range supportedGateways { - // We only use this call to validate the RPC call, don't add the watch set - _, entry, err := state.ConfigEntry(nil, kind, args.ServiceName, &args.EnterpriseMeta) - if err != nil { - return err - } - if entry != nil { - found = true - } - } - - // We log a warning here to indicate that there is a potential - // misconfiguration. We explicitly do NOT return an error because this - // can occur in the course of normal operation by deleting a - // configuration entry or starting the proxy before registering the - // config entry. - if !found { - m.logger.Warn("no terminating-gateway or ingress-gateway associated with this gateway", - "gateway", args.ServiceName, - ) - } - - index, services, err = state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta) - if err != nil { - return err - } - - if err := m.srv.filterACL(args.Token, &services); err != nil { - return err - } - - reply.Index, reply.Services = index, services - return nil - }) -} diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index 99b700e4ec..2a7dee0d90 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -10,7 +10,6 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib/stringslice" - "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/types" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" @@ -657,498 +656,6 @@ func TestInternal_ServiceDump_Kind(t *testing.T) { }) } -func TestInternal_TerminatingGatewayServices(t *testing.T) { - t.Parallel() - - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - codec := rpcClient(t, s1) - defer codec.Close() - - testrpc.WaitForTestAgent(t, s1.RPC, "dc1") - { - var out struct{} - - // Register a service "api" - args := structs.TestRegisterRequest(t) - args.Service.Service = "api" - args.Check = &structs.HealthCheck{ - Name: "api", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register a service "db" - args = structs.TestRegisterRequest(t) - args.Service.Service = "db" - args.Check = &structs.HealthCheck{ - Name: "db", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register a service "redis" - args = structs.TestRegisterRequest(t) - args.Service.Service = "redis" - args.Check = &structs.HealthCheck{ - Name: "redis", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register a gateway - args = &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Service: &structs.NodeService{ - Kind: structs.ServiceKindTerminatingGateway, - Service: "gateway", - Port: 443, - }, - Check: &structs.HealthCheck{ - Name: "gateway", - Status: api.HealthPassing, - ServiceID: "gateway", - }, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - entryArgs := &structs.ConfigEntryRequest{ - Op: structs.ConfigEntryUpsert, - Datacenter: "dc1", - Entry: &structs.TerminatingGatewayConfigEntry{ - Kind: "terminating-gateway", - Name: "gateway", - Services: []structs.LinkedService{ - { - Name: "api", - CAFile: "api/ca.crt", - CertFile: "api/client.crt", - KeyFile: "api/client.key", - SNI: "my-domain", - }, - { - Name: "db", - }, - { - Name: "*", - CAFile: "ca.crt", - CertFile: "client.crt", - KeyFile: "client.key", - SNI: "my-alt-domain", - }, - }, - }, - } - var entryResp bool - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) - } - - retry.Run(t, func(r *retry.R) { - // List should return all three services - req := structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "gateway", - } - var resp structs.IndexedGatewayServices - assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)) - assert.Len(r, resp.Services, 3) - - expect := structs.GatewayServices{ - { - Service: structs.NewServiceID("api", nil), - Gateway: structs.NewServiceID("gateway", nil), - GatewayKind: structs.ServiceKindTerminatingGateway, - CAFile: "api/ca.crt", - CertFile: "api/client.crt", - KeyFile: "api/client.key", - SNI: "my-domain", - }, - { - Service: structs.NewServiceID("db", nil), - Gateway: structs.NewServiceID("gateway", nil), - GatewayKind: structs.ServiceKindTerminatingGateway, - CAFile: "", - CertFile: "", - KeyFile: "", - }, - { - Service: structs.NewServiceID("redis", nil), - Gateway: structs.NewServiceID("gateway", nil), - GatewayKind: structs.ServiceKindTerminatingGateway, - CAFile: "ca.crt", - CertFile: "client.crt", - KeyFile: "client.key", - SNI: "my-alt-domain", - FromWildcard: true, - }, - } - - // Ignore raft index for equality - for _, s := range resp.Services { - s.RaftIndex = structs.RaftIndex{} - } - assert.Equal(r, expect, resp.Services) - }) -} - -func TestInternal_GatewayServices_BothGateways(t *testing.T) { - t.Parallel() - - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - codec := rpcClient(t, s1) - defer codec.Close() - - testrpc.WaitForTestAgent(t, s1.RPC, "dc1") - { - var out struct{} - - // Register a service "api" - args := structs.TestRegisterRequest(t) - args.Service.Service = "api" - args.Check = &structs.HealthCheck{ - Name: "api", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register a terminating gateway - args = &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Service: &structs.NodeService{ - Kind: structs.ServiceKindTerminatingGateway, - Service: "gateway", - Port: 443, - }, - Check: &structs.HealthCheck{ - Name: "gateway", - Status: api.HealthPassing, - ServiceID: "gateway", - }, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - entryArgs := &structs.ConfigEntryRequest{ - Op: structs.ConfigEntryUpsert, - Datacenter: "dc1", - Entry: &structs.TerminatingGatewayConfigEntry{ - Kind: "terminating-gateway", - Name: "gateway", - Services: []structs.LinkedService{ - { - Name: "api", - }, - }, - }, - } - var entryResp bool - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) - - // Register a service "db" - args = structs.TestRegisterRequest(t) - args.Service.Service = "db" - args.Check = &structs.HealthCheck{ - Name: "db", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register an ingress gateway - args = &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.2", - Service: &structs.NodeService{ - Kind: structs.ServiceKindTerminatingGateway, - Service: "ingress", - Port: 444, - }, - Check: &structs.HealthCheck{ - Name: "ingress", - Status: api.HealthPassing, - ServiceID: "ingress", - }, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - entryArgs = &structs.ConfigEntryRequest{ - Op: structs.ConfigEntryUpsert, - Datacenter: "dc1", - Entry: &structs.IngressGatewayConfigEntry{ - Kind: "ingress-gateway", - Name: "ingress", - Listeners: []structs.IngressListener{ - { - Port: 8888, - Services: []structs.IngressService{ - {Name: "db"}, - }, - }, - }, - }, - } - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) - } - - retry.Run(t, func(r *retry.R) { - req := structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "gateway", - } - var resp structs.IndexedGatewayServices - assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)) - assert.Len(r, resp.Services, 1) - - expect := structs.GatewayServices{ - { - Service: structs.NewServiceID("api", nil), - Gateway: structs.NewServiceID("gateway", nil), - GatewayKind: structs.ServiceKindTerminatingGateway, - }, - } - - // Ignore raft index for equality - for _, s := range resp.Services { - s.RaftIndex = structs.RaftIndex{} - } - assert.Equal(r, expect, resp.Services) - - req.ServiceName = "ingress" - assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)) - assert.Len(r, resp.Services, 1) - - expect = structs.GatewayServices{ - { - Service: structs.NewServiceID("db", nil), - Gateway: structs.NewServiceID("ingress", nil), - GatewayKind: structs.ServiceKindIngressGateway, - Protocol: "tcp", - Port: 8888, - }, - } - - // Ignore raft index for equality - for _, s := range resp.Services { - s.RaftIndex = structs.RaftIndex{} - } - assert.Equal(r, expect, resp.Services) - }) - - // Test a non-gateway service being requested - req := structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "api", - } - var resp structs.IndexedGatewayServices - err := msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp) - assert.NoError(t, err) - assert.Empty(t, resp.Services) - // Ensure that the index is not zero so that a blocking query still gets the - // latest GatewayServices index - assert.NotEqual(t, 0, resp.Index) -} - -func TestInternal_GatewayServices_ACLFiltering(t *testing.T) { - t.Parallel() - - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.ACLDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLMasterToken = "root" - c.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root")) - - { - var out struct{} - - // Register a service "api" - args := structs.TestRegisterRequest(t) - args.Service.Service = "api" - args.Check = &structs.HealthCheck{ - Name: "api", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - args.Token = "root" - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register a service "db" - args = structs.TestRegisterRequest(t) - args.Service.Service = "db" - args.Check = &structs.HealthCheck{ - Name: "db", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - args.Token = "root" - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register a service "redis" - args = structs.TestRegisterRequest(t) - args.Service.Service = "redis" - args.Check = &structs.HealthCheck{ - Name: "redis", - Status: api.HealthPassing, - ServiceID: args.Service.Service, - } - args.Token = "root" - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - // Register a gateway - args = &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Service: &structs.NodeService{ - Kind: structs.ServiceKindTerminatingGateway, - Service: "gateway", - Port: 443, - }, - Check: &structs.HealthCheck{ - Name: "gateway", - Status: api.HealthPassing, - ServiceID: "gateway", - }, - } - args.Token = "root" - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out)) - - entryArgs := &structs.ConfigEntryRequest{ - Op: structs.ConfigEntryUpsert, - Datacenter: "dc1", - Entry: &structs.TerminatingGatewayConfigEntry{ - Kind: "terminating-gateway", - Name: "gateway", - Services: []structs.LinkedService{ - { - Name: "api", - CAFile: "api/ca.crt", - CertFile: "api/client.crt", - KeyFile: "api/client.key", - }, - { - Name: "db", - }, - { - Name: "db_replica", - }, - { - Name: "*", - CAFile: "ca.crt", - CertFile: "client.crt", - KeyFile: "client.key", - }, - }, - }, - WriteRequest: structs.WriteRequest{Token: "root"}, - } - - var entryResp bool - assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp)) - } - - rules := ` -service_prefix "db" { - policy = "read" -} -` - svcToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules) - require.NoError(t, err) - - retry.Run(t, func(r *retry.R) { - // List should return an empty list, since we do not have read on the gateway - req := structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "gateway", - QueryOptions: structs.QueryOptions{Token: svcToken.SecretID}, - } - var resp structs.IndexedGatewayServices - err := msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp) - require.True(r, acl.IsErrPermissionDenied(err)) - }) - - rules = ` -service "gateway" { - policy = "read" -} -` - gwToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules) - require.NoError(t, err) - - retry.Run(t, func(r *retry.R) { - // List should return an empty list, since we do not have read on db - req := structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "gateway", - QueryOptions: structs.QueryOptions{Token: gwToken.SecretID}, - } - var resp structs.IndexedGatewayServices - assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)) - assert.Len(r, resp.Services, 0) - }) - - rules = ` -service_prefix "db" { - policy = "read" -} -service "gateway" { - policy = "read" -} -` - validToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules) - require.NoError(t, err) - - retry.Run(t, func(r *retry.R) { - // List should return db entry since we have read on db and gateway - req := structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "gateway", - QueryOptions: structs.QueryOptions{Token: validToken.SecretID}, - } - var resp structs.IndexedGatewayServices - assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)) - assert.Len(r, resp.Services, 2) - - expect := structs.GatewayServices{ - { - Service: structs.NewServiceID("db", nil), - Gateway: structs.NewServiceID("gateway", nil), - GatewayKind: structs.ServiceKindTerminatingGateway, - }, - { - Service: structs.NewServiceID("db_replica", nil), - Gateway: structs.NewServiceID("gateway", nil), - GatewayKind: structs.ServiceKindTerminatingGateway, - }, - } - - // Ignore raft index for equality - for _, s := range resp.Services { - s.RaftIndex = structs.RaftIndex{} - } - assert.Equal(r, expect, resp.Services) - }) -} - func TestInternal_GatewayServiceDump_Terminating(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index cab2e2b558..6304eeeedf 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -6,7 +6,7 @@ import ( func init() { registerEndpoint(func(s *Server) interface{} { return &ACL{s, s.loggers.Named(logging.ACL)} }) - registerEndpoint(func(s *Server) interface{} { return &Catalog{s} }) + registerEndpoint(func(s *Server) interface{} { return &Catalog{s, s.loggers.Named(logging.Catalog)} }) registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s, s.logger) }) registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} }) registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} }) diff --git a/logging/names.go b/logging/names.go index d4c5c8f07d..409dcaf499 100644 --- a/logging/names.go +++ b/logging/names.go @@ -9,6 +9,7 @@ const ( AWS string = "aws" Azure string = "azure" CA string = "ca" + Catalog string = "catalog" CentralConfig string = "central_config" ConfigEntry string = "config_entry" Connect string = "connect"