* added Sameness Group to proto files (#16998)

- added Sameness Group to config entries
- added Sameness Group to subscriptions

* generated proto files

* added Sameness Group events to the state store
- added test cases

* Refactored health RPC Client
- moved code that is common to rpcclient under rpcclient common.go. This will help set us up to support future RPC clients

* Refactored proxycfg glue views
- Moved views to rpcclient config entry. This will allow us to reuse this code for a config entry client

* added config entry RPC Client
- Copied most of the testing code from rpcclient/health

* hooked up new rpcclient in agent

* fixed documentation and comments for clarity
This commit is contained in:
Michael Wilkerson 2023-04-14 09:24:46 -07:00 committed by GitHub
parent 79d4040b6c
commit 0dd4ea2033
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 2419 additions and 1426 deletions

View File

@ -22,6 +22,8 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
@ -384,7 +386,8 @@ type Agent struct {
// TODO: pass directly to HTTPHandlers and DNSServer once those are passed
// into Agent, which will allow us to remove this field.
rpcClientHealth *health.Client
rpcClientHealth *health.Client
rpcClientConfigEntry *configentry.Client
rpcClientPeering pbpeering.PeeringServiceClient
@ -462,22 +465,37 @@ func New(bd BaseDeps) (*Agent, error) {
}
a.rpcClientHealth = &health.Client{
Cache: bd.Cache,
NetRPC: &a,
CacheName: cachetype.HealthServicesName,
ViewStore: bd.ViewStore,
MaterializerDeps: health.MaterializerDeps{
Conn: conn,
Logger: bd.Logger.Named("rpcclient.health"),
Client: rpcclient.Client{
Cache: bd.Cache,
NetRPC: &a,
CacheName: cachetype.HealthServicesName,
ViewStore: bd.ViewStore,
MaterializerDeps: rpcclient.MaterializerDeps{
Conn: conn,
Logger: bd.Logger.Named("rpcclient.health"),
},
UseStreamingBackend: a.config.UseStreamingBackend,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(a.config),
},
UseStreamingBackend: a.config.UseStreamingBackend,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(a.config),
}
a.rpcClientPeering = pbpeering.NewPeeringServiceClient(conn)
a.rpcClientOperator = pboperator.NewOperatorServiceClient(conn)
a.serviceManager = NewServiceManager(&a)
a.rpcClientConfigEntry = &configentry.Client{
Client: rpcclient.Client{
Cache: bd.Cache,
NetRPC: &a,
CacheName: cachetype.ConfigEntryName,
ViewStore: bd.ViewStore,
MaterializerDeps: rpcclient.MaterializerDeps{
Conn: conn,
Logger: bd.Logger.Named("rpcclient.configentry"),
},
QueryOptionDefaults: config.ApplyDefaultQueryOptions(a.config),
},
}
// We used to do this in the Start method. However it doesn't need to go
// there any longer. Originally it did because we passed the agent
@ -1662,6 +1680,7 @@ func (a *Agent) ShutdownAgent() error {
}
a.rpcClientHealth.Close()
a.rpcClientConfigEntry.Close()
// Shutdown SCADA provider
if a.scadaProvider != nil {

View File

@ -421,6 +421,11 @@ func (c *FSM) registerStreamSnapshotHandlers() {
return c.State().IPRateLimiterSnapshot(req, buf)
}, true)
panicIfErr(err)
err = c.deps.Publisher.RegisterHandler(state.EventTopicSamenessGroup, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().SamenessGroupSnapshot(req, buf)
}, true)
panicIfErr(err)
}
func panicIfErr(err error) {

View File

@ -26,6 +26,7 @@ var configEntryKindToTopic = map[string]stream.Topic{
structs.InlineCertificate: EventTopicInlineCertificate,
structs.BoundAPIGateway: EventTopicBoundAPIGateway,
structs.RateLimitIPConfig: EventTopicIPRateLimit,
structs.SamenessGroup: EventTopicSamenessGroup,
}
// EventSubjectConfigEntry is a stream.Subject used to route and receive events
@ -162,6 +163,12 @@ func (s *Store) IPRateLimiterSnapshot(req stream.SubscribeRequest, buf stream.Sn
return s.configEntrySnapshot(structs.RateLimitIPConfig, req, buf)
}
// SamenessGroupSnapshot is a stream.SnapshotFunc that returns a snapshot of
// "sameness-group" config entries.
func (s *Store) SamenessGroupSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.SamenessGroup, req, buf)
}
func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
var (
idx uint64

View File

@ -46,7 +46,7 @@ func PBToStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta acl.E
case EventTopicMeshConfig, EventTopicServiceResolver, EventTopicIngressGateway,
EventTopicServiceIntentions, EventTopicServiceDefaults, EventTopicAPIGateway,
EventTopicTCPRoute, EventTopicHTTPRoute, EventTopicInlineCertificate,
EventTopicBoundAPIGateway:
EventTopicBoundAPIGateway, EventTopicSamenessGroup:
subject = EventSubjectConfigEntry{
Name: named.Key,
EnterpriseMeta: &entMeta,

View File

@ -84,6 +84,32 @@ func TestPBToStreamSubscribeRequest(t *testing.T) {
},
err: nil,
},
"Sameness Group": {
req: &pbsubscribe.SubscribeRequest{
Topic: EventTopicSamenessGroup,
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{
NamedSubject: &pbsubscribe.NamedSubject{
Key: "sg",
Namespace: "consul",
Partition: "partition",
PeerName: "peer",
},
},
Token: aclToken,
Index: 2,
},
entMeta: acl.EnterpriseMeta{},
expectedSubscribeRequest: &stream.SubscribeRequest{
Topic: EventTopicSamenessGroup,
Subject: EventSubjectConfigEntry{
Name: "sg",
EnterpriseMeta: &acl.EnterpriseMeta{},
},
Token: aclToken,
Index: 2,
},
err: nil,
},
"Config": {
req: &pbsubscribe.SubscribeRequest{
Topic: EventTopicAPIGateway,

View File

@ -210,6 +210,7 @@ var (
EventTopicInlineCertificate = pbsubscribe.Topic_InlineCertificate
EventTopicBoundAPIGateway = pbsubscribe.Topic_BoundAPIGateway
EventTopicIPRateLimit = pbsubscribe.Topic_IPRateLimit
EventTopicSamenessGroup = pbsubscribe.Topic_SamenessGroup
)
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {

View File

@ -776,54 +776,63 @@ func (d *DNSServer) dispatch(remoteAddr net.Addr, req, resp *dns.Msg, maxRecursi
return invalid()
}
locality, ok := d.parseLocality(querySuffixes, cfg)
if !ok {
return invalid()
localities, err := d.parseSamenessGroupLocality(cfg, querySuffixes, invalid)
if err != nil {
return err
}
lookup := serviceLookup{
Datacenter: locality.effectiveDatacenter(d.agent.config.Datacenter),
PeerName: locality.peer,
Connect: false,
Ingress: false,
MaxRecursionLevel: maxRecursionLevel,
EnterpriseMeta: locality.EnterpriseMeta,
}
// Only one of dc or peer can be used.
if lookup.PeerName != "" {
lookup.Datacenter = ""
}
// Loop over the localities and return as soon as a lookup is successful
for _, locality := range localities {
d.logger.Debug("labels", "querySuffixes", querySuffixes)
// Support RFC 2782 style syntax
if n == 2 && strings.HasPrefix(queryParts[1], "_") && strings.HasPrefix(queryParts[0], "_") {
// Grab the tag since we make nuke it if it's tcp
tag := queryParts[1][1:]
// Treat _name._tcp.service.consul as a default, no need to filter on that tag
if tag == "tcp" {
tag = ""
lookup := serviceLookup{
Datacenter: locality.effectiveDatacenter(d.agent.config.Datacenter),
PeerName: locality.peer,
Connect: false,
Ingress: false,
MaxRecursionLevel: maxRecursionLevel,
EnterpriseMeta: locality.EnterpriseMeta,
}
// Only one of dc or peer can be used.
if lookup.PeerName != "" {
lookup.Datacenter = ""
}
lookup.Tag = tag
lookup.Service = queryParts[0][1:]
// _name._tag.service.consul
return d.serviceLookup(cfg, lookup, req, resp)
// Support RFC 2782 style syntax
if n == 2 && strings.HasPrefix(queryParts[1], "_") && strings.HasPrefix(queryParts[0], "_") {
// Grab the tag since we make nuke it if it's tcp
tag := queryParts[1][1:]
// Treat _name._tcp.service.consul as a default, no need to filter on that tag
if tag == "tcp" {
tag = ""
}
lookup.Tag = tag
lookup.Service = queryParts[0][1:]
// _name._tag.service.consul
} else {
// Consul 0.3 and prior format for SRV queries
// Support "." in the label, re-join all the parts
tag := ""
if n >= 2 {
tag = strings.Join(queryParts[:n-1], ".")
}
lookup.Tag = tag
lookup.Service = queryParts[n-1]
// tag[.tag].name.service.consul
}
err = d.serviceLookup(cfg, lookup, req, resp)
// Return if we are error free right away, otherwise loop again if we can
if err == nil {
return nil
}
}
// Consul 0.3 and prior format for SRV queries
// Support "." in the label, re-join all the parts
tag := ""
if n >= 2 {
tag = strings.Join(queryParts[:n-1], ".")
}
lookup.Tag = tag
lookup.Service = queryParts[n-1]
// tag[.tag].name.service.consul
return d.serviceLookup(cfg, lookup, req, resp)
// We've exhausted all DNS possibilities so return here
return err
case "connect":
if len(queryParts) < 1 {
return invalid()

View File

@ -57,6 +57,17 @@ func (d *DNSServer) parseLocality(labels []string, cfg *dnsConfig) (queryLocalit
return queryLocality{}, false
}
type querySameness struct{}
// parseSamenessGroupLocality wraps parseLocality in OSS
func (d *DNSServer) parseSamenessGroupLocality(cfg *dnsConfig, labels []string, errfnc func() error) ([]queryLocality, error) {
locality, ok := d.parseLocality(labels, cfg)
if !ok {
return nil, errfnc()
}
return []queryLocality{locality}, nil
}
func serviceCanonicalDNSName(name, kind, datacenter, domain string, _ *acl.EnterpriseMeta) string {
return fmt.Sprintf("%s.%s.%s.%s", name, kind, datacenter, domain)
}

View File

@ -7,14 +7,12 @@ import (
"context"
"fmt"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
@ -78,7 +76,7 @@ func newConfigEntryRequest(req *structs.ConfigEntryQuery, deps ServerDataSourceD
case structs.RateLimitIPConfig:
topic = pbsubscribe.Topic_IPRateLimit
default:
return nil, fmt.Errorf("cannot map config entry kind: %s to a topic", req.Kind)
return nil, fmt.Errorf("cannot map config entry kind: %q to a topic", req.Kind)
}
return &configEntryRequest{
topic: topic,
@ -98,9 +96,9 @@ func (r *configEntryRequest) CacheInfo() cache.RequestInfo { return r.req.CacheI
func (r *configEntryRequest) NewMaterializer() (submatview.Materializer, error) {
var view submatview.View
if r.req.Name == "" {
view = newConfigEntryListView(r.req.Kind, r.req.EnterpriseMeta)
view = configentry.NewConfigEntryListView(r.req.Kind, r.req.EnterpriseMeta)
} else {
view = &configEntryView{}
view = &configentry.ConfigEntryView{}
}
return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{
@ -140,120 +138,3 @@ func (r *configEntryRequest) Request(index uint64) *pbsubscribe.SubscribeRequest
return req
}
// configEntryView implements a submatview.View for a single config entry.
type configEntryView struct {
state structs.ConfigEntry
}
func (v *configEntryView) Reset() {
v.state = nil
}
func (v *configEntryView) Result(index uint64) any {
return &structs.ConfigEntryResponse{
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendStreaming,
},
Entry: v.state,
}
}
func (v *configEntryView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
update := event.GetConfigEntry()
if update == nil {
continue
}
switch update.Op {
case pbsubscribe.ConfigEntryUpdate_Delete:
v.state = nil
case pbsubscribe.ConfigEntryUpdate_Upsert:
v.state = pbconfigentry.ConfigEntryToStructs(update.ConfigEntry)
}
}
return nil
}
// configEntryListView implements a submatview.View for a list of config entries
// that are all of the same kind (name is treated as unique).
type configEntryListView struct {
kind string
entMeta acl.EnterpriseMeta
state map[string]structs.ConfigEntry
}
func newConfigEntryListView(kind string, entMeta acl.EnterpriseMeta) *configEntryListView {
view := &configEntryListView{kind: kind, entMeta: entMeta}
view.Reset()
return view
}
func (v *configEntryListView) Reset() {
v.state = make(map[string]structs.ConfigEntry)
}
func (v *configEntryListView) Result(index uint64) any {
entries := make([]structs.ConfigEntry, 0, len(v.state))
for _, entry := range v.state {
entries = append(entries, entry)
}
return &structs.IndexedConfigEntries{
Kind: v.kind,
Entries: entries,
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendStreaming,
},
}
}
func (v *configEntryListView) Update(events []*pbsubscribe.Event) error {
for _, event := range filterByEnterpriseMeta(events, v.entMeta) {
update := event.GetConfigEntry()
configEntry := pbconfigentry.ConfigEntryToStructs(update.ConfigEntry)
name := structs.NewServiceName(configEntry.GetName(), configEntry.GetEnterpriseMeta()).String()
switch update.Op {
case pbsubscribe.ConfigEntryUpdate_Delete:
delete(v.state, name)
case pbsubscribe.ConfigEntryUpdate_Upsert:
v.state[name] = configEntry
}
}
return nil
}
// filterByEnterpriseMeta filters the given set of events to remove those that
// don't match the request's enterprise meta - this is necessary because when
// subscribing to a topic with SubjectWildcard we'll get events for resources
// in all partitions and namespaces.
func filterByEnterpriseMeta(events []*pbsubscribe.Event, entMeta acl.EnterpriseMeta) []*pbsubscribe.Event {
partition := entMeta.PartitionOrDefault()
namespace := entMeta.NamespaceOrDefault()
filtered := make([]*pbsubscribe.Event, 0, len(events))
for _, event := range events {
var eventEntMeta *pbcommon.EnterpriseMeta
switch payload := event.Payload.(type) {
case *pbsubscribe.Event_ConfigEntry:
eventEntMeta = payload.ConfigEntry.ConfigEntry.GetEnterpriseMeta()
case *pbsubscribe.Event_Service:
eventEntMeta = payload.Service.GetEnterpriseMeta()
default:
continue
}
if partition != acl.WildcardName && !acl.EqualPartitions(partition, eventEntMeta.GetPartition()) {
continue
}
if namespace != acl.WildcardName && !acl.EqualNamespaces(namespace, eventEntMeta.GetNamespace()) {
continue
}
filtered = append(filtered, event)
}
return filtered
}

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
@ -164,7 +165,7 @@ func (r intentionsRequest) NewMaterializer() (submatview.Materializer, error) {
Backend: r.deps.EventPublisher,
ACLResolver: r.deps.ACLResolver,
Deps: submatview.Deps{
View: &configEntryView{},
View: &configentry.ConfigEntryView{},
Logger: r.deps.Logger,
Request: r.Request,
},

View File

@ -125,3 +125,35 @@ func (v *serviceListView) Result(index uint64) any {
},
}
}
// filterByEnterpriseMeta filters the given set of events to remove those that
// don't match the request's enterprise meta - this is necessary because when
// subscribing to a topic with SubjectWildcard we'll get events for resources
// in all partitions and namespaces.
func filterByEnterpriseMeta(events []*pbsubscribe.Event, entMeta acl.EnterpriseMeta) []*pbsubscribe.Event {
partition := entMeta.PartitionOrDefault()
namespace := entMeta.NamespaceOrDefault()
filtered := make([]*pbsubscribe.Event, 0, len(events))
for _, event := range events {
var eventEntMeta *pbcommon.EnterpriseMeta
switch payload := event.Payload.(type) {
case *pbsubscribe.Event_ConfigEntry:
eventEntMeta = payload.ConfigEntry.ConfigEntry.GetEnterpriseMeta()
case *pbsubscribe.Event_Service:
eventEntMeta = payload.Service.GetEnterpriseMeta()
default:
continue
}
if partition != acl.WildcardName && !acl.EqualPartitions(partition, eventEntMeta.GetPartition()) {
continue
}
if namespace != acl.WildcardName && !acl.EqualNamespaces(namespace, eventEntMeta.GetNamespace()) {
continue
}
filtered = append(filtered, event)
}
return filtered
}

55
agent/rpcclient/common.go Normal file
View File

@ -0,0 +1,55 @@
package rpcclient
import (
"context"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
)
// NetRPC reprents an interface for making RPC requests
type NetRPC interface {
RPC(ctx context.Context, method string, args interface{}, reply interface{}) error
}
// CacheGetter represents an interface for interacting with the cache
type CacheGetter interface {
Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
NotifyCallback(ctx context.Context, t string, r cache.Request, cID string, cb cache.Callback) error
}
// MaterializedViewStore represents an interface for interacting with the material view store
type MaterializedViewStore interface {
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error
}
// MaterializerDeps include the dependencies for the materializer
type MaterializerDeps struct {
Conn *grpc.ClientConn
Logger hclog.Logger
}
// Client represents a rpc client, a new Client is created in each sub-package
// embedding this object. Methods are therefore implemented in the subpackages
// as well.
type Client struct {
NetRPC NetRPC
Cache CacheGetter
ViewStore MaterializedViewStore
MaterializerDeps MaterializerDeps
CacheName string
UseStreamingBackend bool
QueryOptionDefaults func(options *structs.QueryOptions)
}
// Close any underlying connections used by the client.
func (c *Client) Close() error {
if c == nil {
return nil
}
return c.MaterializerDeps.Conn.Close()
}

View File

@ -0,0 +1,178 @@
package configentry
import (
"context"
"fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
// Client provides access to config entry data.
type Client struct {
rpcclient.Client
}
// GetSamenessGroup returns the sameness group config entry (if possible) given the
// provided config entry query
func (c *Client) GetSamenessGroup(
ctx context.Context,
req *structs.ConfigEntryQuery,
) (structs.SamenessGroupConfigEntry, cache.ResultMeta, error) {
if req.Kind != structs.SamenessGroup {
return structs.SamenessGroupConfigEntry{}, cache.ResultMeta{}, fmt.Errorf("wrong kind in query %s, expected %s", req.Kind, structs.SamenessGroup)
}
out, meta, err := c.GetConfigEntry(ctx, req)
if err != nil {
return structs.SamenessGroupConfigEntry{}, cache.ResultMeta{}, err
}
sg, ok := out.Entry.(*structs.SamenessGroupConfigEntry)
if !ok {
return structs.SamenessGroupConfigEntry{}, cache.ResultMeta{}, fmt.Errorf("%s config entry with name %s not found", structs.SamenessGroup, req.Name)
}
return *sg, meta, nil
}
// GetConfigEntry returns the config entry (if possible) given the
// provided config entry query
func (c *Client) GetConfigEntry(
ctx context.Context,
req *structs.ConfigEntryQuery,
) (structs.ConfigEntryResponse, cache.ResultMeta, error) {
if c.UseStreamingBackend && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
c.QueryOptionDefaults(&req.QueryOptions)
cfgReq, err := c.newConfigEntryRequest(req)
if err != nil {
return structs.ConfigEntryResponse{}, cache.ResultMeta{}, err
}
result, err := c.ViewStore.Get(ctx, cfgReq)
if err != nil {
return structs.ConfigEntryResponse{}, cache.ResultMeta{}, err
}
meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached}
return *result.Value.(*structs.ConfigEntryResponse), meta, err
}
out, md, err := c.getConfigEntryRPC(ctx, req)
if err != nil {
return out, md, err
}
if req.QueryOptions.AllowStale && req.QueryOptions.MaxStaleDuration > 0 && out.LastContact > req.MaxStaleDuration {
req.AllowStale = false
err := c.NetRPC.RPC(ctx, "ConfigEntry.Get", &req, &out)
return out, cache.ResultMeta{}, err
}
return out, md, err
}
func (c *Client) getConfigEntryRPC(
ctx context.Context,
req *structs.ConfigEntryQuery,
) (structs.ConfigEntryResponse, cache.ResultMeta, error) {
var out structs.ConfigEntryResponse
if !req.QueryOptions.UseCache {
err := c.NetRPC.RPC(context.Background(), "ConfigEntry.Get", req, &out)
return out, cache.ResultMeta{}, err
}
raw, md, err := c.Cache.Get(ctx, c.CacheName, req)
if err != nil {
return out, md, err
}
value, ok := raw.(*structs.ConfigEntryResponse)
if !ok {
panic("wrong response type for cachetype.HealthServicesName")
}
return *value, md, nil
}
var _ submatview.Request = (*configEntryRequest)(nil)
type configEntryRequest struct {
Topic pbsubscribe.Topic
req *structs.ConfigEntryQuery
deps rpcclient.MaterializerDeps
}
func (c *Client) newConfigEntryRequest(req *structs.ConfigEntryQuery) (*configEntryRequest, error) {
var topic pbsubscribe.Topic
switch req.Kind {
case structs.SamenessGroup:
topic = pbsubscribe.Topic_SamenessGroup
default:
return nil, fmt.Errorf("cannot map config entry kind: %q to a topic", req.Kind)
}
return &configEntryRequest{
Topic: topic,
req: req,
deps: c.MaterializerDeps,
}, nil
}
// CacheInfo returns information used for caching the config entry request.
func (r *configEntryRequest) CacheInfo() cache.RequestInfo {
return r.req.CacheInfo()
}
// Type returns a string which uniquely identifies the config entry of request.
// The returned value is used as the prefix of the key used to index
// entries in the Store.
func (r *configEntryRequest) Type() string {
return "agent.rpcclient.configentry.configentryrequest"
}
// Request creates a new pbsubscribe.SubscribeRequest for a config entry including
// wildcards and enterprise fields
func (r *configEntryRequest) Request(index uint64) *pbsubscribe.SubscribeRequest {
req := &pbsubscribe.SubscribeRequest{
Topic: r.Topic,
Index: index,
Datacenter: r.req.Datacenter,
Token: r.req.QueryOptions.Token,
}
if name := r.req.Name; name == "" {
req.Subject = &pbsubscribe.SubscribeRequest_WildcardSubject{
WildcardSubject: true,
}
} else {
req.Subject = &pbsubscribe.SubscribeRequest_NamedSubject{
NamedSubject: &pbsubscribe.NamedSubject{
Key: name,
Partition: r.req.PartitionOrDefault(),
Namespace: r.req.NamespaceOrDefault(),
},
}
}
return req
}
// NewMaterializer will be called if there is no active materializer to fulfill
// the request. It returns a Materializer appropriate for streaming
// data to fulfil the config entry request.
func (r *configEntryRequest) NewMaterializer() (submatview.Materializer, error) {
var view submatview.View
if r.req.Name == "" {
view = NewConfigEntryListView(r.req.Kind, r.req.EnterpriseMeta)
} else {
view = &ConfigEntryView{}
}
deps := submatview.Deps{
View: view,
Logger: r.deps.Logger,
Request: r.Request,
}
return submatview.NewRPCMaterializer(pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn), deps), nil
}

View File

@ -0,0 +1,265 @@
package configentry
import (
"context"
"errors"
"testing"
"time"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
)
func TestClient_SamenessGroup_BackendRouting(t *testing.T) {
type testCase struct {
name string
req structs.ConfigEntryQuery
useStreamingBackend bool
expected func(t *testing.T, c *Client, err error)
}
run := func(t *testing.T, tc testCase) {
c := &Client{
Client: rpcclient.Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{configEntryType: structs.SamenessGroup, configEntryName: "sg1"},
ViewStore: &fakeViewStore{configEntryType: structs.SamenessGroup, configEntryName: "sg1"},
CacheName: "cache-no-streaming",
UseStreamingBackend: tc.useStreamingBackend,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{}),
},
}
_, _, err := c.GetSamenessGroup(context.Background(), &tc.req)
tc.expected(t, c, err)
}
var testCases = []testCase{
{
name: "rpc by default",
req: structs.ConfigEntryQuery{
Kind: structs.SamenessGroup,
Name: "sg1",
Datacenter: "dc1",
},
useStreamingBackend: true,
expected: useRPC,
},
{
name: "use streaming instead of cache",
req: structs.ConfigEntryQuery{
Kind: structs.SamenessGroup,
Name: "sg1",
QueryOptions: structs.QueryOptions{UseCache: true},
Datacenter: "dc1",
},
useStreamingBackend: true,
expected: useStreaming,
},
{
name: "use streaming for MinQueryIndex",
req: structs.ConfigEntryQuery{
Kind: structs.SamenessGroup,
Name: "sg1",
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
},
useStreamingBackend: true,
expected: useStreaming,
},
{
name: "use cache",
req: structs.ConfigEntryQuery{
Kind: structs.SamenessGroup,
Name: "sg1",
Datacenter: "dc1",
QueryOptions: structs.QueryOptions{UseCache: true},
},
useStreamingBackend: false,
expected: useCache,
},
{
name: "wrong kind error",
req: structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
},
expected: expectError,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func TestClient_SamenessGroup_SetsDefaults(t *testing.T) {
store := &fakeViewStore{}
c := &Client{
Client: rpcclient.Client{
ViewStore: store,
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{
MaxQueryTime: 200 * time.Second,
DefaultQueryTime: 100 * time.Second,
}),
},
}
req := structs.ConfigEntryQuery{
Datacenter: "dc1",
Kind: structs.SamenessGroup,
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
}
_, _, err := c.GetConfigEntry(context.Background(), &req)
require.NoError(t, err)
require.Len(t, store.calls, 1)
require.Equal(t, 100*time.Second, store.calls[0].CacheInfo().Timeout)
}
func useRPC(t *testing.T, c *Client, err error) {
t.Helper()
require.NoError(t, err)
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, cache.calls, 0)
require.Len(t, store.calls, 0)
require.Equal(t, []string{"ConfigEntry.Get"}, rpc.calls)
}
func useStreaming(t *testing.T, c *Client, err error) {
t.Helper()
require.NoError(t, err)
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, cache.calls, 0)
require.Len(t, rpc.calls, 0)
require.Len(t, store.calls, 1)
}
func useCache(t *testing.T, c *Client, err error) {
t.Helper()
require.NoError(t, err)
rpc, ok := c.NetRPC.(*fakeNetRPC)
require.True(t, ok, "test setup error, expected *fakeNetRPC, got %T", c.NetRPC)
cache, ok := c.Cache.(*fakeCache)
require.True(t, ok, "test setup error, expected *fakeCache, got %T", c.Cache)
store, ok := c.ViewStore.(*fakeViewStore)
require.True(t, ok, "test setup error, expected *fakeViewSTore, got %T", c.ViewStore)
require.Len(t, rpc.calls, 0)
require.Len(t, store.calls, 0)
require.Equal(t, []string{"cache-no-streaming"}, cache.calls)
}
func expectError(t *testing.T, _ *Client, err error) {
t.Helper()
require.Error(t, err)
}
var _ rpcclient.CacheGetter = (*fakeCache)(nil)
type fakeCache struct {
calls []string
configEntryType string
configEntryName string
}
func (f *fakeCache) Get(_ context.Context, t string, _ cache.Request) (interface{}, cache.ResultMeta, error) {
f.calls = append(f.calls, t)
result := &structs.ConfigEntryResponse{}
switch f.configEntryType {
case structs.SamenessGroup:
result = &structs.ConfigEntryResponse{
Entry: &structs.SamenessGroupConfigEntry{
Name: f.configEntryName,
},
}
}
return result, cache.ResultMeta{}, nil
}
func (f *fakeCache) NotifyCallback(_ context.Context, t string, _ cache.Request, _ string, _ cache.Callback) error {
f.calls = append(f.calls, t)
return nil
}
var _ rpcclient.NetRPC = (*fakeNetRPC)(nil)
type fakeNetRPC struct {
calls []string
}
func (f *fakeNetRPC) RPC(ctx context.Context, method string, req interface{}, out interface{}) error {
f.calls = append(f.calls, method)
r, ok := req.(*structs.ConfigEntryQuery)
if !ok {
return errors.New("not a config entry query")
}
switch r.Kind {
case structs.SamenessGroup:
resp := &structs.ConfigEntryResponse{
Entry: &structs.SamenessGroupConfigEntry{
Name: r.Name,
},
}
*out.(*structs.ConfigEntryResponse) = *resp
}
return nil
}
var _ rpcclient.MaterializedViewStore = (*fakeViewStore)(nil)
type fakeViewStore struct {
calls []submatview.Request
configEntryType string
configEntryName string
}
func (f *fakeViewStore) Get(_ context.Context, req submatview.Request) (submatview.Result, error) {
f.calls = append(f.calls, req)
switch f.configEntryType {
case structs.SamenessGroup:
return submatview.Result{Value: &structs.ConfigEntryResponse{
Entry: &structs.SamenessGroupConfigEntry{
Name: f.configEntryName,
},
}}, nil
default:
return submatview.Result{Value: &structs.ConfigEntryResponse{}}, nil
}
}
func (f *fakeViewStore) NotifyCallback(_ context.Context, req submatview.Request, _ string, _ cache.Callback) error {
f.calls = append(f.calls, req)
return nil
}

View File

@ -0,0 +1,138 @@
package configentry
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
var _ submatview.View = (*ConfigEntryView)(nil)
// ConfigEntryView implements a submatview.View for a single config entry.
type ConfigEntryView struct {
state structs.ConfigEntry
}
// Reset resets the state to nil for the ConfigEntryView
func (v *ConfigEntryView) Reset() {
v.state = nil
}
// Result returns the structs.ConfigEntryResponse stored by this view.
func (v *ConfigEntryView) Result(index uint64) any {
return &structs.ConfigEntryResponse{
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendStreaming,
},
Entry: v.state,
}
}
// Update updates the state containing a config entry based on events
func (v *ConfigEntryView) Update(events []*pbsubscribe.Event) error {
for _, event := range events {
update := event.GetConfigEntry()
if update == nil {
continue
}
switch update.Op {
case pbsubscribe.ConfigEntryUpdate_Delete:
v.state = nil
case pbsubscribe.ConfigEntryUpdate_Upsert:
v.state = pbconfigentry.ConfigEntryToStructs(update.ConfigEntry)
}
}
return nil
}
var _ submatview.View = (*ConfigEntryListView)(nil)
// ConfigEntryListView implements a submatview.View for a list of config entries
// that are all of the same kind (name is treated as unique).
type ConfigEntryListView struct {
kind string
entMeta acl.EnterpriseMeta
state map[string]structs.ConfigEntry
}
// NewConfigEntryListView contructs a ConfigEntryListView based on the enterprise meta data and the kind
func NewConfigEntryListView(kind string, entMeta acl.EnterpriseMeta) *ConfigEntryListView {
view := &ConfigEntryListView{kind: kind, entMeta: entMeta}
view.Reset()
return view
}
// Reset resets the states of the list view to an empty map of Config Entries
func (v *ConfigEntryListView) Reset() {
v.state = make(map[string]structs.ConfigEntry)
}
// Result returns the structs.IndexedConfigEntries stored by this view.
func (v *ConfigEntryListView) Result(index uint64) any {
entries := make([]structs.ConfigEntry, 0, len(v.state))
for _, entry := range v.state {
entries = append(entries, entry)
}
return &structs.IndexedConfigEntries{
Kind: v.kind,
Entries: entries,
QueryMeta: structs.QueryMeta{
Index: index,
Backend: structs.QueryBackendStreaming,
},
}
}
// Update updates the states containing a config entry based on events
func (v *ConfigEntryListView) Update(events []*pbsubscribe.Event) error {
for _, event := range filterByEnterpriseMeta(events, v.entMeta) {
update := event.GetConfigEntry()
configEntry := pbconfigentry.ConfigEntryToStructs(update.ConfigEntry)
name := structs.NewServiceName(configEntry.GetName(), configEntry.GetEnterpriseMeta()).String()
switch update.Op {
case pbsubscribe.ConfigEntryUpdate_Delete:
delete(v.state, name)
case pbsubscribe.ConfigEntryUpdate_Upsert:
v.state[name] = configEntry
}
}
return nil
}
// filterByEnterpriseMeta filters the given set of events to remove those that
// don't match the request's enterprise meta - this is necessary because when
// subscribing to a topic with SubjectWildcard we'll get events for resources
// in all partitions and namespaces.
func filterByEnterpriseMeta(events []*pbsubscribe.Event, entMeta acl.EnterpriseMeta) []*pbsubscribe.Event {
partition := entMeta.PartitionOrDefault()
namespace := entMeta.NamespaceOrDefault()
filtered := make([]*pbsubscribe.Event, 0, len(events))
for _, event := range events {
var eventEntMeta *pbcommon.EnterpriseMeta
switch payload := event.Payload.(type) {
case *pbsubscribe.Event_ConfigEntry:
eventEntMeta = payload.ConfigEntry.ConfigEntry.GetEnterpriseMeta()
case *pbsubscribe.Event_Service:
eventEntMeta = payload.Service.GetEnterpriseMeta()
default:
continue
}
if partition != acl.WildcardName && !acl.EqualPartitions(partition, eventEntMeta.GetPartition()) {
continue
}
if namespace != acl.WildcardName && !acl.EqualNamespaces(namespace, eventEntMeta.GetNamespace()) {
continue
}
filtered = append(filtered, event)
}
return filtered
}

View File

@ -1,7 +1,7 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package proxycfgglue
package configentry
import (
"testing"
@ -18,7 +18,7 @@ import (
func TestConfigEntryView(t *testing.T) {
const index uint64 = 123
view := &configEntryView{}
view := &ConfigEntryView{}
testutil.RunStep(t, "initial state", func(t *testing.T) {
result := view.Result(index)
@ -102,7 +102,7 @@ func TestConfigEntryView(t *testing.T) {
func TestConfigEntryListView(t *testing.T) {
const index uint64 = 123
view := newConfigEntryListView(structs.ServiceResolver, *acl.DefaultEnterpriseMeta())
view := NewConfigEntryListView(structs.ServiceResolver, *acl.DefaultEnterpriseMeta())
testutil.RunStep(t, "initial state", func(t *testing.T) {
result := view.Result(index)
@ -184,3 +184,9 @@ func TestConfigEntryListView(t *testing.T) {
require.Equal(t, "db", serviceResolver.Name)
})
}
func TestConfigEntryListView_Reset(t *testing.T) {
view := &ConfigEntryView{state: &structs.SamenessGroupConfigEntry{}}
view.Reset()
require.Nil(t, view.state)
}

View File

@ -6,6 +6,7 @@ package health
import (
"context"
"github.com/hashicorp/consul/agent/rpcclient"
"google.golang.org/grpc/connectivity"
"github.com/hashicorp/consul/agent/cache"
@ -16,27 +17,7 @@ import (
// Client provides access to service health data.
type Client struct {
NetRPC NetRPC
Cache CacheGetter
ViewStore MaterializedViewStore
MaterializerDeps MaterializerDeps
CacheName string
UseStreamingBackend bool
QueryOptionDefaults func(options *structs.QueryOptions)
}
type NetRPC interface {
RPC(ctx context.Context, method string, args interface{}, reply interface{}) error
}
type CacheGetter interface {
Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
NotifyCallback(ctx context.Context, t string, r cache.Request, cID string, cb cache.Callback) error
}
type MaterializedViewStore interface {
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
NotifyCallback(ctx context.Context, req submatview.Request, cID string, cb cache.Callback) error
rpcclient.Client
}
// IsReadyForStreaming will indicate if the underlying gRPC connection is ready.
@ -129,17 +110,11 @@ func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRe
}
}
// Close any underlying connections used by the client.
func (c *Client) Close() error {
if c == nil {
return nil
}
return c.MaterializerDeps.Conn.Close()
}
var _ submatview.Request = (*serviceRequest)(nil)
type serviceRequest struct {
structs.ServiceSpecificRequest
deps MaterializerDeps
deps rpcclient.MaterializerDeps
}
func (r serviceRequest) CacheInfo() cache.RequestInfo {

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
@ -25,12 +26,14 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
run := func(t *testing.T, tc testCase) {
c := &Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{}),
Client: rpcclient.Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{}),
},
}
_, _, err := c.ServiceNodes(context.Background(), tc.req)
@ -202,11 +205,13 @@ func TestClient_Notify_BackendRouting(t *testing.T) {
run := func(t *testing.T, tc testCase) {
c := &Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
Client: rpcclient.Client{
NetRPC: &fakeNetRPC{},
Cache: &fakeCache{},
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
},
}
err := c.Notify(context.Background(), tc.req, "cid", nil)
@ -253,13 +258,15 @@ func TestClient_Notify_BackendRouting(t *testing.T) {
func TestClient_ServiceNodes_SetsDefaults(t *testing.T) {
store := &fakeViewStore{}
c := &Client{
ViewStore: store,
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{
MaxQueryTime: 200 * time.Second,
DefaultQueryTime: 100 * time.Second,
}),
Client: rpcclient.Client{
ViewStore: store,
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{
MaxQueryTime: 200 * time.Second,
DefaultQueryTime: 100 * time.Second,
}),
},
}
req := structs.ServiceSpecificRequest{

View File

@ -10,20 +10,13 @@ import (
"sort"
"strings"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/private/pbservice"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
"github.com/hashicorp/go-bexpr"
)
type MaterializerDeps struct {
Conn *grpc.ClientConn
Logger hclog.Logger
}
func NewMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index uint64) *pbsubscribe.SubscribeRequest {
return func(index uint64) *pbsubscribe.SubscribeRequest {
req := &pbsubscribe.SubscribeRequest{
@ -60,6 +53,8 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
}, nil
}
var _ submatview.View = (*HealthView)(nil)
// HealthView implements submatview.View for storing the view state
// of a service health result. We store it as a map to make updates and
// deletions a little easier but we could just store a result type

View File

@ -980,3 +980,12 @@ func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) {
require.Equal(t, "127.0.0.1", node.Nodes[0].Service.Address)
require.Equal(t, 8443, node.Nodes[0].Service.Port)
}
func TestConfigEntryListView_Reset(t *testing.T) {
emptyMap := make(map[string]structs.CheckServiceNode)
view := &HealthView{state: map[string]structs.CheckServiceNode{
"test": {},
}}
view.Reset()
require.Equal(t, emptyMap, view.state)
}

View File

@ -110,7 +110,7 @@ func (s *Store) Run(ctx context.Context) {
// fields are ignored (ex: MaxAge, and MustRevalidate).
type Request interface {
cache.Request
// NewMaterializer will be called if there is no active materializer to fulfil
// NewMaterializer will be called if there is no active materializer to fulfill
// the request. It should return a Materializer appropriate for streaming
// data to fulfil this request.
NewMaterializer() (Materializer, error)

View File

@ -16,6 +16,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
@ -303,11 +304,13 @@ func newConsumer(t *testing.T, addr net.Addr, store *submatview.Store, srv strin
require.NoError(t, err)
c := &health.Client{
UseStreamingBackend: true,
ViewStore: store,
MaterializerDeps: health.MaterializerDeps{
Conn: conn,
Logger: hclog.New(nil),
Client: rpcclient.Client{
UseStreamingBackend: true,
ViewStore: store,
MaterializerDeps: rpcclient.MaterializerDeps{
Conn: conn,
Logger: hclog.New(nil),
},
},
}

View File

@ -1208,6 +1208,58 @@ func RingHashConfigFromStructs(t *structs.RingHashConfig, s *RingHashConfig) {
s.MinimumRingSize = t.MinimumRingSize
s.MaximumRingSize = t.MaximumRingSize
}
func SamenessGroupToStructs(s *SamenessGroup, t *structs.SamenessGroupConfigEntry) {
if s == nil {
return
}
t.Name = s.Name
t.DefaultForFailover = s.DefaultForFailover
t.IncludeLocal = s.IncludeLocal
{
t.Members = make([]structs.SamenessGroupMember, len(s.Members))
for i := range s.Members {
if s.Members[i] != nil {
SamenessGroupMemberToStructs(s.Members[i], &t.Members[i])
}
}
}
t.Meta = s.Meta
t.EnterpriseMeta = enterpriseMetaToStructs(s.EnterpriseMeta)
}
func SamenessGroupFromStructs(t *structs.SamenessGroupConfigEntry, s *SamenessGroup) {
if s == nil {
return
}
s.Name = t.Name
s.DefaultForFailover = t.DefaultForFailover
s.IncludeLocal = t.IncludeLocal
{
s.Members = make([]*SamenessGroupMember, len(t.Members))
for i := range t.Members {
{
var x SamenessGroupMember
SamenessGroupMemberFromStructs(&t.Members[i], &x)
s.Members[i] = &x
}
}
}
s.Meta = t.Meta
s.EnterpriseMeta = enterpriseMetaFromStructs(t.EnterpriseMeta)
}
func SamenessGroupMemberToStructs(s *SamenessGroupMember, t *structs.SamenessGroupMember) {
if s == nil {
return
}
t.Partition = s.Partition
t.Peer = s.Peer
}
func SamenessGroupMemberFromStructs(t *structs.SamenessGroupMember, s *SamenessGroupMember) {
if s == nil {
return
}
s.Partition = t.Partition
s.Peer = t.Peer
}
func ServiceDefaultsToStructs(s *ServiceDefaults, t *structs.ServiceConfigEntry) {
if s == nil {
return

View File

@ -100,6 +100,14 @@ func ConfigEntryToStructs(s *ConfigEntry) structs.ConfigEntry {
pbcommon.RaftIndexToStructs(s.RaftIndex, &target.RaftIndex)
pbcommon.EnterpriseMetaToStructs(s.EnterpriseMeta, &target.EnterpriseMeta)
return &target
case Kind_KindSamenessGroup:
var target structs.SamenessGroupConfigEntry
target.Name = s.Name
SamenessGroupToStructs(s.GetSamenessGroup(), &target)
pbcommon.RaftIndexToStructs(s.RaftIndex, &target.RaftIndex)
pbcommon.EnterpriseMetaToStructs(s.EnterpriseMeta, &target.EnterpriseMeta)
return &target
default:
panic(fmt.Sprintf("unable to convert ConfigEntry of kind %s to structs", s.Kind))
}
@ -196,6 +204,14 @@ func ConfigEntryFromStructs(s structs.ConfigEntry) *ConfigEntry {
configEntry.Entry = &ConfigEntry_InlineCertificate{
InlineCertificate: &cert,
}
case *structs.SamenessGroupConfigEntry:
var sg SamenessGroup
SamenessGroupFromStructs(v, &sg)
configEntry.Kind = Kind_KindSamenessGroup
configEntry.Entry = &ConfigEntry_SamenessGroup{
SamenessGroup: &sg,
}
default:
panic(fmt.Sprintf("unable to convert %T to proto", s))
}

View File

@ -626,3 +626,23 @@ func (msg *TCPService) MarshalBinary() ([]byte, error) {
func (msg *TCPService) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *SamenessGroup) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *SamenessGroup) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *SamenessGroupMember) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *SamenessGroupMember) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,7 @@ enum Kind {
KindBoundAPIGateway = 8;
KindHTTPRoute = 9;
KindTCPRoute = 10;
KindSamenessGroup = 11;
}
message ConfigEntry {
@ -41,6 +42,7 @@ message ConfigEntry {
TCPRoute TCPRoute = 12;
HTTPRoute HTTPRoute = 13;
InlineCertificate InlineCertificate = 14;
SamenessGroup SamenessGroup = 15;
}
}
@ -884,3 +886,29 @@ message TCPService {
// mog: func-to=enterpriseMetaToStructs func-from=enterpriseMetaFromStructs
common.EnterpriseMeta EnterpriseMeta = 2;
}
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.SamenessGroupConfigEntry
// output=config_entry.gen.go
// name=Structs
// ignore-fields=RaftIndex
message SamenessGroup {
string Name = 1;
bool DefaultForFailover = 2;
bool IncludeLocal = 3;
repeated SamenessGroupMember Members = 4;
map<string, string> Meta = 5;
// mog: func-to=enterpriseMetaToStructs func-from=enterpriseMetaFromStructs
common.EnterpriseMeta EnterpriseMeta = 6;
}
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.SamenessGroupMember
// output=config_entry.gen.go
// name=Structs
message SamenessGroupMember {
string Partition = 1;
string Peer = 2;
}

View File

@ -19,14 +19,15 @@
package pbsubscribe
import (
reflect "reflect"
sync "sync"
_ "github.com/hashicorp/consul/proto-public/annotations/ratelimit"
pbcommon "github.com/hashicorp/consul/proto/private/pbcommon"
pbconfigentry "github.com/hashicorp/consul/proto/private/pbconfigentry"
pbservice "github.com/hashicorp/consul/proto/private/pbservice"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@ -74,6 +75,8 @@ const (
Topic_BoundAPIGateway Topic = 13
// IPRateLimit topic contains events for changes to control-plane-request-limit
Topic_IPRateLimit Topic = 14
// SamenessGroup topic contains events for changes to Sameness Groups
Topic_SamenessGroup Topic = 15
)
// Enum value maps for Topic.
@ -94,6 +97,7 @@ var (
12: "InlineCertificate",
13: "BoundAPIGateway",
14: "IPRateLimit",
15: "SamenessGroup",
}
Topic_value = map[string]int32{
"Unknown": 0,
@ -111,6 +115,7 @@ var (
"InlineCertificate": 12,
"BoundAPIGateway": 13,
"IPRateLimit": 14,
"SamenessGroup": 15,
}
)
@ -992,7 +997,7 @@ var file_private_pbsubscribe_subscribe_proto_rawDesc = []byte{
0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x0e, 0x45,
0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x1a, 0x0a,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xa1, 0x02, 0x0a, 0x05, 0x54, 0x6f,
0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xb4, 0x02, 0x0a, 0x05, 0x54, 0x6f,
0x70, 0x69, 0x63, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00,
0x12, 0x11, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74,
0x68, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65,
@ -1010,26 +1015,28 @@ var file_private_pbsubscribe_subscribe_proto_rawDesc = []byte{
0x12, 0x15, 0x0a, 0x11, 0x49, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x43, 0x65, 0x72, 0x74, 0x69, 0x66,
0x69, 0x63, 0x61, 0x74, 0x65, 0x10, 0x0c, 0x12, 0x13, 0x0a, 0x0f, 0x42, 0x6f, 0x75, 0x6e, 0x64,
0x41, 0x50, 0x49, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x10, 0x0d, 0x12, 0x0f, 0x0a, 0x0b,
0x49, 0x50, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x10, 0x0e, 0x2a, 0x29, 0x0a,
0x09, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65,
0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65,
0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x61, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74,
0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x12, 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62,
0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e,
0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22,
0x08, 0xe2, 0x86, 0x04, 0x04, 0x08, 0x02, 0x10, 0x09, 0x30, 0x01, 0x42, 0x9a, 0x01, 0x0a, 0x0d,
0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a,
0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68,
0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x2f, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62,
0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63,
0x72, 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x49, 0x50, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x10, 0x0e, 0x12, 0x11, 0x0a,
0x0d, 0x53, 0x61, 0x6d, 0x65, 0x6e, 0x65, 0x73, 0x73, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x10, 0x0f,
0x2a, 0x29, 0x0a, 0x09, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a,
0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44,
0x65, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x61, 0x0a, 0x17, 0x53,
0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x62, 0x65, 0x12, 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x10, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65,
0x6e, 0x74, 0x22, 0x08, 0xe2, 0x86, 0x04, 0x04, 0x08, 0x02, 0x10, 0x09, 0x30, 0x01, 0x42, 0x9a,
0x01, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x42, 0x0e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f,
0x50, 0x01, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68,
0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x2f, 0x70, 0x62,
0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa,
0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x62, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea,
0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
}
var (

View File

@ -100,6 +100,9 @@ enum Topic {
// IPRateLimit topic contains events for changes to control-plane-request-limit
IPRateLimit = 14;
// SamenessGroup topic contains events for changes to Sameness Groups
SamenessGroup = 15;
}
message NamedSubject {

View File

@ -26,7 +26,7 @@ and envoy sidecars are deployed.
> Note that all consul agents and user workloads such as application services, mesh-gateway are running in docker containers.
In general, each upgrade test has following steps:
In general, each upgrade test has the following steps:
1. Create a cluster with a specified number of server and client agents, then enable the feature to be tested.
2. Create some workload in the cluster, e.g., registering 2 services: static-server, static-client.
Static-server is a simple http application and the upstream service of static-client.
@ -35,7 +35,7 @@ connection between static client and server. Ensure that a connection cannot be
4. Upgrade Consul cluster to the `target-version` and restart the Envoy sidecars
(we restart Envoy sidecar to ensure the upgraded Consul binary can read the state from
the previous version and generate the correct Envoy configurations)
5. Re-validate the client, server and sidecars to ensure the persisted data from the pervious
5. Re-validate the client, server and sidecars to ensure the persisted data from the previous
version can be accessed in the target version. Verify connection / disconnection
(e.g., deny Action)