mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
proxycfg-glue: server-local implementation of ExportedPeeredServices
This is the OSS portion of enterprise PR 2377. Adds a server-local implementation of the proxycfg.ExportedPeeredServices interface that sources data from a blocking query against the server's state store.
This commit is contained in:
parent
501089292e
commit
a8df87f574
@ -4246,6 +4246,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
|||||||
sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps)
|
sources.ConfigEntry = proxycfgglue.ServerConfigEntry(deps)
|
||||||
sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps)
|
sources.ConfigEntryList = proxycfgglue.ServerConfigEntryList(deps)
|
||||||
sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache))
|
sources.CompiledDiscoveryChain = proxycfgglue.ServerCompiledDiscoveryChain(deps, proxycfgglue.CacheCompiledDiscoveryChain(a.cache))
|
||||||
|
sources.ExportedPeeredServices = proxycfgglue.ServerExportedPeeredServices(deps)
|
||||||
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
|
sources.FederationStateListMeshGateways = proxycfgglue.ServerFederationStateListMeshGateways(deps)
|
||||||
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
|
sources.GatewayServices = proxycfgglue.ServerGatewayServices(deps)
|
||||||
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||||
|
60
agent/proxycfg-glue/exported_peered_services.go
Normal file
60
agent/proxycfg-glue/exported_peered_services.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/watch"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/structs/aclfilter"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices
|
||||||
|
// interface by sourcing data from the agent cache.
|
||||||
|
func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices {
|
||||||
|
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ExportedPeeredServicesName}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerExportedPeeredServices satisifies the proxycfg.ExportedPeeredServices
|
||||||
|
// interface by sourcing data from a blocking query against the server's state
|
||||||
|
// store.
|
||||||
|
func ServerExportedPeeredServices(deps ServerDataSourceDeps) proxycfg.ExportedPeeredServices {
|
||||||
|
return &serverExportedPeeredServices{deps}
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverExportedPeeredServices struct {
|
||||||
|
deps ServerDataSourceDeps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverExportedPeeredServices) Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
||||||
|
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedExportedServiceList, error) {
|
||||||
|
// TODO(peering): acls: mesh gateway needs appropriate wildcard service:read
|
||||||
|
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &structs.IndexedExportedServiceList{
|
||||||
|
Services: serviceMap,
|
||||||
|
QueryMeta: structs.QueryMeta{
|
||||||
|
Backend: structs.QueryBackendBlocking,
|
||||||
|
Index: index,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
aclfilter.New(authz, s.deps.Logger).Filter(result)
|
||||||
|
|
||||||
|
return index, result, nil
|
||||||
|
},
|
||||||
|
dispatchBlockingQueryUpdate[*structs.IndexedExportedServiceList](ch),
|
||||||
|
)
|
||||||
|
}
|
112
agent/proxycfg-glue/exported_peered_services_test.go
Normal file
112
agent/proxycfg-glue/exported_peered_services_test.go
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServerExportedPeeredServices(t *testing.T) {
|
||||||
|
nextIndex := indexGenerator()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
|
||||||
|
for _, peer := range []string{"peer-1", "peer-2", "peer-3"} {
|
||||||
|
require.NoError(t, store.PeeringWrite(nextIndex(), &pbpeering.Peering{
|
||||||
|
ID: testUUID(t),
|
||||||
|
Name: peer,
|
||||||
|
State: pbpeering.PeeringState_ACTIVE,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(nextIndex(), &structs.ExportedServicesConfigEntry{
|
||||||
|
Name: "default",
|
||||||
|
Services: []structs.ExportedService{
|
||||||
|
{
|
||||||
|
Name: "web",
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: "peer-1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "db",
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: "peer-2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
authz := policyAuthorizer(t, `
|
||||||
|
service "web" { policy = "read" }
|
||||||
|
service "api" { policy = "read" }
|
||||||
|
service "db" { policy = "deny" }
|
||||||
|
`)
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
dataSource := ServerExportedPeeredServices(ServerDataSourceDeps{
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
ACLResolver: newStaticResolver(authz),
|
||||||
|
})
|
||||||
|
require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{}, "", eventCh))
|
||||||
|
|
||||||
|
t.Run("initial state", func(t *testing.T) {
|
||||||
|
result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh)
|
||||||
|
require.Equal(t,
|
||||||
|
map[string]structs.ServiceList{
|
||||||
|
"peer-1": {structs.NewServiceName("web", nil)},
|
||||||
|
},
|
||||||
|
result.Services,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("update exported services", func(t *testing.T) {
|
||||||
|
require.NoError(t, store.EnsureConfigEntry(nextIndex(), &structs.ExportedServicesConfigEntry{
|
||||||
|
Name: "default",
|
||||||
|
Services: []structs.ExportedService{
|
||||||
|
{
|
||||||
|
Name: "web",
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: "peer-1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "db",
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: "peer-2"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "api",
|
||||||
|
Consumers: []structs.ServiceConsumer{
|
||||||
|
{PeerName: "peer-1"},
|
||||||
|
{PeerName: "peer-3"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh)
|
||||||
|
require.Equal(t,
|
||||||
|
map[string]structs.ServiceList{
|
||||||
|
"peer-1": {
|
||||||
|
structs.NewServiceName("api", nil),
|
||||||
|
structs.NewServiceName("web", nil),
|
||||||
|
},
|
||||||
|
"peer-3": {
|
||||||
|
structs.NewServiceName("api", nil),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
result.Services,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
@ -21,6 +21,7 @@ import (
|
|||||||
type Store interface {
|
type Store interface {
|
||||||
watch.StateStore
|
watch.StateStore
|
||||||
|
|
||||||
|
ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error)
|
||||||
FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error)
|
FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error)
|
||||||
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
|
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
|
||||||
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
||||||
@ -103,12 +104,6 @@ func CacheResolvedServiceConfig(c *cache.Cache) proxycfg.ResolvedServiceConfig {
|
|||||||
return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName}
|
return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices
|
|
||||||
// interface by sourcing data from the agent cache.
|
|
||||||
func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices {
|
|
||||||
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ExportedPeeredServicesName}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cacheProxyDataSource implements a generic wrapper around the agent cache to
|
// cacheProxyDataSource implements a generic wrapper around the agent cache to
|
||||||
// provide data to the proxycfg.Manager.
|
// provide data to the proxycfg.Manager.
|
||||||
type cacheProxyDataSource[ReqType cache.Request] struct {
|
type cacheProxyDataSource[ReqType cache.Request] struct {
|
||||||
|
@ -9,6 +9,14 @@ import (
|
|||||||
"github.com/hashicorp/consul/agent/proxycfg"
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func indexGenerator() func() uint64 {
|
||||||
|
var idx uint64
|
||||||
|
return func() uint64 {
|
||||||
|
idx++
|
||||||
|
return idx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func getEventResult[ResultType any](t *testing.T, eventCh <-chan proxycfg.UpdateEvent) ResultType {
|
func getEventResult[ResultType any](t *testing.T, eventCh <-chan proxycfg.UpdateEvent) ResultType {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user