proxycfg-glue: server-local implementation of IntentionUpstreamsDestination

This is the OSS portion of enterprise PR 2463.

Generalises the serverIntentionUpstreams type to support matching on a
service or destination.
This commit is contained in:
Daniel Upton 2022-09-01 15:47:06 +01:00 committed by Dan Upton
parent f8dba7e9ac
commit 8c46e48e0d
4 changed files with 33 additions and 27 deletions

View File

@ -4268,6 +4268,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Intentions = proxycfgglue.ServerIntentions(deps) sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps) sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
sources.IntentionUpstreamsDestination = proxycfgglue.ServerIntentionUpstreamsDestination(deps)
sources.InternalServiceDump = proxycfgglue.ServerInternalServiceDump(deps, proxycfgglue.CacheInternalServiceDump(a.cache)) sources.InternalServiceDump = proxycfgglue.ServerInternalServiceDump(deps, proxycfgglue.CacheInternalServiceDump(a.cache))
sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps) sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps)
sources.ResolvedServiceConfig = proxycfgglue.ServerResolvedServiceConfig(deps, proxycfgglue.CacheResolvedServiceConfig(a.cache)) sources.ResolvedServiceConfig = proxycfgglue.ServerResolvedServiceConfig(deps, proxycfgglue.CacheResolvedServiceConfig(a.cache))

View File

@ -82,12 +82,6 @@ func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {
return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName} return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName}
} }
// CacheIntentionUpstreamsDestination satisfies the proxycfg.IntentionUpstreamsDestination interface
// by sourcing data from the agent cache.
func CacheIntentionUpstreamsDestination(c *cache.Cache) proxycfg.IntentionUpstreams {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsDestinationName}
}
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by // CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
// sourcing data from the agent cache. // sourcing data from the agent cache.
// //
@ -133,6 +127,15 @@ func dispatchCacheUpdate(ch chan<- proxycfg.UpdateEvent) cache.Callback {
} }
} }
func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) {
return func(ctx context.Context, correlationID string, result ResultType, err error) {
select {
case ch <- newUpdateEvent(correlationID, result, err):
case <-ctx.Done():
}
}
}
func newUpdateEvent(correlationID string, result any, err error) proxycfg.UpdateEvent { func newUpdateEvent(correlationID string, result any, err error) proxycfg.UpdateEvent {
// This roughly matches the logic in agent/submatview.LocalMaterializer.isTerminalError. // This roughly matches the logic in agent/submatview.LocalMaterializer.isTerminalError.
if acl.IsErrNotFound(err) { if acl.IsErrNotFound(err) {

View File

@ -14,19 +14,36 @@ import (
) )
// CacheIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface // CacheIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
// by sourcing data from the agent cache. // by sourcing upstreams for the given service, inferred from intentions, from
// the agent cache.
func CacheIntentionUpstreams(c *cache.Cache) proxycfg.IntentionUpstreams { func CacheIntentionUpstreams(c *cache.Cache) proxycfg.IntentionUpstreams {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsName} return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsName}
} }
// CacheIntentionUpstreamsDestination satisfies the proxycfg.IntentionUpstreams
// interface by sourcing upstreams for the given destination, inferred from
// intentions, from the agent cache.
func CacheIntentionUpstreamsDestination(c *cache.Cache) proxycfg.IntentionUpstreams {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsDestinationName}
}
// ServerIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface // ServerIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
// by sourcing data from a blocking query against the server's state store. // by sourcing upstreams for the given service, inferred from intentions, from
// the server's state store.
func ServerIntentionUpstreams(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams { func ServerIntentionUpstreams(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams {
return serverIntentionUpstreams{deps} return serverIntentionUpstreams{deps, structs.IntentionTargetService}
}
// ServerIntentionUpstreamsDestination satisfies the proxycfg.IntentionUpstreams
// interface by sourcing upstreams for the given destination, inferred from
// intentions, from the server's state store.
func ServerIntentionUpstreamsDestination(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams {
return serverIntentionUpstreams{deps, structs.IntentionTargetDestination}
} }
type serverIntentionUpstreams struct { type serverIntentionUpstreams struct {
deps ServerDataSourceDeps deps ServerDataSourceDeps
target structs.IntentionTargetType
} }
func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
@ -40,7 +57,7 @@ func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.Servi
} }
defaultDecision := authz.IntentionDefaultAllow(nil) defaultDecision := authz.IntentionDefaultAllow(nil)
index, services, err := store.IntentionTopology(ws, target, false, defaultDecision, structs.IntentionTargetService) index, services, err := store.IntentionTopology(ws, target, false, defaultDecision, s.target)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
@ -59,12 +76,3 @@ func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.Servi
dispatchBlockingQueryUpdate[*structs.IndexedServiceList](ch), dispatchBlockingQueryUpdate[*structs.IndexedServiceList](ch),
) )
} }
func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) {
return func(ctx context.Context, correlationID string, result ResultType, err error) {
select {
case ch <- newUpdateEvent(correlationID, result, err):
case <-ctx.Done():
}
}
}

View File

@ -89,7 +89,7 @@ type DataSources struct {
// IntentionUpstreamsDestination provides intention-inferred upstream updates on a // IntentionUpstreamsDestination provides intention-inferred upstream updates on a
// notification channel. // notification channel.
IntentionUpstreamsDestination IntentionUpstreamsDestination IntentionUpstreamsDestination IntentionUpstreams
// InternalServiceDump provides updates about services of a given kind (e.g. // InternalServiceDump provides updates about services of a given kind (e.g.
// mesh gateways) on a notification channel. // mesh gateways) on a notification channel.
@ -197,12 +197,6 @@ type IntentionUpstreams interface {
Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
} }
// IntentionUpstreamsDestination is the interface used to consume updates about upstreams destination
// inferred from service intentions.
type IntentionUpstreamsDestination interface {
Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
// InternalServiceDump is the interface used to consume updates about services // InternalServiceDump is the interface used to consume updates about services
// of a given kind (e.g. mesh gateways). // of a given kind (e.g. mesh gateways).
type InternalServiceDump interface { type InternalServiceDump interface {