consul/agent/proxycfg-glue/intention_upstreams.go
Daniel Upton 45886848b4 proxycfg: server-local intention upstreams data source
This is the OSS portion of enterprise PR 2157.

It builds on the local blocking query work in #13438 to implement the
proxycfg.IntentionUpstreams interface using server-local data.

Also moves the ACL filtering logic from agent/consul into the acl/filter
package so that it can be reused here.
2022-07-04 10:48:36 +01:00

68 lines
2.1 KiB
Go

package proxycfgglue
import (
"context"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/structs/aclfilter"
)
// ServerIntentionUpstreams builds a proxycfg.IntentionUpstreams implementation
// whose data comes from a blocking query against the server's own state store.
// The supplied deps are kept and consulted each time the query runs.
func ServerIntentionUpstreams(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams {
	return serverIntentionUpstreams{deps: deps}
}
// serverIntentionUpstreams is the concrete value returned by
// ServerIntentionUpstreams. Its Notify method answers intention upstream
// watches using the dependencies held in deps.
type serverIntentionUpstreams struct {
// deps supplies the state store getter, ACL resolver and logger that Notify
// uses.
deps ServerDataSourceDeps
}
// Notify registers a server-local watch for the intention upstreams of the
// service named in req and delivers each result (or error) on ch as a
// proxycfg.UpdateEvent tagged with correlationID.
//
// On every query run the request token is resolved again, the store's
// intention topology for the target service is fetched using the token's
// default-allow decision, and the resulting service list is passed through the
// ACL filter before being handed back. The returned error is whatever
// watch.ServerLocalNotify reports.
func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
	target := structs.NewServiceName(req.ServiceName, &req.EnterpriseMeta)

	// query is the function handed to ServerLocalNotify; it receives the watch
	// set and the store to read from.
	query := func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedServiceList, error) {
		// The token is resolved inside the query, so each run uses a freshly
		// resolved authorizer.
		authorizer, resolveErr := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil)
		if resolveErr != nil {
			return 0, nil, resolveErr
		}
		allowByDefault := authorizer.IntentionDefaultAllow(nil)

		idx, upstreams, topoErr := store.IntentionTopology(ws, target, false, allowByDefault, structs.IntentionTargetService)
		if topoErr != nil {
			return 0, nil, topoErr
		}

		var list structs.IndexedServiceList
		list.Services = upstreams
		list.QueryMeta = structs.QueryMeta{
			Index:   idx,
			Backend: structs.QueryBackendBlocking,
		}

		// The filter is applied to the result in place before it is returned.
		aclfilter.New(authorizer, s.deps.Logger).Filter(&list)
		return idx, &list, nil
	}

	notify := dispatchBlockingQueryUpdate[*structs.IndexedServiceList](ch)
	return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore, query, notify)
}
// dispatchBlockingQueryUpdate returns a callback that wraps a query result and
// error in a proxycfg.UpdateEvent and sends it on ch. The send is abandoned if
// the callback's context is done before the event can be delivered, so the
// callback does not block past cancellation.
func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) {
	deliver := func(ctx context.Context, correlationID string, result ResultType, err error) {
		select {
		case <-ctx.Done():
			// Context finished first; drop the event.
		case ch <- proxycfg.UpdateEvent{CorrelationID: correlationID, Result: result, Err: err}:
		}
	}
	return deliver
}