proxycfg-glue: server-local implementation of `FederationStateListMeshGateways`

This is the OSS portion of enterprise PR 2265.

This PR provides a server-local implementation of the
proxycfg.FederationStateListMeshGateways interface based on blocking queries.
This commit is contained in:
Daniel Upton 2022-07-12 11:43:42 +01:00 committed by Dan Upton
parent ccc672013e
commit 3d74efa8ad
4 changed files with 172 additions and 6 deletions

View File

@ -4247,6 +4247,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps) sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps)
sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps) sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps)
sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache)) sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache))
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps) sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Intentions = proxycfgglue.ServerIntentions(deps) sources.Intentions = proxycfgglue.ServerIntentions(deps)

View File

@ -0,0 +1,67 @@
package proxycfgglue
import (
"context"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/structs/aclfilter"
)
// CacheFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways
// interface by sourcing data from the agent cache.
func CacheFederationStateListMeshGateways(c *cache.Cache) proxycfg.FederationStateListMeshGateways {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName}
}
// ServerFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways
// interface by sourcing data from a blocking query against the server's state
// store.
func ServerFederationStateListMeshGateways(deps ServerDataSourceDeps) proxycfg.FederationStateListMeshGateways {
return &serverFederationStateListMeshGateways{deps}
}
type serverFederationStateListMeshGateways struct {
deps ServerDataSourceDeps
}
func (s *serverFederationStateListMeshGateways) Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
func(ws memdb.WatchSet, store Store) (uint64, *structs.DatacenterIndexedCheckServiceNodes, error) {
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil)
if err != nil {
return 0, nil, err
}
index, fedStates, err := store.FederationStateList(ws)
if err != nil {
return 0, nil, err
}
results := make(map[string]structs.CheckServiceNodes)
for _, fs := range fedStates {
if gws := fs.MeshGateways; len(gws) != 0 {
// Shallow clone to prevent ACL filtering manipulating the slice in memdb.
results[fs.Datacenter] = gws.ShallowClone()
}
}
rsp := &structs.DatacenterIndexedCheckServiceNodes{
DatacenterNodes: results,
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendBlocking,
},
}
aclfilter.New(authz, s.deps.Logger).Filter(rsp)
return index, rsp, nil
},
dispatchBlockingQueryUpdate[*structs.DatacenterIndexedCheckServiceNodes](ch),
)
}

View File

@ -0,0 +1,103 @@
package proxycfgglue
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestServerFederationStateListMeshGateways(t *testing.T) {
const index uint64 = 123
store := state.NewStateStore(nil)
authz := policyAuthorizer(t, `
service_prefix "dc2-" { policy = "read" }
node_prefix "dc2-" { policy = "read" }
service_prefix "dc3-" { policy = "read" }
node_prefix "dc3-" { policy = "read" }
`)
require.NoError(t, store.FederationStateSet(index, &structs.FederationState{
Datacenter: "dc2",
MeshGateways: structs.CheckServiceNodes{
{
Service: &structs.NodeService{Service: "dc2-gw1"},
Node: &structs.Node{Node: "dc2-gw1"},
},
},
}))
// No access to this DC, we shouldn't see it in results.
require.NoError(t, store.FederationStateSet(index, &structs.FederationState{
Datacenter: "dc4",
MeshGateways: structs.CheckServiceNodes{
{
Service: &structs.NodeService{Service: "dc4-gw1"},
Node: &structs.Node{Node: "dc4-gw1"},
},
},
}))
dataSource := ServerFederationStateListMeshGateways(ServerDataSourceDeps{
ACLResolver: newStaticResolver(authz),
GetStore: func() Store { return store },
})
eventCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(context.Background(), &structs.DCSpecificRequest{Datacenter: "dc1"}, "", eventCh))
testutil.RunStep(t, "initial state", func(t *testing.T) {
result := getEventResult[*structs.DatacenterIndexedCheckServiceNodes](t, eventCh)
require.Equal(t, map[string]structs.CheckServiceNodes{
"dc2": {
{
Service: &structs.NodeService{Service: "dc2-gw1"},
Node: &structs.Node{Node: "dc2-gw1"},
},
},
}, result.DatacenterNodes)
})
testutil.RunStep(t, "add new datacenter", func(t *testing.T) {
require.NoError(t, store.FederationStateSet(index+1, &structs.FederationState{
Datacenter: "dc3",
MeshGateways: structs.CheckServiceNodes{
{
Service: &structs.NodeService{Service: "dc3-gw1"},
Node: &structs.Node{Node: "dc3-gw1"},
},
},
}))
result := getEventResult[*structs.DatacenterIndexedCheckServiceNodes](t, eventCh)
require.Equal(t, map[string]structs.CheckServiceNodes{
"dc2": {
{
Service: &structs.NodeService{Service: "dc2-gw1"},
Node: &structs.Node{Node: "dc2-gw1"},
},
},
"dc3": {
{
Service: &structs.NodeService{Service: "dc3-gw1"},
Node: &structs.Node{Node: "dc3-gw1"},
},
},
}, result.DatacenterNodes)
})
testutil.RunStep(t, "delete datacenter", func(t *testing.T) {
require.NoError(t, store.FederationStateDelete(index+2, "dc3"))
result := getEventResult[*structs.DatacenterIndexedCheckServiceNodes](t, eventCh)
require.NotContains(t, result.DatacenterNodes, "dc3")
})
}

View File

@ -21,6 +21,7 @@ import (
type Store interface { type Store interface {
watch.StateStore watch.StateStore
FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error)
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error) IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error) ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
@ -53,12 +54,6 @@ func CacheDatacenters(c *cache.Cache) proxycfg.Datacenters {
return &cacheProxyDataSource[*structs.DatacentersRequest]{c, cachetype.CatalogDatacentersName} return &cacheProxyDataSource[*structs.DatacentersRequest]{c, cachetype.CatalogDatacentersName}
} }
// CacheFederationStateListMeshGateways satisfies the proxycfg.FederationStateListMeshGateways
// interface by sourcing data from the agent cache.
func CacheFederationStateListMeshGateways(c *cache.Cache) proxycfg.FederationStateListMeshGateways {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.FederationStateListMeshGatewaysName}
}
// CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing // CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing
// data from the agent cache. // data from the agent cache.
func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks { func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {