mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 11:40:06 +00:00
proxycfg-glue: server-local implementation of InternalServiceDump
This is the OSS portion of enterprise PR 2489. This PR introduces a server-local implementation of the proxycfg.InternalServiceDump interface that sources data from a blocking query against the server's state store. For simplicity, it only implements the subset of the Internal.ServiceDump RPC handler actually used by proxycfg - as such the result type has been changed to IndexedCheckServiceNodes to avoid confusion.
This commit is contained in:
parent
a31738f76f
commit
f8dba7e9ac
@ -4268,6 +4268,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
|||||||
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||||
|
sources.InternalServiceDump = proxycfgglue.ServerInternalServiceDump(deps, proxycfgglue.CacheInternalServiceDump(a.cache))
|
||||||
sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps)
|
sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps)
|
||||||
sources.ResolvedServiceConfig = proxycfgglue.ServerResolvedServiceConfig(deps, proxycfgglue.CacheResolvedServiceConfig(a.cache))
|
sources.ResolvedServiceConfig = proxycfgglue.ServerResolvedServiceConfig(deps, proxycfgglue.CacheResolvedServiceConfig(a.cache))
|
||||||
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
|
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
|
||||||
|
@ -42,6 +42,7 @@ type Store interface {
|
|||||||
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
||||||
ReadResolvedServiceConfigEntries(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, upstreamIDs []structs.ServiceID, proxyMode structs.ProxyMode) (uint64, *configentry.ResolvedServiceConfigSet, error)
|
ReadResolvedServiceConfigEntries(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, upstreamIDs []structs.ServiceID, proxyMode structs.ProxyMode) (uint64, *configentry.ResolvedServiceConfigSet, error)
|
||||||
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
|
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
|
||||||
|
ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
|
||||||
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
|
||||||
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||||
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||||
@ -87,12 +88,6 @@ func CacheIntentionUpstreamsDestination(c *cache.Cache) proxycfg.IntentionUpstre
|
|||||||
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsDestinationName}
|
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsDestinationName}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CacheInternalServiceDump satisfies the proxycfg.InternalServiceDump
|
|
||||||
// interface by sourcing data from the agent cache.
|
|
||||||
func CacheInternalServiceDump(c *cache.Cache) proxycfg.InternalServiceDump {
|
|
||||||
return &cacheProxyDataSource[*structs.ServiceDumpRequest]{c, cachetype.InternalServiceDumpName}
|
|
||||||
}
|
|
||||||
|
|
||||||
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
|
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
|
||||||
// sourcing data from the agent cache.
|
// sourcing data from the agent cache.
|
||||||
//
|
//
|
||||||
|
99
agent/proxycfg-glue/internal_service_dump.go
Normal file
99
agent/proxycfg-glue/internal_service_dump.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-bexpr"
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/watch"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/structs/aclfilter"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CacheInternalServiceDump satisfies the proxycfg.InternalServiceDump
|
||||||
|
// interface by sourcing data from the agent cache.
|
||||||
|
func CacheInternalServiceDump(c *cache.Cache) proxycfg.InternalServiceDump {
|
||||||
|
return &cacheInternalServiceDump{c}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cacheInternalServiceDump wraps the underlying cache-type to return a simpler
|
||||||
|
// subset of the response (as this is all we use in proxycfg).
|
||||||
|
type cacheInternalServiceDump struct {
|
||||||
|
c *cache.Cache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cacheInternalServiceDump) Notify(ctx context.Context, req *structs.ServiceDumpRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
dispatch := dispatchCacheUpdate(ch)
|
||||||
|
|
||||||
|
return c.c.NotifyCallback(ctx, cachetype.InternalServiceDumpName, req, correlationID,
|
||||||
|
func(ctx context.Context, event cache.UpdateEvent) {
|
||||||
|
if r, _ := event.Result.(*structs.IndexedNodesWithGateways); r != nil {
|
||||||
|
event.Result = &structs.IndexedCheckServiceNodes{
|
||||||
|
Nodes: r.Nodes,
|
||||||
|
QueryMeta: r.QueryMeta,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dispatch(ctx, event)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerInternalServiceDump satisfies the proxycfg.InternalServiceDump
|
||||||
|
// interface by sourcing data from a blocking query against the server's
|
||||||
|
// state store.
|
||||||
|
func ServerInternalServiceDump(deps ServerDataSourceDeps, remoteSource proxycfg.InternalServiceDump) proxycfg.InternalServiceDump {
|
||||||
|
return &serverInternalServiceDump{deps, remoteSource}
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverInternalServiceDump struct {
|
||||||
|
deps ServerDataSourceDeps
|
||||||
|
remoteSource proxycfg.InternalServiceDump
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverInternalServiceDump) Notify(ctx context.Context, req *structs.ServiceDumpRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
if req.Datacenter != s.deps.Datacenter {
|
||||||
|
return s.remoteSource.Notify(ctx, req, correlationID, ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
filter, err := bexpr.CreateFilter(req.Filter, nil, structs.CheckServiceNodes{})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is just the small subset of the Internal.ServiceDump RPC handler used
|
||||||
|
// by proxycfg.
|
||||||
|
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
|
||||||
|
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedCheckServiceNodes, error) {
|
||||||
|
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
idx, nodes, err := store.ServiceDump(ws, req.ServiceKind, req.UseServiceKind, &req.EnterpriseMeta, structs.DefaultPeerKeyword)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
raw, err := filter.Execute(nodes)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("could not filter local service dump: %w", err)
|
||||||
|
}
|
||||||
|
nodes = raw.(structs.CheckServiceNodes)
|
||||||
|
|
||||||
|
aclfilter.New(authz, s.deps.Logger).Filter(&nodes)
|
||||||
|
|
||||||
|
return idx, &structs.IndexedCheckServiceNodes{
|
||||||
|
Nodes: nodes,
|
||||||
|
QueryMeta: structs.QueryMeta{
|
||||||
|
Index: idx,
|
||||||
|
Backend: structs.QueryBackendBlocking,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
dispatchBlockingQueryUpdate[*structs.IndexedCheckServiceNodes](ch),
|
||||||
|
)
|
||||||
|
}
|
139
agent/proxycfg-glue/internal_service_dump_test.go
Normal file
139
agent/proxycfg-glue/internal_service_dump_test.go
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
package proxycfgglue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServerInternalServiceDump(t *testing.T) {
|
||||||
|
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
var (
|
||||||
|
req = &structs.ServiceDumpRequest{Datacenter: "dc2"}
|
||||||
|
correlationID = "correlation-id"
|
||||||
|
ch = make(chan<- proxycfg.UpdateEvent)
|
||||||
|
result = errors.New("KABOOM")
|
||||||
|
)
|
||||||
|
|
||||||
|
remoteSource := newMockInternalServiceDump(t)
|
||||||
|
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
|
||||||
|
|
||||||
|
dataSource := ServerInternalServiceDump(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
|
||||||
|
err := dataSource.Notify(ctx, req, correlationID, ch)
|
||||||
|
require.Equal(t, result, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("local queries are served from the state store", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
nextIndex := indexGenerator()
|
||||||
|
|
||||||
|
store := state.NewStateStore(nil)
|
||||||
|
|
||||||
|
services := []*structs.NodeService{
|
||||||
|
{
|
||||||
|
Service: "mgw",
|
||||||
|
Kind: structs.ServiceKindMeshGateway,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Service: "web",
|
||||||
|
Kind: structs.ServiceKindTypical,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Service: "db",
|
||||||
|
Kind: structs.ServiceKindTypical,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for idx, service := range services {
|
||||||
|
require.NoError(t, store.EnsureRegistration(nextIndex(), &structs.RegisterRequest{
|
||||||
|
Node: fmt.Sprintf("node-%d", idx),
|
||||||
|
Service: service,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
authz := newStaticResolver(
|
||||||
|
policyAuthorizer(t, `
|
||||||
|
service "mgw" { policy = "read" }
|
||||||
|
service "web" { policy = "read" }
|
||||||
|
service "db" { policy = "read" }
|
||||||
|
node_prefix "node-" { policy = "read" }
|
||||||
|
`),
|
||||||
|
)
|
||||||
|
|
||||||
|
dataSource := ServerInternalServiceDump(ServerDataSourceDeps{
|
||||||
|
GetStore: func() Store { return store },
|
||||||
|
ACLResolver: authz,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
t.Run("filter by kind", func(t *testing.T) {
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceDumpRequest{
|
||||||
|
ServiceKind: structs.ServiceKindMeshGateway,
|
||||||
|
UseServiceKind: true,
|
||||||
|
}, "", eventCh))
|
||||||
|
|
||||||
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||||
|
require.Len(t, result.Nodes, 1)
|
||||||
|
require.Equal(t, "mgw", result.Nodes[0].Service.Service)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("bexpr filtering", func(t *testing.T) {
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceDumpRequest{
|
||||||
|
QueryOptions: structs.QueryOptions{Filter: `Service.Service == "web"`},
|
||||||
|
}, "", eventCh))
|
||||||
|
|
||||||
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||||
|
require.Len(t, result.Nodes, 1)
|
||||||
|
require.Equal(t, "web", result.Nodes[0].Service.Service)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("all services", func(t *testing.T) {
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceDumpRequest{}, "", eventCh))
|
||||||
|
|
||||||
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||||
|
require.Len(t, result.Nodes, 3)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("access denied", func(t *testing.T) {
|
||||||
|
authz.SwapAuthorizer(acl.DenyAll())
|
||||||
|
|
||||||
|
eventCh := make(chan proxycfg.UpdateEvent)
|
||||||
|
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceDumpRequest{}, "", eventCh))
|
||||||
|
|
||||||
|
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh)
|
||||||
|
require.Empty(t, result.Nodes)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockInternalServiceDump(t *testing.T) *mockInternalServiceDump {
|
||||||
|
mock := &mockInternalServiceDump{}
|
||||||
|
mock.Mock.Test(t)
|
||||||
|
|
||||||
|
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||||
|
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockInternalServiceDump struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockInternalServiceDump) Notify(ctx context.Context, req *structs.ServiceDumpRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
|
||||||
|
return m.Called(ctx, req, correlationID, ch).Error(0)
|
||||||
|
}
|
@ -91,8 +91,8 @@ type DataSources struct {
|
|||||||
// notification channel.
|
// notification channel.
|
||||||
IntentionUpstreamsDestination IntentionUpstreamsDestination
|
IntentionUpstreamsDestination IntentionUpstreamsDestination
|
||||||
|
|
||||||
// InternalServiceDump provides updates about a (gateway) service on a
|
// InternalServiceDump provides updates about services of a given kind (e.g.
|
||||||
// notification channel.
|
// mesh gateways) on a notification channel.
|
||||||
InternalServiceDump InternalServiceDump
|
InternalServiceDump InternalServiceDump
|
||||||
|
|
||||||
// LeafCertificate provides updates about the service's leaf certificate on a
|
// LeafCertificate provides updates about the service's leaf certificate on a
|
||||||
@ -203,8 +203,8 @@ type IntentionUpstreamsDestination interface {
|
|||||||
Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
|
Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// InternalServiceDump is the interface used to consume updates about a (gateway)
|
// InternalServiceDump is the interface used to consume updates about services
|
||||||
// service via the internal ServiceDump RPC.
|
// of a given kind (e.g. mesh gateways).
|
||||||
type InternalServiceDump interface {
|
type InternalServiceDump interface {
|
||||||
Notify(ctx context.Context, req *structs.ServiceDumpRequest, correlationID string, ch chan<- UpdateEvent) error
|
Notify(ctx context.Context, req *structs.ServiceDumpRequest, correlationID string, ch chan<- UpdateEvent) error
|
||||||
}
|
}
|
||||||
|
@ -491,7 +491,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
|
|||||||
}
|
}
|
||||||
|
|
||||||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||||
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
}
|
}
|
||||||
|
@ -927,7 +927,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||||||
events: []UpdateEvent{
|
events: []UpdateEvent{
|
||||||
{
|
{
|
||||||
CorrelationID: "mesh-gateway:dc4",
|
CorrelationID: "mesh-gateway:dc4",
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC4Hostname(t),
|
Nodes: TestGatewayNodesDC4Hostname(t),
|
||||||
},
|
},
|
||||||
Err: nil,
|
Err: nil,
|
||||||
|
@ -974,7 +974,7 @@ func NewTestDataSources() *TestDataSources {
|
|||||||
Intentions: NewTestDataSource[*structs.ServiceSpecificRequest, structs.Intentions](),
|
Intentions: NewTestDataSource[*structs.ServiceSpecificRequest, structs.Intentions](),
|
||||||
IntentionUpstreams: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](),
|
IntentionUpstreams: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](),
|
||||||
IntentionUpstreamsDestination: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](),
|
IntentionUpstreamsDestination: NewTestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList](),
|
||||||
InternalServiceDump: NewTestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways](),
|
InternalServiceDump: NewTestDataSource[*structs.ServiceDumpRequest, *structs.IndexedCheckServiceNodes](),
|
||||||
LeafCertificate: NewTestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert](),
|
LeafCertificate: NewTestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert](),
|
||||||
PreparedQuery: NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse](),
|
PreparedQuery: NewTestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse](),
|
||||||
ResolvedServiceConfig: NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse](),
|
ResolvedServiceConfig: NewTestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse](),
|
||||||
@ -1000,7 +1000,7 @@ type TestDataSources struct {
|
|||||||
Intentions *TestDataSource[*structs.ServiceSpecificRequest, structs.Intentions]
|
Intentions *TestDataSource[*structs.ServiceSpecificRequest, structs.Intentions]
|
||||||
IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
|
IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
|
||||||
IntentionUpstreamsDestination *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
|
IntentionUpstreamsDestination *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
|
||||||
InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways]
|
InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedCheckServiceNodes]
|
||||||
LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]
|
LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]
|
||||||
PeeredUpstreams *TestDataSource[*structs.PartitionSpecificRequest, *structs.IndexedPeeredServiceList]
|
PeeredUpstreams *TestDataSource[*structs.PartitionSpecificRequest, *structs.IndexedPeeredServiceList]
|
||||||
PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]
|
PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]
|
||||||
|
@ -316,19 +316,19 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
|
|||||||
baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{
|
baseEvents = testSpliceEvents(baseEvents, []UpdateEvent{
|
||||||
{
|
{
|
||||||
CorrelationID: "mesh-gateway:dc2",
|
CorrelationID: "mesh-gateway:dc2",
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC2(t),
|
Nodes: TestGatewayNodesDC2(t),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
CorrelationID: "mesh-gateway:dc4",
|
CorrelationID: "mesh-gateway:dc4",
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC4Hostname(t),
|
Nodes: TestGatewayNodesDC4Hostname(t),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
CorrelationID: "mesh-gateway:dc6",
|
CorrelationID: "mesh-gateway:dc6",
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC6Hostname(t),
|
Nodes: TestGatewayNodesDC6Hostname(t),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -376,7 +376,7 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
|
|||||||
// Have the cross-dc query mechanism not work for dc2 so
|
// Have the cross-dc query mechanism not work for dc2 so
|
||||||
// fedstates will infill.
|
// fedstates will infill.
|
||||||
CorrelationID: "mesh-gateway:dc2",
|
CorrelationID: "mesh-gateway:dc2",
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: nil,
|
Nodes: nil,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -69,7 +69,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
|||||||
})
|
})
|
||||||
events = append(events, UpdateEvent{
|
events = append(events, UpdateEvent{
|
||||||
CorrelationID: "mesh-gateway:dc2:" + dbUID.String(),
|
CorrelationID: "mesh-gateway:dc2:" + dbUID.String(),
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC2(t),
|
Nodes: TestGatewayNodesDC2(t),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -114,13 +114,13 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
|||||||
})
|
})
|
||||||
events = append(events, UpdateEvent{
|
events = append(events, UpdateEvent{
|
||||||
CorrelationID: "mesh-gateway:dc2:" + dbUID.String(),
|
CorrelationID: "mesh-gateway:dc2:" + dbUID.String(),
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC2(t),
|
Nodes: TestGatewayNodesDC2(t),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
events = append(events, UpdateEvent{
|
events = append(events, UpdateEvent{
|
||||||
CorrelationID: "mesh-gateway:dc3:" + dbUID.String(),
|
CorrelationID: "mesh-gateway:dc3:" + dbUID.String(),
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC3(t),
|
Nodes: TestGatewayNodesDC3(t),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -141,7 +141,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
|||||||
})
|
})
|
||||||
events = append(events, UpdateEvent{
|
events = append(events, UpdateEvent{
|
||||||
CorrelationID: "mesh-gateway:dc1:" + dbUID.String(),
|
CorrelationID: "mesh-gateway:dc1:" + dbUID.String(),
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC1(t),
|
Nodes: TestGatewayNodesDC1(t),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -168,7 +168,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
|||||||
})
|
})
|
||||||
events = append(events, UpdateEvent{
|
events = append(events, UpdateEvent{
|
||||||
CorrelationID: "mesh-gateway:dc1:" + dbUID.String(),
|
CorrelationID: "mesh-gateway:dc1:" + dbUID.String(),
|
||||||
Result: &structs.IndexedNodesWithGateways{
|
Result: &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: TestGatewayNodesDC1(t),
|
Nodes: TestGatewayNodesDC1(t),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
@ -186,7 +186,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
|
|||||||
}
|
}
|
||||||
|
|
||||||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||||
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user