Move GatewayServices out of Internal

This commit is contained in:
freddygv 2020-06-11 20:05:07 -06:00
parent 166a8b2a58
commit 15c74d6943
8 changed files with 563 additions and 561 deletions

View File

@ -42,7 +42,7 @@ func (g *GatewayServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac
// Fetch // Fetch
var reply structs.IndexedGatewayServices var reply structs.IndexedGatewayServices
if err := g.RPC.RPC("Internal.GatewayServices", reqReal, &reply); err != nil { if err := g.RPC.RPC("Catalog.GatewayServices", reqReal, &reply); err != nil {
return result, err return result, err
} }

View File

@ -17,7 +17,7 @@ func TestGatewayServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value // Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments. // since that is return-by-pointer in the arguments.
var resp *structs.IndexedGatewayServices var resp *structs.IndexedGatewayServices
rpc.On("RPC", "Internal.GatewayServices", mock.Anything, mock.Anything).Return(nil). rpc.On("RPC", "Catalog.GatewayServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) { Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest) req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)

View File

@ -12,13 +12,15 @@ import (
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
bexpr "github.com/hashicorp/go-bexpr" bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
) )
// Catalog endpoint is used to manipulate the service catalog // Catalog endpoint is used to manipulate the service catalog
type Catalog struct { type Catalog struct {
srv *Server srv *Server
logger hclog.Logger
} }
// nodePreApply does the verification of a node before it is applied to Raft. // nodePreApply does the verification of a node before it is applied to Raft.
@ -590,3 +592,67 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru
return nil return nil
}) })
} }
func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error {
if done, err := c.srv.forward("Catalog.GatewayServices", args, args, reply); done {
return err
}
var authzContext acl.AuthorizerContext
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var services structs.GatewayServices
supportedGateways := []string{structs.IngressGateway, structs.TerminatingGateway}
var found bool
for _, kind := range supportedGateways {
// We only use this call to validate the RPC call, don't add the watch set
_, entry, err := state.ConfigEntry(nil, kind, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if entry != nil {
found = true
}
}
// We log a warning here to indicate that there is a potential
// misconfiguration. We explicitly do NOT return an error because this
// can occur in the course of normal operation by deleting a
// configuration entry or starting the proxy before registering the
// config entry.
if !found {
c.logger.Warn("no terminating-gateway or ingress-gateway associated with this gateway",
"gateway", args.ServiceName,
)
}
index, services, err = state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if err := c.srv.filterACL(args.Token, &services); err != nil {
return err
}
reply.Index, reply.Services = index, services
return nil
})
}

View File

@ -2742,3 +2742,495 @@ func TestCatalog_NodeServices_FilterACL(t *testing.T) {
require.True(t, ok) require.True(t, ok)
require.Equal(t, "foo", svc.ID) require.Equal(t, "foo", svc.ID)
} }
func TestCatalog_GatewayServices_TerminatingGateway(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
{
var out struct{}
// Register a service "api"
args := structs.TestRegisterRequest(t)
args.Service.Service = "api"
args.Check = &structs.HealthCheck{
Name: "api",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "db"
args = structs.TestRegisterRequest(t)
args.Service.Service = "db"
args.Check = &structs.HealthCheck{
Name: "db",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "redis"
args = structs.TestRegisterRequest(t)
args.Service.Service = "redis"
args.Check = &structs.HealthCheck{
Name: "redis",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "gateway",
Port: 443,
},
Check: &structs.HealthCheck{
Name: "gateway",
Status: api.HealthPassing,
ServiceID: "gateway",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
Services: []structs.LinkedService{
{
Name: "api",
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
SNI: "my-domain",
},
{
Name: "db",
},
{
Name: "*",
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
SNI: "my-alt-domain",
},
},
},
}
var entryResp bool
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
}
retry.Run(t, func(r *retry.R) {
// List should return all three services
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 3)
expect := structs.GatewayServices{
{
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
SNI: "my-domain",
},
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
CAFile: "",
CertFile: "",
KeyFile: "",
},
{
Service: structs.NewServiceID("redis", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
SNI: "my-alt-domain",
FromWildcard: true,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
}
func TestCatalog_GatewayServices_BothGateways(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
{
var out struct{}
// Register a service "api"
args := structs.TestRegisterRequest(t)
args.Service.Service = "api"
args.Check = &structs.HealthCheck{
Name: "api",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a terminating gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "gateway",
Port: 443,
},
Check: &structs.HealthCheck{
Name: "gateway",
Status: api.HealthPassing,
ServiceID: "gateway",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
Services: []structs.LinkedService{
{
Name: "api",
},
},
},
}
var entryResp bool
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
// Register a service "db"
args = structs.TestRegisterRequest(t)
args.Service.Service = "db"
args.Check = &structs.HealthCheck{
Name: "db",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register an ingress gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.2",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "ingress",
Port: 444,
},
Check: &structs.HealthCheck{
Name: "ingress",
Status: api.HealthPassing,
ServiceID: "ingress",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs = &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 8888,
Services: []structs.IngressService{
{Name: "db"},
},
},
},
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
}
retry.Run(t, func(r *retry.R) {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 1)
expect := structs.GatewayServices{
{
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
req.ServiceName = "ingress"
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 1)
expect = structs.GatewayServices{
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("ingress", nil),
GatewayKind: structs.ServiceKindIngressGateway,
Protocol: "tcp",
Port: 8888,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
// Test a non-gateway service being requested
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
}
var resp structs.IndexedGatewayServices
err := msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp)
assert.NoError(t, err)
assert.Empty(t, resp.Services)
// Ensure that the index is not zero so that a blocking query still gets the
// latest GatewayServices index
assert.NotEqual(t, 0, resp.Index)
}
func TestCatalog_GatewayServices_ACLFiltering(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root"))
{
var out struct{}
// Register a service "api"
args := structs.TestRegisterRequest(t)
args.Service.Service = "api"
args.Check = &structs.HealthCheck{
Name: "api",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "db"
args = structs.TestRegisterRequest(t)
args.Service.Service = "db"
args.Check = &structs.HealthCheck{
Name: "db",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "redis"
args = structs.TestRegisterRequest(t)
args.Service.Service = "redis"
args.Check = &structs.HealthCheck{
Name: "redis",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "gateway",
Port: 443,
},
Check: &structs.HealthCheck{
Name: "gateway",
Status: api.HealthPassing,
ServiceID: "gateway",
},
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
Services: []structs.LinkedService{
{
Name: "api",
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
},
{
Name: "db",
},
{
Name: "db_replica",
},
{
Name: "*",
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
},
},
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var entryResp bool
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
}
rules := `
service_prefix "db" {
policy = "read"
}
`
svcToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
// List should return an empty list, since we do not have read on the gateway
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
QueryOptions: structs.QueryOptions{Token: svcToken.SecretID},
}
var resp structs.IndexedGatewayServices
err := msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp)
require.True(r, acl.IsErrPermissionDenied(err))
})
rules = `
service "gateway" {
policy = "read"
}
`
gwToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
// List should return an empty list, since we do not have read on db
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
QueryOptions: structs.QueryOptions{Token: gwToken.SecretID},
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 0)
})
rules = `
service_prefix "db" {
policy = "read"
}
service "gateway" {
policy = "read"
}
`
validToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
// List should return db entry since we have read on db and gateway
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
QueryOptions: structs.QueryOptions{Token: validToken.SecretID},
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Catalog.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 2)
expect := structs.GatewayServices{
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
},
{
Service: structs.NewServiceID("db_replica", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
}

View File

@ -374,67 +374,3 @@ func (m *Internal) aclAccessorID(secretID string) string {
} }
return ident.ID() return ident.ID()
} }
func (m *Internal) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error {
if done, err := m.srv.forward("Internal.GatewayServices", args, args, reply); done {
return err
}
var authzContext acl.AuthorizerContext
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var services structs.GatewayServices
supportedGateways := []string{structs.IngressGateway, structs.TerminatingGateway}
var found bool
for _, kind := range supportedGateways {
// We only use this call to validate the RPC call, don't add the watch set
_, entry, err := state.ConfigEntry(nil, kind, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if entry != nil {
found = true
}
}
// We log a warning here to indicate that there is a potential
// misconfiguration. We explicitly do NOT return an error because this
// can occur in the course of normal operation by deleting a
// configuration entry or starting the proxy before registering the
// config entry.
if !found {
m.logger.Warn("no terminating-gateway or ingress-gateway associated with this gateway",
"gateway", args.ServiceName,
)
}
index, services, err = state.GatewayServices(ws, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
if err := m.srv.filterACL(args.Token, &services); err != nil {
return err
}
reply.Index, reply.Services = index, services
return nil
})
}

View File

@ -10,7 +10,6 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
@ -657,498 +656,6 @@ func TestInternal_ServiceDump_Kind(t *testing.T) {
}) })
} }
func TestInternal_TerminatingGatewayServices(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
{
var out struct{}
// Register a service "api"
args := structs.TestRegisterRequest(t)
args.Service.Service = "api"
args.Check = &structs.HealthCheck{
Name: "api",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "db"
args = structs.TestRegisterRequest(t)
args.Service.Service = "db"
args.Check = &structs.HealthCheck{
Name: "db",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "redis"
args = structs.TestRegisterRequest(t)
args.Service.Service = "redis"
args.Check = &structs.HealthCheck{
Name: "redis",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "gateway",
Port: 443,
},
Check: &structs.HealthCheck{
Name: "gateway",
Status: api.HealthPassing,
ServiceID: "gateway",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
Services: []structs.LinkedService{
{
Name: "api",
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
SNI: "my-domain",
},
{
Name: "db",
},
{
Name: "*",
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
SNI: "my-alt-domain",
},
},
},
}
var entryResp bool
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
}
retry.Run(t, func(r *retry.R) {
// List should return all three services
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 3)
expect := structs.GatewayServices{
{
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
SNI: "my-domain",
},
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
CAFile: "",
CertFile: "",
KeyFile: "",
},
{
Service: structs.NewServiceID("redis", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
SNI: "my-alt-domain",
FromWildcard: true,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
}
func TestInternal_GatewayServices_BothGateways(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
{
var out struct{}
// Register a service "api"
args := structs.TestRegisterRequest(t)
args.Service.Service = "api"
args.Check = &structs.HealthCheck{
Name: "api",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a terminating gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "gateway",
Port: 443,
},
Check: &structs.HealthCheck{
Name: "gateway",
Status: api.HealthPassing,
ServiceID: "gateway",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
Services: []structs.LinkedService{
{
Name: "api",
},
},
},
}
var entryResp bool
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
// Register a service "db"
args = structs.TestRegisterRequest(t)
args.Service.Service = "db"
args.Check = &structs.HealthCheck{
Name: "db",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register an ingress gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.2",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "ingress",
Port: 444,
},
Check: &structs.HealthCheck{
Name: "ingress",
Status: api.HealthPassing,
ServiceID: "ingress",
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs = &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.IngressGatewayConfigEntry{
Kind: "ingress-gateway",
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 8888,
Services: []structs.IngressService{
{Name: "db"},
},
},
},
},
}
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
}
retry.Run(t, func(r *retry.R) {
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 1)
expect := structs.GatewayServices{
{
Service: structs.NewServiceID("api", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
req.ServiceName = "ingress"
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 1)
expect = structs.GatewayServices{
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("ingress", nil),
GatewayKind: structs.ServiceKindIngressGateway,
Protocol: "tcp",
Port: 8888,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
// Test a non-gateway service being requested
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
}
var resp structs.IndexedGatewayServices
err := msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)
assert.NoError(t, err)
assert.Empty(t, resp.Services)
// Ensure that the index is not zero so that a blocking query still gets the
// latest GatewayServices index
assert.NotEqual(t, 0, resp.Index)
}
func TestInternal_GatewayServices_ACLFiltering(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1", testrpc.WithToken("root"))
{
var out struct{}
// Register a service "api"
args := structs.TestRegisterRequest(t)
args.Service.Service = "api"
args.Check = &structs.HealthCheck{
Name: "api",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "db"
args = structs.TestRegisterRequest(t)
args.Service.Service = "db"
args.Check = &structs.HealthCheck{
Name: "db",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a service "redis"
args = structs.TestRegisterRequest(t)
args.Service.Service = "redis"
args.Check = &structs.HealthCheck{
Name: "redis",
Status: api.HealthPassing,
ServiceID: args.Service.Service,
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
// Register a gateway
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Kind: structs.ServiceKindTerminatingGateway,
Service: "gateway",
Port: 443,
},
Check: &structs.HealthCheck{
Name: "gateway",
Status: api.HealthPassing,
ServiceID: "gateway",
},
}
args.Token = "root"
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &args, &out))
entryArgs := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.TerminatingGatewayConfigEntry{
Kind: "terminating-gateway",
Name: "gateway",
Services: []structs.LinkedService{
{
Name: "api",
CAFile: "api/ca.crt",
CertFile: "api/client.crt",
KeyFile: "api/client.key",
},
{
Name: "db",
},
{
Name: "db_replica",
},
{
Name: "*",
CAFile: "ca.crt",
CertFile: "client.crt",
KeyFile: "client.key",
},
},
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var entryResp bool
assert.Nil(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &entryArgs, &entryResp))
}
rules := `
service_prefix "db" {
policy = "read"
}
`
svcToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
// List should return an empty list, since we do not have read on the gateway
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
QueryOptions: structs.QueryOptions{Token: svcToken.SecretID},
}
var resp structs.IndexedGatewayServices
err := msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp)
require.True(r, acl.IsErrPermissionDenied(err))
})
rules = `
service "gateway" {
policy = "read"
}
`
gwToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
// List should return an empty list, since we do not have read on db
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
QueryOptions: structs.QueryOptions{Token: gwToken.SecretID},
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 0)
})
rules = `
service_prefix "db" {
policy = "read"
}
service "gateway" {
policy = "read"
}
`
validToken, err := upsertTestTokenWithPolicyRules(codec, "root", "dc1", rules)
require.NoError(t, err)
retry.Run(t, func(r *retry.R) {
// List should return db entry since we have read on db and gateway
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "gateway",
QueryOptions: structs.QueryOptions{Token: validToken.SecretID},
}
var resp structs.IndexedGatewayServices
assert.Nil(r, msgpackrpc.CallWithCodec(codec, "Internal.GatewayServices", &req, &resp))
assert.Len(r, resp.Services, 2)
expect := structs.GatewayServices{
{
Service: structs.NewServiceID("db", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
},
{
Service: structs.NewServiceID("db_replica", nil),
Gateway: structs.NewServiceID("gateway", nil),
GatewayKind: structs.ServiceKindTerminatingGateway,
},
}
// Ignore raft index for equality
for _, s := range resp.Services {
s.RaftIndex = structs.RaftIndex{}
}
assert.Equal(r, expect, resp.Services)
})
}
func TestInternal_GatewayServiceDump_Terminating(t *testing.T) { func TestInternal_GatewayServiceDump_Terminating(t *testing.T) {
t.Parallel() t.Parallel()
dir1, s1 := testServer(t) dir1, s1 := testServer(t)

View File

@ -6,7 +6,7 @@ import (
func init() { func init() {
registerEndpoint(func(s *Server) interface{} { return &ACL{s, s.loggers.Named(logging.ACL)} }) registerEndpoint(func(s *Server) interface{} { return &ACL{s, s.loggers.Named(logging.ACL)} })
registerEndpoint(func(s *Server) interface{} { return &Catalog{s} }) registerEndpoint(func(s *Server) interface{} { return &Catalog{s, s.loggers.Named(logging.Catalog)} })
registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s, s.logger) }) registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s, s.logger) })
registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} }) registerEndpoint(func(s *Server) interface{} { return &ConfigEntry{s} })
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} }) registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} })

View File

@ -9,6 +9,7 @@ const (
AWS string = "aws" AWS string = "aws"
Azure string = "azure" Azure string = "azure"
CA string = "ca" CA string = "ca"
Catalog string = "catalog"
CentralConfig string = "central_config" CentralConfig string = "central_config"
ConfigEntry string = "config_entry" ConfigEntry string = "config_entry"
Connect string = "connect" Connect string = "connect"