mirror of https://github.com/status-im/consul.git
proxycfg-glue: server-local implementation of `GatewayServices`
This is the OSS portion of enterprise PR 2259. This PR provides a server-local implementation of the proxycfg.GatewayServices interface based on blocking queries.
This commit is contained in:
parent
15a319dbfe
commit
ccc672013e
|
@ -4247,6 +4247,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
||||||
sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps)
|
sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps)
|
||||||
sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps)
|
sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps)
|
||||||
sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache))
|
sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache))
|
||||||
|
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
|
||||||
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/watch"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/structs/aclfilter"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CacheGatewayServices satisfies the proxycfg.GatewayServices interface by
|
||||||
|
// sourcing data from the agent cache.
|
||||||
|
func CacheGatewayServices(c *cache.Cache) proxycfg.GatewayServices {
|
||||||
|
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.GatewayServicesName}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerGatewayServices satisfies the proxycfg.GatewayServices interface by
|
||||||
|
// sourcing data from a blocking query against the server's state store.
|
||||||
|
func ServerGatewayServices(deps ServerDataSourceDeps) proxycfg.GatewayServices {
|
||||||
|
return &serverGatewayServices{deps}
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverGatewayServices struct {
|
||||||
|
deps ServerDataSourceDeps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverGatewayServices) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
||||||
|
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedGatewayServices, error) {
|
||||||
|
var authzContext acl.AuthorizerContext
|
||||||
|
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, &authzContext)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
if err := authz.ToAllowAuthorizer().ServiceReadAllowed(req.ServiceName, &authzContext); err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
index, services, err := store.GatewayServices(ws, req.ServiceName, &req.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &structs.IndexedGatewayServices{
|
||||||
|
Services: services,
|
||||||
|
QueryMeta: structs.QueryMeta{
|
||||||
|
Backend: structs.QueryBackendBlocking,
|
||||||
|
Index: index,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
aclfilter.New(authz, s.deps.Logger).Filter(response)
|
||||||
|
|
||||||
|
return index, response, nil
|
||||||
|
},
|
||||||
|
dispatchBlockingQueryUpdate[*structs.IndexedGatewayServices](ch),
|
||||||
|
)
|
||||||
|
}
|
|
@ -0,0 +1,155 @@
|
||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServerGatewayServices(t *testing.T) {
|
||||||
|
const index uint64 = 123
|
||||||
|
|
||||||
|
t.Run("ingress gateway", func(t *testing.T) {
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
|
||||||
|
authz := policyAuthorizer(t, `
|
||||||
|
service "igw" { policy = "read" }
|
||||||
|
service "web" { policy = "read" }
|
||||||
|
service "db" { policy = "read" }
|
||||||
|
`)
|
||||||
|
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(index, &structs.IngressGatewayConfigEntry{
|
||||||
|
Name: "igw",
|
||||||
|
Listeners: []structs.IngressListener{
|
||||||
|
{
|
||||||
|
Protocol: "tcp",
|
||||||
|
Services: []structs.IngressService{
|
||||||
|
{Name: "web"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Protocol: "tcp",
|
||||||
|
Services: []structs.IngressService{
|
||||||
|
{Name: "db"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Protocol: "tcp",
|
||||||
|
Services: []structs.IngressService{
|
||||||
|
{Name: "no-access"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
dataSource := ServerGatewayServices(ServerDataSourceDeps{
|
||||||
|
ACLResolver: newStaticResolver(authz),
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
})
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
require.NoError(t, dataSource.Notify(context.Background(), &structs.ServiceSpecificRequest{ServiceName: "igw"}, "", eventCh))
|
||||||
|
|
||||||
|
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||||
|
result := getEventResult[*structs.IndexedGatewayServices](t, eventCh)
|
||||||
|
require.Len(t, result.Services, 2)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "remove service mapping", func(t *testing.T) {
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(index+1, &structs.IngressGatewayConfigEntry{
|
||||||
|
Name: "igw",
|
||||||
|
Listeners: []structs.IngressListener{
|
||||||
|
{
|
||||||
|
Protocol: "tcp",
|
||||||
|
Services: []structs.IngressService{
|
||||||
|
{Name: "web"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
result := getEventResult[*structs.IndexedGatewayServices](t, eventCh)
|
||||||
|
require.Len(t, result.Services, 1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("terminating gateway", func(t *testing.T) {
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
|
||||||
|
authz := policyAuthorizer(t, `
|
||||||
|
service "tgw" { policy = "read" }
|
||||||
|
service "web" { policy = "read" }
|
||||||
|
service "db" { policy = "read" }
|
||||||
|
`)
|
||||||
|
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(index, &structs.TerminatingGatewayConfigEntry{
|
||||||
|
Name: "tgw",
|
||||||
|
Services: []structs.LinkedService{
|
||||||
|
{Name: "web"},
|
||||||
|
{Name: "db"},
|
||||||
|
{Name: "no-access"},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
dataSource := ServerGatewayServices(ServerDataSourceDeps{
|
||||||
|
ACLResolver: newStaticResolver(authz),
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
})
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
require.NoError(t, dataSource.Notify(context.Background(), &structs.ServiceSpecificRequest{ServiceName: "tgw"}, "", eventCh))
|
||||||
|
|
||||||
|
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||||
|
result := getEventResult[*structs.IndexedGatewayServices](t, eventCh)
|
||||||
|
require.Len(t, result.Services, 2)
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "remove service mapping", func(t *testing.T) {
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(index+1, &structs.TerminatingGatewayConfigEntry{
|
||||||
|
Name: "tgw",
|
||||||
|
Services: []structs.LinkedService{
|
||||||
|
{Name: "web"},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
result := getEventResult[*structs.IndexedGatewayServices](t, eventCh)
|
||||||
|
require.Len(t, result.Services, 1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("no access to gateway", func(t *testing.T) {
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
|
||||||
|
authz := policyAuthorizer(t, `
|
||||||
|
service "tgw" { policy = "deny" }
|
||||||
|
service "web" { policy = "read" }
|
||||||
|
service "db" { policy = "read" }
|
||||||
|
`)
|
||||||
|
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(index, &structs.TerminatingGatewayConfigEntry{
|
||||||
|
Name: "tgw",
|
||||||
|
Services: []structs.LinkedService{
|
||||||
|
{Name: "web"},
|
||||||
|
{Name: "db"},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
dataSource := ServerGatewayServices(ServerDataSourceDeps{
|
||||||
|
ACLResolver: newStaticResolver(authz),
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
})
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
require.NoError(t, dataSource.Notify(context.Background(), &structs.ServiceSpecificRequest{ServiceName: "tgw"}, "", eventCh))
|
||||||
|
|
||||||
|
err := getEventError(t, eventCh)
|
||||||
|
require.True(t, acl.IsErrPermissionDenied(err), "expected permission denied error")
|
||||||
|
})
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import (
|
||||||
type Store interface {
|
type Store interface {
|
||||||
watch.StateStore
|
watch.StateStore
|
||||||
|
|
||||||
|
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
|
||||||
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
||||||
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
|
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
|
||||||
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
||||||
|
@ -58,12 +59,6 @@ func CacheFederationStateListMeshGateways(c *cache.Cache) proxycfg.FederationSta
|
||||||
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName}
|
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheGatewayServices satisfies the proxycfg.GatewayServices interface by
|
|
||||||
// sourcing data from the agent cache.
|
|
||||||
func CacheGatewayServices(c *cache.Cache) proxycfg.GatewayServices {
|
|
||||||
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.GatewayServicesName}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing
|
// CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing
|
||||||
// data from the agent cache.
|
// data from the agent cache.
|
||||||
func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {
|
func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {
|
||||||
|
|
|
@ -32,3 +32,17 @@ func expectNoEvent(t *testing.T, eventCh <-chan proxycfg.UpdateEvent) {
|
||||||
case <-time.After(100 * time.Millisecond):
|
case <-time.After(100 * time.Millisecond):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getEventError(t *testing.T, eventCh <-chan proxycfg.UpdateEvent) error {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case event := <-eventCh:
|
||||||
|
require.Error(t, event.Err)
|
||||||
|
return event.Err
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for event")
|
||||||
|
}
|
||||||
|
|
||||||
|
panic("this should never be reached")
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue