diff --git a/agent/agent.go b/agent/agent.go index d8ff6d6da9..bfee4d32f8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4268,6 +4268,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth)) sources.Intentions = proxycfgglue.ServerIntentions(deps) sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps) + sources.IntentionUpstreamsDestination = proxycfgglue.ServerIntentionUpstreamsDestination(deps) sources.InternalServiceDump = proxycfgglue.ServerInternalServiceDump(deps, proxycfgglue.CacheInternalServiceDump(a.cache)) sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps) sources.ResolvedServiceConfig = proxycfgglue.ServerResolvedServiceConfig(deps, proxycfgglue.CacheResolvedServiceConfig(a.cache)) diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 8c7921debb..6aef1da543 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -82,12 +82,6 @@ func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks { return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName} } -// CacheIntentionUpstreamsDestination satisfies the proxycfg.IntentionUpstreamsDestination interface -// by sourcing data from the agent cache. -func CacheIntentionUpstreamsDestination(c *cache.Cache) proxycfg.IntentionUpstreams { - return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsDestinationName} -} - // CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by // sourcing data from the agent cache. // @@ -133,6 +127,15 @@ func dispatchCacheUpdate(ch chan<- proxycfg.UpdateEvent) cache.Callback { } } +func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) { + return func(ctx context.Context, correlationID string, result ResultType, err error) { + select { + case ch <- newUpdateEvent(correlationID, result, err): + case <-ctx.Done(): + } + } +} + func newUpdateEvent(correlationID string, result any, err error) proxycfg.UpdateEvent { // This roughly matches the logic in agent/submatview.LocalMaterializer.isTerminalError. if acl.IsErrNotFound(err) { diff --git a/agent/proxycfg-glue/intention_upstreams.go b/agent/proxycfg-glue/intention_upstreams.go index a91063c3b8..e5d5e9959c 100644 --- a/agent/proxycfg-glue/intention_upstreams.go +++ b/agent/proxycfg-glue/intention_upstreams.go @@ -14,19 +14,36 @@ import ( ) // CacheIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface -// by sourcing data from the agent cache. +// by sourcing upstreams for the given service, inferred from intentions, from +// the agent cache. func CacheIntentionUpstreams(c *cache.Cache) proxycfg.IntentionUpstreams { return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsName} } +// CacheIntentionUpstreamsDestination satisfies the proxycfg.IntentionUpstreams +// interface by sourcing upstreams for the given destination, inferred from +// intentions, from the agent cache. +func CacheIntentionUpstreamsDestination(c *cache.Cache) proxycfg.IntentionUpstreams { + return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsDestinationName} +} + // ServerIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface -// by sourcing data from a blocking query against the server's state store. +// by sourcing upstreams for the given service, inferred from intentions, from +// the server's state store. func ServerIntentionUpstreams(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams { - return serverIntentionUpstreams{deps} + return serverIntentionUpstreams{deps, structs.IntentionTargetService} +} + +// ServerIntentionUpstreamsDestination satisfies the proxycfg.IntentionUpstreams +// interface by sourcing upstreams for the given destination, inferred from +// intentions, from the server's state store. +func ServerIntentionUpstreamsDestination(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams { + return serverIntentionUpstreams{deps, structs.IntentionTargetDestination} } type serverIntentionUpstreams struct { - deps ServerDataSourceDeps + deps ServerDataSourceDeps + target structs.IntentionTargetType } func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { @@ -40,7 +57,7 @@ func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.Servi } defaultDecision := authz.IntentionDefaultAllow(nil) - index, services, err := store.IntentionTopology(ws, target, false, defaultDecision, structs.IntentionTargetService) + index, services, err := store.IntentionTopology(ws, target, false, defaultDecision, s.target) if err != nil { return 0, nil, err } @@ -59,12 +76,3 @@ func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.Servi dispatchBlockingQueryUpdate[*structs.IndexedServiceList](ch), ) } - -func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) { - return func(ctx context.Context, correlationID string, result ResultType, err error) { - select { - case ch <- newUpdateEvent(correlationID, result, err): - case <-ctx.Done(): - } - } -} diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go index 9b8285a52b..c012610710 100644 --- a/agent/proxycfg/data_sources.go +++ b/agent/proxycfg/data_sources.go @@ -89,7 +89,7 @@ type DataSources struct { // IntentionUpstreamsDestination provides intention-inferred upstream updates on a // notification channel. - IntentionUpstreamsDestination IntentionUpstreamsDestination + IntentionUpstreamsDestination IntentionUpstreams // InternalServiceDump provides updates about services of a given kind (e.g. // mesh gateways) on a notification channel. @@ -197,12 +197,6 @@ type IntentionUpstreams interface { Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error } -// IntentionUpstreamsDestination is the interface used to consume updates about upstreams destination -// inferred from service intentions. -type IntentionUpstreamsDestination interface { - Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- UpdateEvent) error -} - // InternalServiceDump is the interface used to consume updates about services // of a given kind (e.g. mesh gateways). type InternalServiceDump interface {