Trigger the V1 Compat exported-services Controller when V1 Config Entries are Updated (#20456)

* Trigger the v1 compat exported-services controller when the v1 config entry is modified.

* Hook up exported-services config entries to the event publisher.
* Add tests to the v2 exported services shim.
* Use the local materializer trigger updates on the v1 compat exported services controller when exported-services config entries are modified.

* stop sleeping when context is cancelled
This commit is contained in:
Eric Haberkorn 2024-02-02 15:30:04 -05:00 committed by GitHub
parent 1fe0a87546
commit 543c6a30af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 2472 additions and 1720 deletions

View File

@ -437,7 +437,11 @@ func (c *FSM) registerStreamSnapshotHandlers() {
err = c.deps.Publisher.RegisterHandler(state.EventTopicJWTProvider, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { err = c.deps.Publisher.RegisterHandler(state.EventTopicJWTProvider, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().JWTProviderSnapshot(req, buf) return c.State().JWTProviderSnapshot(req, buf)
}, true) }, true)
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicExportedServices, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().ExportedServicesSnapshot(req, buf)
}, true)
panicIfErr(err) panicIfErr(err)
} }

View File

@ -1008,7 +1008,6 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error
if s.useV2Resources { if s.useV2Resources {
catalog.RegisterControllers(s.controllerManager) catalog.RegisterControllers(s.controllerManager)
multicluster.RegisterControllers(s.controllerManager, multicluster.DefaultControllerDependencies(&V1ServiceExportsShim{s: s}))
defaultAllow, err := s.config.ACLResolverSettings.IsDefaultAllow() defaultAllow, err := s.config.ACLResolverSettings.IsDefaultAllow()
if err != nil { if err != nil {
return err return err
@ -1046,7 +1045,8 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error
auth.RegisterControllers(s.controllerManager, auth.DefaultControllerDependencies()) auth.RegisterControllers(s.controllerManager, auth.DefaultControllerDependencies())
} }
multicluster.RegisterControllers(s.controllerManager, multicluster.DefaultControllerDependencies(&V1ServiceExportsShim{s: s})) shim := NewExportedServicesShim(s)
multicluster.RegisterControllers(s.controllerManager, multicluster.DefaultControllerDependencies(shim))
reaper.RegisterControllers(s.controllerManager) reaper.RegisterControllers(s.controllerManager)

View File

@ -28,6 +28,7 @@ var configEntryKindToTopic = map[string]stream.Topic{
structs.RateLimitIPConfig: EventTopicIPRateLimit, structs.RateLimitIPConfig: EventTopicIPRateLimit,
structs.SamenessGroup: EventTopicSamenessGroup, structs.SamenessGroup: EventTopicSamenessGroup,
structs.JWTProvider: EventTopicJWTProvider, structs.JWTProvider: EventTopicJWTProvider,
structs.ExportedServices: EventTopicExportedServices,
} }
// EventSubjectConfigEntry is a stream.Subject used to route and receive events // EventSubjectConfigEntry is a stream.Subject used to route and receive events
@ -176,6 +177,12 @@ func (s *Store) JWTProviderSnapshot(req stream.SubscribeRequest, buf stream.Snap
return s.configEntrySnapshot(structs.JWTProvider, req, buf) return s.configEntrySnapshot(structs.JWTProvider, req, buf)
} }
// ExportedServicesSnapshot is a stream.SnapshotFunc that returns a snapshot of
// exported-services config entries.
func (s *Store) ExportedServicesSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.ExportedServices, req, buf)
}
func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
var ( var (
idx uint64 idx uint64

View File

@ -46,7 +46,7 @@ func PBToStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta acl.E
case EventTopicMeshConfig, EventTopicServiceResolver, EventTopicIngressGateway, case EventTopicMeshConfig, EventTopicServiceResolver, EventTopicIngressGateway,
EventTopicServiceIntentions, EventTopicServiceDefaults, EventTopicAPIGateway, EventTopicServiceIntentions, EventTopicServiceDefaults, EventTopicAPIGateway,
EventTopicTCPRoute, EventTopicHTTPRoute, EventTopicJWTProvider, EventTopicInlineCertificate, EventTopicTCPRoute, EventTopicHTTPRoute, EventTopicJWTProvider, EventTopicInlineCertificate,
EventTopicBoundAPIGateway, EventTopicSamenessGroup: EventTopicBoundAPIGateway, EventTopicSamenessGroup, EventTopicExportedServices:
subject = EventSubjectConfigEntry{ subject = EventSubjectConfigEntry{
Name: named.Key, Name: named.Key,
EnterpriseMeta: &entMeta, EnterpriseMeta: &entMeta,

View File

@ -212,6 +212,7 @@ var (
EventTopicIPRateLimit = pbsubscribe.Topic_IPRateLimit EventTopicIPRateLimit = pbsubscribe.Topic_IPRateLimit
EventTopicSamenessGroup = pbsubscribe.Topic_SamenessGroup EventTopicSamenessGroup = pbsubscribe.Topic_SamenessGroup
EventTopicJWTProvider = pbsubscribe.Topic_JWTProvider EventTopicJWTProvider = pbsubscribe.Topic_JWTProvider
EventTopicExportedServices = pbsubscribe.Topic_ExportedServices
) )
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {

View File

@ -23,7 +23,7 @@ type EventPublisher struct {
// seconds. // seconds.
snapCacheTTL time.Duration snapCacheTTL time.Duration
// This lock protects the snapCache, topicBuffers and topicBuffer.refs. // This lock protects the snapCache, topicBuffers, snapshotHandlers, and topicBuffer.refs.
lock sync.RWMutex lock sync.RWMutex
// topicBuffers stores the head of the linked-list buffers to publish events to // topicBuffers stores the head of the linked-list buffers to publish events to
@ -116,16 +116,18 @@ func NewEventPublisher(snapCacheTTL time.Duration) *EventPublisher {
} }
// RegisterHandler will register a new snapshot handler function. The expectation is // RegisterHandler will register a new snapshot handler function. The expectation is
// that all handlers get registered prior to the event publisher being Run. Handler // that all handlers get registered prior to the event publisher being Run. Passing
// registration is therefore not concurrency safe and access to handlers is internally // supportsWildcard allows consumers to subscribe to events on this topic with *any*
// not synchronized. Passing supportsWildcard allows consumers to subscribe to events // subject (by requesting SubjectWildcard) but this must be supported by the handler
// on this topic with *any* subject (by requesting SubjectWildcard) but this must be // function.
// supported by the handler function.
func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc, supportsWildcard bool) error { func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc, supportsWildcard bool) error {
if topic.String() == "" { if topic.String() == "" {
return fmt.Errorf("the topic cannnot be empty") return fmt.Errorf("the topic cannnot be empty")
} }
e.lock.Lock()
defer e.lock.Unlock()
if _, found := e.snapshotHandlers[topic]; found { if _, found := e.snapshotHandlers[topic]; found {
return fmt.Errorf("a handler is already registered for the topic: %s", topic.String()) return fmt.Errorf("a handler is already registered for the topic: %s", topic.String())
} }
@ -143,7 +145,11 @@ func (e *EventPublisher) RegisterHandler(topic Topic, handler SnapshotFunc, supp
} }
func (e *EventPublisher) RefreshTopic(topic Topic) error { func (e *EventPublisher) RefreshTopic(topic Topic) error {
if _, found := e.snapshotHandlers[topic]; !found { e.lock.Lock()
_, found := e.snapshotHandlers[topic]
e.lock.Unlock()
if !found {
return fmt.Errorf("topic %s is not registered", topic) return fmt.Errorf("topic %s is not registered", topic)
} }

View File

@ -210,9 +210,11 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer sub.Unsubscribe() defer sub.Unsubscribe()
publisher.lock.Lock()
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) { publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used") return 0, fmt.Errorf("error should not be seen, cache should have been used")
} }
publisher.lock.Unlock()
sub, err = publisher.Subscribe(req) sub, err = publisher.Subscribe(req)
require.NoError(t, err) require.NoError(t, err)
@ -394,9 +396,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
publisher.publishEvent([]Event{nextEvent}) publisher.publishEvent([]Event{nextEvent})
}) })
publisher.lock.Lock()
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) { publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used") return 0, fmt.Errorf("error should not be seen, cache should have been used")
} }
publisher.lock.Unlock()
testutil.RunStep(t, "resume the subscription", func(t *testing.T) { testutil.RunStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req newReq := *req
@ -476,9 +480,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi
require.Equal(t, uint64(3), next.Index) require.Equal(t, uint64(3), next.Index)
}) })
publisher.lock.Lock()
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) { publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used") return 0, fmt.Errorf("error should not be seen, cache should have been used")
} }
publisher.lock.Unlock()
testutil.RunStep(t, "resume the subscription", func(t *testing.T) { testutil.RunStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req newReq := *req

View File

@ -5,17 +5,103 @@ package consul
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
"github.com/hashicorp/go-hclog"
) )
type V1ServiceExportsShim struct { type v1ServiceExportsShim struct {
s *Server s *Server
eventCh chan controller.Event
} }
func (s *V1ServiceExportsShim) GetExportedServicesConfigEntry(_ context.Context, name string, entMeta *acl.EnterpriseMeta) (*structs.ExportedServicesConfigEntry, error) { func NewExportedServicesShim(s *Server) *v1ServiceExportsShim {
eventCh := make(chan controller.Event)
return &v1ServiceExportsShim{
s: s,
eventCh: eventCh,
}
}
func (s *v1ServiceExportsShim) Start(ctx context.Context) {
logger := s.s.logger.Named(logging.V2ExportsShim)
// TODO replace this with a proper supervisor.
for ctx.Err() == nil {
err := subscribeToExportedServicesEvents(ctx, logger, s.s.publisher, s.eventCh)
if err != nil {
logger.Warn("encountered an error while streaming exported services", "error", err)
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
} else {
return
}
}
}
func subscribeToExportedServicesEvents(ctx context.Context, logger hclog.Logger, publisher *stream.EventPublisher, eventCh chan controller.Event) error {
subscription, err := publisher.Subscribe(&stream.SubscribeRequest{
Topic: state.EventTopicExportedServices,
Subject: stream.SubjectWildcard,
})
if err != nil {
return err
}
defer subscription.Unsubscribe()
var index uint64
for {
event, err := subscription.Next(ctx)
switch {
case errors.Is(err, context.Canceled):
return nil
case err != nil:
return err
}
if event.IsFramingEvent() {
continue
}
if event.Index <= index {
continue
}
index = event.Index
e := event.Payload.ToSubscriptionEvent(event.Index)
configEntry := e.GetConfigEntry().GetConfigEntry()
if configEntry.GetKind() != pbconfigentry.Kind_KindExportedServices {
logger.Error("unexpected config entry kind", "kind", configEntry.GetKind())
continue
}
partition := acl.PartitionOrDefault(configEntry.GetEnterpriseMeta().GetPartition())
eventCh <- controller.Event{
Obj: exportedServiceItemType{partition: partition},
}
}
}
func (s *v1ServiceExportsShim) EventChannel() chan controller.Event {
return s.eventCh
}
func (s *v1ServiceExportsShim) GetExportedServicesConfigEntry(_ context.Context, name string, entMeta *acl.EnterpriseMeta) (*structs.ExportedServicesConfigEntry, error) {
_, entry, err := s.s.fsm.State().ConfigEntry(nil, structs.ExportedServices, name, entMeta) _, entry, err := s.s.fsm.State().ConfigEntry(nil, structs.ExportedServices, name, entMeta)
if err != nil { if err != nil {
return nil, err return nil, err
@ -33,7 +119,7 @@ func (s *V1ServiceExportsShim) GetExportedServicesConfigEntry(_ context.Context,
return exp, nil return exp, nil
} }
func (s *V1ServiceExportsShim) WriteExportedServicesConfigEntry(_ context.Context, cfg *structs.ExportedServicesConfigEntry) error { func (s *v1ServiceExportsShim) WriteExportedServicesConfigEntry(_ context.Context, cfg *structs.ExportedServicesConfigEntry) error {
if err := cfg.Normalize(); err != nil { if err := cfg.Normalize(); err != nil {
return err return err
} }
@ -51,7 +137,11 @@ func (s *V1ServiceExportsShim) WriteExportedServicesConfigEntry(_ context.Contex
return err return err
} }
func (s *V1ServiceExportsShim) DeleteExportedServicesConfigEntry(_ context.Context, name string, entMeta *acl.EnterpriseMeta) error { func (s *v1ServiceExportsShim) DeleteExportedServicesConfigEntry(_ context.Context, name string, entMeta *acl.EnterpriseMeta) error {
if entMeta == nil {
entMeta = acl.DefaultEnterpriseMeta()
}
req := &structs.ConfigEntryRequest{ req := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryDelete, Op: structs.ConfigEntryDelete,
Entry: &structs.ExportedServicesConfigEntry{ Entry: &structs.ExportedServicesConfigEntry{
@ -67,3 +157,13 @@ func (s *V1ServiceExportsShim) DeleteExportedServicesConfigEntry(_ context.Conte
_, err := s.s.raftApply(structs.ConfigEntryRequestType, req) _, err := s.s.raftApply(structs.ConfigEntryRequestType, req)
return err return err
} }
type exportedServiceItemType struct {
partition string
}
var _ queue.ItemType = (*exportedServiceItemType)(nil)
func (e exportedServiceItemType) Key() string {
return e.partition
}

View File

@ -0,0 +1,97 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
)
func TestV1ServiceExportsShim_Integration(t *testing.T) {
t.Parallel()
_, srv := testServerDC(t, "dc1")
shim := NewExportedServicesShim(srv)
testrpc.WaitForLeader(t, srv.RPC, "dc1")
v1ServiceExportsShimTests(t, shim, []*structs.ExportedServicesConfigEntry{
{
Name: "default",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{
{Peer: "cluster-01"},
},
},
},
RaftIndex: structs.RaftIndex{
CreateIndex: 0,
ModifyIndex: 1,
},
},
{
Name: "default",
Services: []structs.ExportedService{
{
Name: "bar",
Consumers: []structs.ServiceConsumer{
{Peer: "cluster-01"},
},
},
},
RaftIndex: structs.RaftIndex{
CreateIndex: 0,
ModifyIndex: 2,
},
},
})
}
func v1ServiceExportsShimTests(t *testing.T, shim *v1ServiceExportsShim, configs []*structs.ExportedServicesConfigEntry) {
ctx := context.Background()
go shim.Start(context.Background())
partitions := make(map[string]*acl.EnterpriseMeta)
for _, config := range configs {
partitions[config.PartitionOrDefault()] = config.GetEnterpriseMeta()
}
for _, entMeta := range partitions {
exportedServices, err := shim.GetExportedServicesConfigEntry(ctx, entMeta.PartitionOrDefault(), entMeta)
require.Nil(t, err)
require.Nil(t, exportedServices)
}
for _, config := range configs {
err := shim.WriteExportedServicesConfigEntry(ctx, config)
require.NoError(t, err)
shim.assertPartitionEvent(t, config.PartitionOrDefault())
}
for _, entMeta := range partitions {
err := shim.DeleteExportedServicesConfigEntry(ctx, entMeta.PartitionOrDefault(), entMeta)
require.NoError(t, err)
shim.assertPartitionEvent(t, entMeta.PartitionOrDefault())
}
}
func (s *v1ServiceExportsShim) assertPartitionEvent(t *testing.T, partition string) {
t.Helper()
select {
case event := <-s.eventCh:
require.Equal(t, partition, event.Obj.Key())
case <-time.After(250 * time.Millisecond):
t.Fatal("timeout waiting for view to receive events")
}
}

View File

@ -24,9 +24,11 @@ const (
) )
type AggregatedConfig interface { type AggregatedConfig interface {
Start(context.Context)
GetExportedServicesConfigEntry(context.Context, string, *acl.EnterpriseMeta) (*structs.ExportedServicesConfigEntry, error) GetExportedServicesConfigEntry(context.Context, string, *acl.EnterpriseMeta) (*structs.ExportedServicesConfigEntry, error)
WriteExportedServicesConfigEntry(context.Context, *structs.ExportedServicesConfigEntry) error WriteExportedServicesConfigEntry(context.Context, *structs.ExportedServicesConfigEntry) error
DeleteExportedServicesConfigEntry(context.Context, string, *acl.EnterpriseMeta) error DeleteExportedServicesConfigEntry(context.Context, string, *acl.EnterpriseMeta) error
EventChannel() chan controller.Event
} }
func mapExportedServices(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { func mapExportedServices(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
@ -43,12 +45,31 @@ func mapExportedServices(_ context.Context, _ controller.Runtime, res *pbresourc
}, nil }, nil
} }
func mapConfigEntryEvents(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) {
partition := event.Obj.Key()
return []controller.Request{
{
ID: &pbresource.ID{
Type: pbmulticluster.ComputedExportedServicesType,
Tenancy: &pbresource.Tenancy{
Partition: partition,
},
Name: types.ComputedExportedServicesName,
},
},
}, nil
}
func Controller(config AggregatedConfig) *controller.Controller { func Controller(config AggregatedConfig) *controller.Controller {
return controller.NewController(ControllerName, pbmulticluster.ComputedExportedServicesType). return controller.NewController(ControllerName, pbmulticluster.ComputedExportedServicesType).
WithNotifyStart(func(ctx context.Context, r controller.Runtime) {
go config.Start(ctx)
}).
WithWatch(pbmulticluster.PartitionExportedServicesType, mapExportedServices). WithWatch(pbmulticluster.PartitionExportedServicesType, mapExportedServices).
WithWatch(pbmulticluster.NamespaceExportedServicesType, mapExportedServices). WithWatch(pbmulticluster.NamespaceExportedServicesType, mapExportedServices).
WithWatch(pbmulticluster.ExportedServicesType, mapExportedServices). WithWatch(pbmulticluster.ExportedServicesType, mapExportedServices).
// TODO Add custom watch for exported-services for config entry events to attempt re-reconciliation when that changes WithCustomWatch(&controller.Source{Source: config.EventChannel()}, mapConfigEntryEvents).
WithReconciler(&reconciler{config: config}) WithReconciler(&reconciler{config: config})
} }

View File

@ -72,6 +72,7 @@ const (
XDS string = "xds" XDS string = "xds"
XDSCapacityController string = "xds_capacity_controller" XDSCapacityController string = "xds_capacity_controller"
Vault string = "vault" Vault string = "vault"
V2ExportsShim string = "v2_exports_shim"
Health string = "health" Health string = "health"
GRPCAPI string = "grpc-api" GRPCAPI string = "grpc-api"
Resource string = "resource" Resource string = "resource"

View File

@ -278,6 +278,86 @@ func DestinationConfigFromStructs(t *structs.DestinationConfig, s *DestinationCo
s.Addresses = t.Addresses s.Addresses = t.Addresses
s.Port = int32(t.Port) s.Port = int32(t.Port)
} }
func ExportedServicesToStructs(s *ExportedServices, t *structs.ExportedServicesConfigEntry) {
if s == nil {
return
}
{
t.Services = make([]structs.ExportedService, len(s.Services))
for i := range s.Services {
if s.Services[i] != nil {
ExportedServicesServiceToStructs(s.Services[i], &t.Services[i])
}
}
}
t.Meta = s.Meta
t.Hash = s.Hash
}
func ExportedServicesFromStructs(t *structs.ExportedServicesConfigEntry, s *ExportedServices) {
if s == nil {
return
}
{
s.Services = make([]*ExportedServicesService, len(t.Services))
for i := range t.Services {
{
var x ExportedServicesService
ExportedServicesServiceFromStructs(&t.Services[i], &x)
s.Services[i] = &x
}
}
}
s.Meta = t.Meta
s.Hash = t.Hash
}
func ExportedServicesConsumerToStructs(s *ExportedServicesConsumer, t *structs.ServiceConsumer) {
if s == nil {
return
}
t.Partition = s.Partition
t.Peer = s.Peer
t.SamenessGroup = s.SamenessGroup
}
func ExportedServicesConsumerFromStructs(t *structs.ServiceConsumer, s *ExportedServicesConsumer) {
if s == nil {
return
}
s.Partition = t.Partition
s.Peer = t.Peer
s.SamenessGroup = t.SamenessGroup
}
func ExportedServicesServiceToStructs(s *ExportedServicesService, t *structs.ExportedService) {
if s == nil {
return
}
t.Name = s.Name
t.Namespace = s.Namespace
{
t.Consumers = make([]structs.ServiceConsumer, len(s.Consumers))
for i := range s.Consumers {
if s.Consumers[i] != nil {
ExportedServicesConsumerToStructs(s.Consumers[i], &t.Consumers[i])
}
}
}
}
func ExportedServicesServiceFromStructs(t *structs.ExportedService, s *ExportedServicesService) {
if s == nil {
return
}
s.Name = t.Name
s.Namespace = t.Namespace
{
s.Consumers = make([]*ExportedServicesConsumer, len(t.Consumers))
for i := range t.Consumers {
{
var x ExportedServicesConsumer
ExportedServicesConsumerFromStructs(&t.Consumers[i], &x)
s.Consumers[i] = &x
}
}
}
}
func ExposeConfigToStructs(s *ExposeConfig, t *structs.ExposeConfig) { func ExposeConfigToStructs(s *ExposeConfig, t *structs.ExposeConfig) {
if s == nil { if s == nil {
return return

View File

@ -117,6 +117,14 @@ func ConfigEntryToStructs(s *ConfigEntry) structs.ConfigEntry {
pbcommon.RaftIndexToStructs(s.RaftIndex, &target.RaftIndex) pbcommon.RaftIndexToStructs(s.RaftIndex, &target.RaftIndex)
pbcommon.EnterpriseMetaToStructs(s.EnterpriseMeta, &target.EnterpriseMeta) pbcommon.EnterpriseMetaToStructs(s.EnterpriseMeta, &target.EnterpriseMeta)
return &target return &target
case Kind_KindExportedServices:
var target structs.ExportedServicesConfigEntry
target.Name = s.Name
ExportedServicesToStructs(s.GetExportedServices(), &target)
pbcommon.RaftIndexToStructs(s.RaftIndex, &target.RaftIndex)
pbcommon.EnterpriseMetaToStructs(s.EnterpriseMeta, &target.EnterpriseMeta)
return &target
default: default:
panic(fmt.Sprintf("unable to convert ConfigEntry of kind %s to structs", s.Kind)) panic(fmt.Sprintf("unable to convert ConfigEntry of kind %s to structs", s.Kind))
} }
@ -228,6 +236,14 @@ func ConfigEntryFromStructs(s structs.ConfigEntry) *ConfigEntry {
configEntry.Entry = &ConfigEntry_JWTProvider{ configEntry.Entry = &ConfigEntry_JWTProvider{
JWTProvider: &jwtProvider, JWTProvider: &jwtProvider,
} }
case *structs.ExportedServicesConfigEntry:
var es ExportedServices
ExportedServicesFromStructs(v, &es)
configEntry.Kind = Kind_KindExportedServices
configEntry.Entry = &ConfigEntry_ExportedServices{
ExportedServices: &es,
}
default: default:
panic(fmt.Sprintf("unable to convert %T to proto", s)) panic(fmt.Sprintf("unable to convert %T to proto", s))
} }

View File

@ -1006,3 +1006,33 @@ func (msg *JWTCacheConfig) MarshalBinary() ([]byte, error) {
func (msg *JWTCacheConfig) UnmarshalBinary(b []byte) error { func (msg *JWTCacheConfig) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ExportedServices) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ExportedServices) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ExportedServicesService) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ExportedServicesService) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ExportedServicesConsumer) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ExportedServicesConsumer) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

File diff suppressed because it is too large Load Diff

View File

@ -53,6 +53,7 @@ enum Kind {
KindTCPRoute = 10; KindTCPRoute = 10;
KindSamenessGroup = 11; KindSamenessGroup = 11;
KindJWTProvider = 12; KindJWTProvider = 12;
KindExportedServices = 13;
} }
message ConfigEntry { message ConfigEntry {
@ -75,6 +76,7 @@ message ConfigEntry {
InlineCertificate InlineCertificate = 14; InlineCertificate InlineCertificate = 14;
SamenessGroup SamenessGroup = 15; SamenessGroup SamenessGroup = 15;
JWTProvider JWTProvider = 16; JWTProvider JWTProvider = 16;
ExportedServices ExportedServices = 17;
} }
} }
@ -1305,3 +1307,40 @@ message JWTCacheConfig {
// mog: func-to=int func-from=int32 // mog: func-to=int func-from=int32
int32 Size = 1; int32 Size = 1;
} }
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.ExportedServicesConfigEntry
// output=config_entry.gen.go
// name=Structs
// ignore-fields=Kind,Name,RaftIndex,EnterpriseMeta
message ExportedServices {
string Name = 1;
// mog: func-to=enterpriseMetaToStructs func-from=enterpriseMetaFromStructs
common.EnterpriseMeta EnterpriseMeta = 2;
map<string, string> Meta = 3;
uint64 Hash = 4;
repeated ExportedServicesService Services = 5;
}
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.ExportedService
// output=config_entry.gen.go
// name=Structs
message ExportedServicesService {
string Name = 1;
string Namespace = 2;
repeated ExportedServicesConsumer Consumers = 3;
}
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.ServiceConsumer
// output=config_entry.gen.go
// name=Structs
message ExportedServicesConsumer {
string Partition = 1;
string Peer = 2;
string SamenessGroup = 3;
}

View File

@ -78,6 +78,8 @@ const (
Topic_SamenessGroup Topic = 15 Topic_SamenessGroup Topic = 15
// JWTProvider topic contains events for changes to jwt-provider // JWTProvider topic contains events for changes to jwt-provider
Topic_JWTProvider Topic = 16 Topic_JWTProvider Topic = 16
// ExportedServices topic contains events for changes to exported-services.
Topic_ExportedServices Topic = 17
) )
// Enum value maps for Topic. // Enum value maps for Topic.
@ -100,6 +102,7 @@ var (
14: "IPRateLimit", 14: "IPRateLimit",
15: "SamenessGroup", 15: "SamenessGroup",
16: "JWTProvider", 16: "JWTProvider",
17: "ExportedServices",
} }
Topic_value = map[string]int32{ Topic_value = map[string]int32{
"Unknown": 0, "Unknown": 0,
@ -119,6 +122,7 @@ var (
"IPRateLimit": 14, "IPRateLimit": 14,
"SamenessGroup": 15, "SamenessGroup": 15,
"JWTProvider": 16, "JWTProvider": 16,
"ExportedServices": 17,
} }
) )
@ -1000,7 +1004,7 @@ var file_private_pbsubscribe_subscribe_proto_rawDesc = []byte{
0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x0e, 0x45, 0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x0e, 0x45,
0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x1a, 0x0a,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xc5, 0x02, 0x0a, 0x05, 0x54, 0x6f, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xdb, 0x02, 0x0a, 0x05, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x70, 0x69, 0x63, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00,
0x12, 0x11, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x68, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65,
@ -1021,26 +1025,27 @@ var file_private_pbsubscribe_subscribe_proto_rawDesc = []byte{
0x49, 0x50, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x10, 0x0e, 0x12, 0x11, 0x0a, 0x49, 0x50, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x10, 0x0e, 0x12, 0x11, 0x0a,
0x0d, 0x53, 0x61, 0x6d, 0x65, 0x6e, 0x65, 0x73, 0x73, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x10, 0x0f, 0x0d, 0x53, 0x61, 0x6d, 0x65, 0x6e, 0x65, 0x73, 0x73, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x10, 0x0f,
0x12, 0x0f, 0x0a, 0x0b, 0x4a, 0x57, 0x54, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x10, 0x12, 0x0f, 0x0a, 0x0b, 0x4a, 0x57, 0x54, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x10,
0x10, 0x2a, 0x29, 0x0a, 0x09, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x10, 0x12, 0x14, 0x0a, 0x10, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x53, 0x65, 0x72,
0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x76, 0x69, 0x63, 0x65, 0x73, 0x10, 0x11, 0x2a, 0x29, 0x0a, 0x09, 0x43, 0x61, 0x74, 0x61, 0x6c,
0x44, 0x65, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x61, 0x0a, 0x17, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72,
0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72,
0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x10, 0x01, 0x32, 0x61, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67,
0x72, 0x69, 0x62, 0x65, 0x12, 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a,
0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1b, 0x2e, 0x73, 0x75, 0x62,
0x74, 0x1a, 0x10, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x65, 0x6e, 0x74, 0x22, 0x08, 0xe2, 0x86, 0x04, 0x04, 0x08, 0x02, 0x10, 0x09, 0x30, 0x01, 0x42, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72,
0x9a, 0x01, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x08, 0xe2, 0x86, 0x04, 0x04, 0x08,
0x65, 0x42, 0x0e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x02, 0x10, 0x09, 0x30, 0x01, 0x42, 0x9a, 0x01, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75,
0x6f, 0x50, 0x01, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x62, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f,
0x62, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x69,
0xaa, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x72, 0x69, 0x62, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xe2, 0x02,
0xea, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65,
0x6f, 0x74, 0x6f, 0x33, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

View File

@ -106,6 +106,9 @@ enum Topic {
// JWTProvider topic contains events for changes to jwt-provider // JWTProvider topic contains events for changes to jwt-provider
JWTProvider = 16; JWTProvider = 16;
// ExportedServices topic contains events for changes to exported-services.
ExportedServices = 17;
} }
message NamedSubject { message NamedSubject {