consul/agent/proxycfg-glue/config_entry.go
Thomas Eckert 13da1a5285
Native API Gateway Config Entries (#15897)
* Stub Config Entries for Consul Native API Gateway (#15644)
* Add empty InlineCertificate struct and protobuf
* apigateway stubs
* Stub HTTPRoute in api pkg
* Stub HTTPRoute in structs pkg
* Simplify api.APIGatewayConfigEntry to be consistent w/ other entries
* Update makeConfigEntry switch, add docstring for HTTPRouteConfigEntry
* Add TCPRoute to MakeConfigEntry, return unique Kind
* Stub BoundAPIGatewayConfigEntry in agent
* Add RaftIndex to APIGatewayConfigEntry stub
* Add new config entry kinds to validation allow-list
* Add RaftIndex to other added config entry stubs
* Update usage metrics assertions to include new cfg entries
* Add Meta and acl.EnterpriseMeta to all new ConfigEntry types
* Remove unnecessary Services field from added config entry types
* Implement GetMeta(), GetEnterpriseMeta() for added config entry types
* Add meta field to proto, name consistently w/ existing config entries
* Format config_entry.proto
* Add initial implementation of CanRead + CanWrite for new config entry types
* Add unit tests for decoding of new config entry types
* Add unit tests for parsing of new config entry types
* Add unit tests for API Gateway config entry ACLs
* Return typed PermissionDeniedError on BoundAPIGateway CanWrite
* Add unit tests for added config entry ACLs
* Add BoundAPIGateway type to AllConfigEntryKinds
* Return proper kind from BoundAPIGateway
* Add docstrings for new config entry types
* Add missing config entry kinds to proto def
* Update usagemetrics_oss_test.go
* Use utility func for returning PermissionDeniedError
* EventPublisher subscriptions for Consul Native API Gateway (#15757)
* Create new event topics in subscribe proto
* Add tests for PBSubscribe func
* Make configs singular, add all configs to PBToStreamSubscribeRequest
* Add snapshot methods
* Add config_entry_events tests
* Add config entry kind to topic for new configs
* Add unit tests for snapshot methods
* Start adding integration test
* Test using the new controller code
* Update agent/consul/state/config_entry_events.go
* Check value of error
* Add controller stubs for API Gateway (#15837)
* update initial stub implementation
* move files, clean up mutex references
* Remove embed, use idiomatic names for constructors
* Remove stray file introduced in merge
* Add APIGateway validation (#15847)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Move struct fields around a bit
* APIGateway InlineCertificate validation (#15856)
* Add APIGateway validation
* Add additional validations
* Add protobuf definitions
* Tabs to spaces
* Add API structs
* Move struct fields around a bit
* Add validation for InlineCertificate
* Fix ACL test
* APIGateway BoundAPIGateway validation (#15858)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Move struct fields around a bit
* Add validation for BoundAPIGateway
* APIGateway TCPRoute validation (#15855)
* Add APIGateway validation
* Add additional validations
* Add cert ref validation
* Add protobuf definitions
* Fix up field types
* Add API structs
* Add TCPRoute normalization and validation
* Add forgotten Status
* Add some more field docs in api package
* Fix test
* Format imports
* Rename snapshot test variable names
* Add plumbing for Native API GW Subscriptions (#16003)

Co-authored-by: Sarah Alsmiller <sarah.alsmiller@hashicorp.com>
Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
Co-authored-by: sarahalsmiller <100602640+sarahalsmiller@users.noreply.github.com>
Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
2023-01-18 22:14:34 +00:00

255 lines
7.6 KiB
Go

package proxycfgglue
import (
"context"
"fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbconfigentry"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// CacheConfigEntry returns a proxycfg.ConfigEntry data source that is backed
// by the agent cache, using the cachetype.ConfigEntryName cache type.
func CacheConfigEntry(c *cache.Cache) proxycfg.ConfigEntry {
	source := &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryName}
	return source
}
// CacheConfigEntryList returns a proxycfg.ConfigEntryList data source that is
// backed by the agent cache, using the cachetype.ConfigEntryListName cache
// type.
func CacheConfigEntryList(c *cache.Cache) proxycfg.ConfigEntryList {
	source := &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryListName}
	return source
}
// ServerConfigEntry returns a proxycfg.ConfigEntry data source whose data
// comes from a local materialized view (backed by an EventPublisher
// subscription) rather than from the agent cache.
func ServerConfigEntry(deps ServerDataSourceDeps) proxycfg.ConfigEntry {
	return serverConfigEntry{deps: deps}
}
// ServerConfigEntryList returns a proxycfg.ConfigEntryList data source whose
// data comes from a local materialized view (backed by an EventPublisher
// subscription). It shares its implementation with ServerConfigEntry.
func ServerConfigEntryList(deps ServerDataSourceDeps) proxycfg.ConfigEntryList {
	return serverConfigEntry{deps: deps}
}
// serverConfigEntry is the data source returned by both ServerConfigEntry and
// ServerConfigEntryList. Its Notify method registers watches through
// deps.ViewStore.
type serverConfigEntry struct {
// deps carries the server-side dependencies handed to every request that
// this data source creates.
deps ServerDataSourceDeps
}
// Notify translates req into a configEntryRequest and registers it with the
// ViewStore so that updates are delivered on ch, tagged with correlationID.
// It returns an error if the query's kind cannot be mapped to a topic, or
// whatever error the ViewStore registration returns.
func (e serverConfigEntry) Notify(ctx context.Context, req *structs.ConfigEntryQuery, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
	request, err := newConfigEntryRequest(req, e.deps)
	if err != nil {
		return err
	}

	callback := dispatchCacheUpdate(ch)
	return e.deps.ViewStore.NotifyCallback(ctx, request, correlationID, callback)
}
// newConfigEntryRequest wraps req and deps in a configEntryRequest, resolving
// the subscription topic from the query's config entry kind. An error is
// returned for any kind that has no topic mapping in the switch below.
func newConfigEntryRequest(req *structs.ConfigEntryQuery, deps ServerDataSourceDeps) (*configEntryRequest, error) {
	cfgReq := &configEntryRequest{req: req, deps: deps}

	switch kind := req.Kind; kind {
	case structs.MeshConfig:
		cfgReq.topic = pbsubscribe.Topic_MeshConfig
	case structs.ServiceResolver:
		cfgReq.topic = pbsubscribe.Topic_ServiceResolver
	case structs.IngressGateway:
		cfgReq.topic = pbsubscribe.Topic_IngressGateway
	case structs.ServiceDefaults:
		cfgReq.topic = pbsubscribe.Topic_ServiceDefaults
	case structs.APIGateway:
		cfgReq.topic = pbsubscribe.Topic_APIGateway
	case structs.HTTPRoute:
		cfgReq.topic = pbsubscribe.Topic_HTTPRoute
	case structs.TCPRoute:
		cfgReq.topic = pbsubscribe.Topic_TCPRoute
	case structs.InlineCertificate:
		cfgReq.topic = pbsubscribe.Topic_InlineCertificate
	case structs.BoundAPIGateway:
		cfgReq.topic = pbsubscribe.Topic_BoundAPIGateway
	default:
		return nil, fmt.Errorf("cannot map config entry kind: %s to a topic", kind)
	}

	return cfgReq, nil
}
// configEntryRequest is the request value that serverConfigEntry.Notify hands
// to ViewStore.NotifyCallback. It pairs a config entry query with the
// subscription topic derived from the query's kind.
type configEntryRequest struct {
// topic is the subscription topic resolved by newConfigEntryRequest.
topic pbsubscribe.Topic
// req is the original config entry query; an empty Name selects the list view.
req *structs.ConfigEntryQuery
// deps supplies the EventPublisher, ACLResolver and Logger used when
// building the materializer.
deps ServerDataSourceDeps
}
// CacheInfo returns the cache.RequestInfo reported by the wrapped config
// entry query.
func (r *configEntryRequest) CacheInfo() cache.RequestInfo {
	return r.req.CacheInfo()
}
// NewMaterializer builds a local materializer for this request. A query with
// a Name gets a single-entry view; a query without one gets a list view scoped
// to the query's kind and enterprise meta. The returned error is always nil.
func (r *configEntryRequest) NewMaterializer() (submatview.Materializer, error) {
	var view submatview.View
	if r.req.Name != "" {
		view = &configEntryView{}
	} else {
		view = newConfigEntryListView(r.req.Kind, r.req.EnterpriseMeta)
	}

	materializerDeps := submatview.LocalMaterializerDeps{
		Backend:     r.deps.EventPublisher,
		ACLResolver: r.deps.ACLResolver,
		Deps: submatview.Deps{
			View:    view,
			Logger:  r.deps.Logger,
			Request: r.Request,
		},
	}
	return submatview.NewLocalMaterializer(materializerDeps), nil
}
// Type returns the fixed type name used to identify config entry requests.
func (r *configEntryRequest) Type() string {
	const requestType = "proxycfgglue.ConfigEntry"
	return requestType
}
// Request builds the subscribe request for this query starting at index.
// Queries without a Name subscribe with a wildcard subject; named queries
// subscribe to that name within the query's partition and namespace.
func (r *configEntryRequest) Request(index uint64) *pbsubscribe.SubscribeRequest {
	sub := &pbsubscribe.SubscribeRequest{
		Topic:      r.topic,
		Index:      index,
		Datacenter: r.req.Datacenter,
		Token:      r.req.QueryOptions.Token,
	}

	entryName := r.req.Name
	if entryName == "" {
		sub.Subject = &pbsubscribe.SubscribeRequest_WildcardSubject{
			WildcardSubject: true,
		}
		return sub
	}

	sub.Subject = &pbsubscribe.SubscribeRequest_NamedSubject{
		NamedSubject: &pbsubscribe.NamedSubject{
			Key:       entryName,
			Partition: r.req.PartitionOrDefault(),
			Namespace: r.req.NamespaceOrDefault(),
		},
	}
	return sub
}
// configEntryView implements a submatview.View for a single config entry.
type configEntryView struct {
// state is the most recently upserted entry, or nil after a reset or delete.
state structs.ConfigEntry
}
// Reset discards the currently held config entry.
func (v *configEntryView) Reset() {
v.state = nil
}
// Result returns a *structs.ConfigEntryResponse holding the current entry
// (nil when none is held), stamped with index and the streaming backend.
func (v *configEntryView) Result(index uint64) any {
	resp := new(structs.ConfigEntryResponse)
	resp.QueryMeta.Index = index
	resp.QueryMeta.Backend = structs.QueryBackendStreaming
	resp.Entry = v.state
	return resp
}
// Update applies events in order: a delete clears the held entry and an
// upsert replaces it. Events that carry no config entry payload, and
// operations other than delete/upsert, are ignored. It always returns nil.
func (v *configEntryView) Update(events []*pbsubscribe.Event) error {
	for i := range events {
		change := events[i].GetConfigEntry()
		if change == nil {
			continue
		}

		if change.Op == pbsubscribe.ConfigEntryUpdate_Delete {
			v.state = nil
		} else if change.Op == pbsubscribe.ConfigEntryUpdate_Upsert {
			v.state = pbconfigentry.ConfigEntryToStructs(change.ConfigEntry)
		}
	}
	return nil
}
// configEntryListView implements a submatview.View for a list of config entries
// that are all of the same kind (name is treated as unique).
type configEntryListView struct {
// kind is copied into the Kind field of every result.
kind string
// entMeta is the requested partition/namespace scope used to filter events.
entMeta acl.EnterpriseMeta
// state holds the current entries, keyed by the string form of the
// structs.ServiceName built from each entry's name and enterprise meta.
state map[string]structs.ConfigEntry
}
// newConfigEntryListView returns a list view for the given kind and
// enterprise meta, starting with an empty (non-nil) entry map.
func newConfigEntryListView(kind string, entMeta acl.EnterpriseMeta) *configEntryListView {
	return &configEntryListView{
		kind:    kind,
		entMeta: entMeta,
		state:   make(map[string]structs.ConfigEntry),
	}
}
// Reset drops every held entry by replacing the state with a fresh empty map.
func (v *configEntryListView) Reset() {
	v.state = map[string]structs.ConfigEntry{}
}
// Result returns a *structs.IndexedConfigEntries containing every held entry,
// stamped with index and the streaming backend. The entries come from ranging
// over a map, so their order is unspecified.
func (v *configEntryListView) Result(index uint64) any {
	entries := make([]structs.ConfigEntry, len(v.state))
	next := 0
	for _, held := range v.state {
		entries[next] = held
		next++
	}

	out := &structs.IndexedConfigEntries{Kind: v.kind, Entries: entries}
	out.QueryMeta.Index = index
	out.QueryMeta.Backend = structs.QueryBackendStreaming
	return out
}
// Update applies the events that match the view's enterprise meta: a delete
// removes the entry from the view and an upsert inserts or replaces it.
// Entries are keyed by the string form of the structs.ServiceName built from
// the entry's name and enterprise meta. It always returns nil.
func (v *configEntryListView) Update(events []*pbsubscribe.Event) error {
	for _, event := range filterByEnterpriseMeta(events, v.entMeta) {
		update := event.GetConfigEntry()
		if update == nil {
			// filterByEnterpriseMeta also lets service payloads through; those
			// carry no config entry, so skip them instead of dereferencing nil
			// (this mirrors the guard in configEntryView.Update).
			continue
		}

		configEntry := pbconfigentry.ConfigEntryToStructs(update.ConfigEntry)
		name := structs.NewServiceName(configEntry.GetName(), configEntry.GetEnterpriseMeta()).String()

		switch update.Op {
		case pbsubscribe.ConfigEntryUpdate_Delete:
			delete(v.state, name)
		case pbsubscribe.ConfigEntryUpdate_Upsert:
			v.state[name] = configEntry
		}
	}
	return nil
}
// filterByEnterpriseMeta returns the subset of events whose partition and
// namespace match entMeta. A wildcard partition or namespace in entMeta matches
// anything. Only config entry and service payloads are considered; events with
// any other payload are dropped. The filtering is needed because a wildcard
// subject subscription delivers events for resources in all partitions and
// namespaces.
func filterByEnterpriseMeta(events []*pbsubscribe.Event, entMeta acl.EnterpriseMeta) []*pbsubscribe.Event {
	wantPartition := entMeta.PartitionOrDefault()
	wantNamespace := entMeta.NamespaceOrDefault()

	kept := make([]*pbsubscribe.Event, 0, len(events))
	for _, ev := range events {
		var meta *pbcommon.EnterpriseMeta
		switch p := ev.Payload.(type) {
		case *pbsubscribe.Event_ConfigEntry:
			meta = p.ConfigEntry.ConfigEntry.GetEnterpriseMeta()
		case *pbsubscribe.Event_Service:
			meta = p.Service.GetEnterpriseMeta()
		default:
			continue
		}

		// Keep the event only when both dimensions match; the namespace is not
		// inspected once the partition has already ruled the event out.
		if (wantPartition == acl.WildcardName || acl.EqualPartitions(wantPartition, meta.GetPartition())) &&
			(wantNamespace == acl.WildcardName || acl.EqualNamespaces(wantNamespace, meta.GetNamespace())) {
			kept = append(kept, ev)
		}
	}
	return kept
}