mirror of https://github.com/status-im/consul.git
server: suppress spurious blocking query returns where multiple config entries are involved (#12362)
Starting from and extending the mechanism introduced in #12110 we can specially handle the 3 main special Consul RPC endpoints that react to many config entries in a single blocking query in Connect: - `DiscoveryChain.Get` - `ConfigEntry.ResolveServiceConfig` - `Intentions.Match` All of these will internally watch for many config entries, and at least one of those will likely be not found in any given query. Because these are blends of multiple reads the exact solution from #12110 isn't perfectly aligned, but we can tweak the approach slightly and regain the utility of that mechanism. ### No Config Entries Found In this case, despite looking for many config entries none may be found at all. Unlike #12110 in this scenario we do not return an empty reply to the caller, but instead synthesize a struct from default values to return. This can be handled nearly identically to #12110 with the first 1-2 replies being non-empty payloads followed by the standard spurious wakeup suppression mechanism from #12110. ### No Change Since Last Wakeup Once a blocking query loop on the server has completed and slept at least once, there is a further optimization we can make here to detect if any of the config entries that were present at specific versions for the prior execution of the loop are identical for the loop we just woke up for. In that scenario we can return a slightly different internal sentinel error and basically externally handle it similar to #12110. This would mean that even if 20 discovery chain read RPC handling goroutines wakeup due to the creation of an unrelated config entry, the only ones that will terminate and reply with a blob of data are those that genuinely have new data to report. ### Extra Endpoints Since this pattern is pretty reusable, other key config-entry-adjacent endpoints used by `agent/proxycfg` also were updated: - `ConfigEntry.List` - `Internal.IntentionUpstreams` (tproxy)
This commit is contained in:
parent
25f4a425d1
commit
7b0548dd8d
|
@ -0,0 +1,3 @@
|
|||
```release-note:improvement
|
||||
server: suppress spurious blocking query returns where multiple config entries are involved
|
||||
```
|
|
@ -32,3 +32,7 @@ func NewKindName(kind, name string, entMeta *structs.EnterpriseMeta) KindName {
|
|||
ret.Normalize()
|
||||
return ret
|
||||
}
|
||||
|
||||
func NewKindNameForEntry(entry structs.ConfigEntry) KindName {
|
||||
return NewKindName(entry.GetKind(), entry.GetName(), entry.GetEnterpriseMeta())
|
||||
}
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
package configentry
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// ResolvedServiceConfigSet is a wrapped set of raw cross-referenced config
|
||||
// entries necessary for the ConfigEntry.ResolveServiceConfig RPC process.
|
||||
//
|
||||
// None of these are defaulted.
|
||||
type ResolvedServiceConfigSet struct {
|
||||
ServiceDefaults map[structs.ServiceID]*structs.ServiceConfigEntry
|
||||
ProxyDefaults map[string]*structs.ProxyConfigEntry
|
||||
}
|
||||
|
||||
func (r *ResolvedServiceConfigSet) IsEmpty() bool {
|
||||
return len(r.ServiceDefaults) == 0 && len(r.ProxyDefaults) == 0
|
||||
}
|
||||
|
||||
func (r *ResolvedServiceConfigSet) GetServiceDefaults(sid structs.ServiceID) *structs.ServiceConfigEntry {
|
||||
if r.ServiceDefaults == nil {
|
||||
return nil
|
||||
}
|
||||
return r.ServiceDefaults[sid]
|
||||
}
|
||||
|
||||
func (r *ResolvedServiceConfigSet) GetProxyDefaults(partition string) *structs.ProxyConfigEntry {
|
||||
if r.ProxyDefaults == nil {
|
||||
return nil
|
||||
}
|
||||
return r.ProxyDefaults[partition]
|
||||
}
|
||||
|
||||
func (r *ResolvedServiceConfigSet) AddServiceDefaults(entry *structs.ServiceConfigEntry) {
|
||||
if entry == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.ServiceDefaults == nil {
|
||||
r.ServiceDefaults = make(map[structs.ServiceID]*structs.ServiceConfigEntry)
|
||||
}
|
||||
|
||||
sid := structs.NewServiceID(entry.Name, &entry.EnterpriseMeta)
|
||||
r.ServiceDefaults[sid] = entry
|
||||
}
|
||||
|
||||
func (r *ResolvedServiceConfigSet) AddProxyDefaults(entry *structs.ProxyConfigEntry) {
|
||||
if entry == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if r.ProxyDefaults == nil {
|
||||
r.ProxyDefaults = make(map[string]*structs.ProxyConfigEntry)
|
||||
}
|
||||
|
||||
r.ProxyDefaults[entry.PartitionOrDefault()] = entry
|
||||
}
|
|
@ -10,8 +10,10 @@ import (
|
|||
"github.com/hashicorp/go-hclog"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/mitchellh/copystructure"
|
||||
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/configentry"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
@ -236,6 +238,10 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe
|
|||
}
|
||||
}
|
||||
|
||||
var (
|
||||
priorHash uint64
|
||||
ranOnce bool
|
||||
)
|
||||
return c.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
|
@ -258,6 +264,26 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe
|
|||
reply.Kind = args.Kind
|
||||
reply.Index = index
|
||||
reply.Entries = filteredEntries
|
||||
|
||||
// Generate a hash of the content driving this response. Use it to
|
||||
// determine if the response is identical to a prior wakeup.
|
||||
newHash, err := hashstructure_v2.Hash(filteredEntries, hashstructure_v2.FormatV2, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||
}
|
||||
|
||||
if ranOnce && priorHash == newHash {
|
||||
priorHash = newHash
|
||||
return errNotChanged
|
||||
} else {
|
||||
priorHash = newHash
|
||||
ranOnce = true
|
||||
}
|
||||
|
||||
if len(reply.Entries) == 0 {
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -417,36 +443,108 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
var (
|
||||
priorHash uint64
|
||||
ranOnce bool
|
||||
)
|
||||
return c.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
var thisReply structs.ServiceConfigResponse
|
||||
var (
|
||||
upstreamIDs = args.UpstreamIDs
|
||||
legacyUpstreams = false
|
||||
)
|
||||
|
||||
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
||||
// TODO(freddy) Refactor this into smaller set of state store functions
|
||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
||||
// blocking query, this function will be rerun and these state store lookups will both be current.
|
||||
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
||||
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, &args.EnterpriseMeta)
|
||||
// The request is considered legacy if the deprecated args.Upstream was used
|
||||
if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 {
|
||||
legacyUpstreams = true
|
||||
|
||||
upstreamIDs = make([]structs.ServiceID, 0)
|
||||
for _, upstream := range args.Upstreams {
|
||||
// Before Consul namespaces were released, the Upstreams
|
||||
// provided to the endpoint did not contain the namespace.
|
||||
// Because of this we attach the enterprise meta of the
|
||||
// request, which will just be the default namespace.
|
||||
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
|
||||
upstreamIDs = append(upstreamIDs, sid)
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch all relevant config entries.
|
||||
|
||||
index, entries, err := state.ReadResolvedServiceConfigEntries(
|
||||
ws,
|
||||
args.Name,
|
||||
&args.EnterpriseMeta,
|
||||
upstreamIDs,
|
||||
args.Mode,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
proxyConf *structs.ProxyConfigEntry
|
||||
proxyConfGlobalProtocol string
|
||||
ok bool
|
||||
)
|
||||
if proxyEntry != nil {
|
||||
proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid proxy config type %T", proxyEntry)
|
||||
// Generate a hash of the config entry content driving this
|
||||
// response. Use it to determine if the response is identical to a
|
||||
// prior wakeup.
|
||||
newHash, err := hashstructure_v2.Hash(entries, hashstructure_v2.FormatV2, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||
}
|
||||
|
||||
if ranOnce && priorHash == newHash {
|
||||
priorHash = newHash
|
||||
reply.Index = index
|
||||
// NOTE: the prior response is still alive inside of *reply, which
|
||||
// is desirable
|
||||
return errNotChanged
|
||||
} else {
|
||||
priorHash = newHash
|
||||
ranOnce = true
|
||||
}
|
||||
|
||||
thisReply, err := c.computeResolvedServiceConfig(
|
||||
args,
|
||||
upstreamIDs,
|
||||
legacyUpstreams,
|
||||
entries,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
thisReply.Index = index
|
||||
|
||||
*reply = *thisReply
|
||||
if entries.IsEmpty() {
|
||||
// No config entries factored into this reply; it's a default.
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConfigEntry) computeResolvedServiceConfig(
|
||||
args *structs.ServiceConfigRequest,
|
||||
upstreamIDs []structs.ServiceID,
|
||||
legacyUpstreams bool,
|
||||
entries *configentry.ResolvedServiceConfigSet,
|
||||
) (*structs.ServiceConfigResponse, error) {
|
||||
var thisReply structs.ServiceConfigResponse
|
||||
|
||||
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
||||
|
||||
// TODO(freddy) Refactor this into smaller set of state store functions
|
||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
||||
// blocking query, this function will be rerun and these state store lookups will both be current.
|
||||
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
||||
var proxyConfGlobalProtocol string
|
||||
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
|
||||
if proxyConf != nil {
|
||||
// Apply the proxy defaults to the sidecar's proxy config
|
||||
mapCopy, err := copystructure.Copy(proxyConf.Config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to copy global proxy-defaults: %v", err)
|
||||
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
|
||||
}
|
||||
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
|
||||
thisReply.Mode = proxyConf.Mode
|
||||
|
@ -457,25 +555,18 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
// Extract the global protocol from proxyConf for upstream configs.
|
||||
rawProtocol := proxyConf.Config["protocol"]
|
||||
if rawProtocol != nil {
|
||||
var ok bool
|
||||
proxyConfGlobalProtocol, ok = rawProtocol.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid protocol type %T", rawProtocol)
|
||||
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
thisReply.Index = index
|
||||
|
||||
var serviceConf *structs.ServiceConfigEntry
|
||||
if serviceEntry != nil {
|
||||
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid service config type %T", serviceEntry)
|
||||
}
|
||||
serviceConf := entries.GetServiceDefaults(
|
||||
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
|
||||
)
|
||||
if serviceConf != nil {
|
||||
if serviceConf.Expose.Checks {
|
||||
thisReply.Expose.Checks = true
|
||||
}
|
||||
|
@ -508,9 +599,6 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
// - Implicitly from centralized upstream config in service-defaults
|
||||
seenUpstreams := map[structs.ServiceID]struct{}{}
|
||||
|
||||
upstreamIDs := args.UpstreamIDs
|
||||
legacyUpstreams := false
|
||||
|
||||
var (
|
||||
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
|
||||
|
||||
|
@ -520,24 +608,10 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
)
|
||||
|
||||
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
|
||||
// If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode.
|
||||
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
|
||||
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
|
||||
if noUpstreamArgs && !tproxy {
|
||||
*reply = thisReply
|
||||
return nil
|
||||
}
|
||||
|
||||
// The request is considered legacy if the deprecated args.Upstream was used
|
||||
if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 {
|
||||
legacyUpstreams = true
|
||||
|
||||
upstreamIDs = make([]structs.ServiceID, 0)
|
||||
for _, upstream := range args.Upstreams {
|
||||
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
|
||||
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
|
||||
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
|
||||
upstreamIDs = append(upstreamIDs, sid)
|
||||
}
|
||||
return &thisReply, nil
|
||||
}
|
||||
|
||||
// First store all upstreams that were provided in the request
|
||||
|
@ -592,19 +666,15 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
// (how the downstream wants to address it)
|
||||
protocol := proxyConfGlobalProtocol
|
||||
|
||||
_, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
upstreamSvcDefaults := entries.GetServiceDefaults(
|
||||
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
|
||||
)
|
||||
if upstreamSvcDefaults != nil {
|
||||
cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults)
|
||||
}
|
||||
if cfg.Protocol != "" {
|
||||
protocol = cfg.Protocol
|
||||
if upstreamSvcDefaults.Protocol != "" {
|
||||
protocol = upstreamSvcDefaults.Protocol
|
||||
}
|
||||
}
|
||||
|
||||
if protocol != "" {
|
||||
resolvedCfg["protocol"] = protocol
|
||||
}
|
||||
|
@ -637,8 +707,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
|
||||
// don't allocate the slices just to not fill them
|
||||
if len(usConfigs) == 0 {
|
||||
*reply = thisReply
|
||||
return nil
|
||||
return &thisReply, nil
|
||||
}
|
||||
|
||||
if legacyUpstreams {
|
||||
|
@ -658,9 +727,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
}
|
||||
}
|
||||
|
||||
*reply = thisReply
|
||||
return nil
|
||||
})
|
||||
return &thisReply, nil
|
||||
}
|
||||
|
||||
func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error {
|
||||
|
|
|
@ -9,10 +9,12 @@ import (
|
|||
"time"
|
||||
|
||||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/configentry"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
|
@ -310,20 +312,29 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
|
|||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t)
|
||||
codec := rpcClient(t, s1)
|
||||
store := s1.fsm.State()
|
||||
|
||||
entry := &structs.ServiceConfigEntry{
|
||||
codec := rpcClient(t, s1)
|
||||
readerCodec := rpcClient(t, s1)
|
||||
writerCodec := rpcClient(t, s1)
|
||||
|
||||
{ // create one relevant entry
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "alpha",
|
||||
},
|
||||
}, &out))
|
||||
require.True(t, out)
|
||||
}
|
||||
require.NoError(t, store.EnsureConfigEntry(1, entry))
|
||||
|
||||
runStep(t, "test the errNotFound path", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var count int
|
||||
|
||||
start := time.Now()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
args := structs.ConfigEntryQuery{
|
||||
|
@ -335,7 +346,7 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
|
|||
for ctx.Err() == nil {
|
||||
var out structs.ConfigEntryResponse
|
||||
|
||||
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
|
||||
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.Get", &args, &out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -347,14 +358,21 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
|
|||
})
|
||||
|
||||
g.Go(func() error {
|
||||
for i := uint64(0); i < 200; i++ {
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
entry := &structs.ServiceConfigEntry{
|
||||
|
||||
var out bool
|
||||
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: fmt.Sprintf("other%d", i),
|
||||
},
|
||||
}, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] unexpected error: %w", i, err)
|
||||
}
|
||||
if err := store.EnsureConfigEntry(i+2, entry); err != nil {
|
||||
return err
|
||||
if !out {
|
||||
return fmt.Errorf("[%d] unexpectedly returned false", i)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
|
@ -362,11 +380,9 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
|
|||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
// The test is a bit racy because of the timing of the two goroutines, so
|
||||
// we relax the check for the count to be within a small range.
|
||||
if count < 2 || count > 3 {
|
||||
t.Fatalf("expected count to be 2 or 3, got %d", count)
|
||||
}
|
||||
|
||||
assertBlockingQueryWakeupCount(t, time.Second, start, count)
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigEntry_Get_ACLDeny(t *testing.T) {
|
||||
|
@ -472,6 +488,102 @@ func TestConfigEntry_List(t *testing.T) {
|
|||
require.Equal(t, expected, out)
|
||||
}
|
||||
|
||||
func TestConfigEntry_List_BlockOnNoChange(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t)
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
readerCodec := rpcClient(t, s1)
|
||||
writerCodec := rpcClient(t, s1)
|
||||
|
||||
run := func(t *testing.T, dataPrefix string) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var count int
|
||||
|
||||
start := time.Now()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
args := structs.ConfigEntryQuery{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
args.QueryOptions.MaxQueryTime = time.Second
|
||||
|
||||
for ctx.Err() == nil {
|
||||
var out structs.IndexedConfigEntries
|
||||
|
||||
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.List", &args, &out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.Log("blocking query index", out.QueryMeta.Index, out, time.Now())
|
||||
count++
|
||||
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
var out bool
|
||||
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: fmt.Sprintf(dataPrefix+"%d", i),
|
||||
ConnectTimeout: 33 * time.Second,
|
||||
},
|
||||
}, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] unexpected error: %w", i, err)
|
||||
}
|
||||
if !out {
|
||||
return fmt.Errorf("[%d] unexpectedly returned false", i)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
|
||||
assertBlockingQueryWakeupCount(t, time.Second, start, count)
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotFound path", func(t *testing.T) {
|
||||
run(t, "other")
|
||||
})
|
||||
|
||||
{ // Create some dummy services in the state store to look up.
|
||||
for _, entry := range []structs.ConfigEntry{
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "bar",
|
||||
},
|
||||
&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
},
|
||||
} {
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: entry,
|
||||
}, &out))
|
||||
require.True(t, out)
|
||||
}
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotChanged path", func(t *testing.T) {
|
||||
run(t, "completely-different-other")
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigEntry_ListAll(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
@ -2025,6 +2137,142 @@ func TestConfigEntry_ResolveServiceConfig_ProxyDefaultsProtocol_UsedForAllUpstre
|
|||
require.Equal(t, expected, out)
|
||||
}
|
||||
|
||||
func BenchmarkConfigEntry_ResolveServiceConfig_Hash(b *testing.B) {
|
||||
res := &configentry.ResolvedServiceConfigSet{}
|
||||
|
||||
res.AddServiceDefaults(&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "web",
|
||||
Protocol: "http",
|
||||
})
|
||||
res.AddServiceDefaults(&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "up1",
|
||||
Protocol: "http",
|
||||
})
|
||||
res.AddServiceDefaults(&structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "up2",
|
||||
Protocol: "http",
|
||||
})
|
||||
res.AddProxyDefaults(&structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "grpc",
|
||||
},
|
||||
})
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := hashstructure_v2.Hash(res, hashstructure_v2.FormatV2, nil)
|
||||
if err != nil {
|
||||
b.Fatalf("error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t)
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
readerCodec := rpcClient(t, s1)
|
||||
writerCodec := rpcClient(t, s1)
|
||||
|
||||
run := func(t *testing.T, dataPrefix string) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var count int
|
||||
|
||||
start := time.Now()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
args := structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
UpstreamIDs: []structs.ServiceID{
|
||||
structs.NewServiceID("bar", nil),
|
||||
},
|
||||
}
|
||||
args.QueryOptions.MaxQueryTime = time.Second
|
||||
|
||||
for ctx.Err() == nil {
|
||||
var out structs.ServiceConfigResponse
|
||||
|
||||
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.ResolveServiceConfig", &args, &out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.Log("blocking query index", out.QueryMeta.Index, out, time.Now())
|
||||
count++
|
||||
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
var out bool
|
||||
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: fmt.Sprintf(dataPrefix+"%d", i),
|
||||
},
|
||||
}, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] unexpected error: %w", i, err)
|
||||
}
|
||||
if !out {
|
||||
return fmt.Errorf("[%d] unexpectedly returned false", i)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
|
||||
assertBlockingQueryWakeupCount(t, time.Second, start, count)
|
||||
}
|
||||
|
||||
{ // create one unrelated entry
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "unrelated",
|
||||
},
|
||||
}, &out))
|
||||
require.True(t, out)
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotFound path", func(t *testing.T) {
|
||||
run(t, "other")
|
||||
})
|
||||
|
||||
{ // create one relevant entry
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "bar",
|
||||
Protocol: "grpc",
|
||||
},
|
||||
}, &out))
|
||||
require.True(t, out)
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotChanged path", func(t *testing.T) {
|
||||
run(t, "completely-different-other")
|
||||
})
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfigNoConfig(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
metrics "github.com/armon/go-metrics"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
|
@ -48,6 +49,10 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
|
|||
evalDC = c.srv.config.Datacenter
|
||||
}
|
||||
|
||||
var (
|
||||
priorHash uint64
|
||||
ranOnce bool
|
||||
)
|
||||
return c.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
|
@ -66,9 +71,32 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
|
|||
return err
|
||||
}
|
||||
|
||||
// Generate a hash of the config entry content driving this
|
||||
// response. Use it to determine if the response is identical to a
|
||||
// prior wakeup.
|
||||
newHash, err := hashstructure_v2.Hash(chain, hashstructure_v2.FormatV2, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||
}
|
||||
|
||||
if ranOnce && priorHash == newHash {
|
||||
priorHash = newHash
|
||||
reply.Index = index
|
||||
// NOTE: the prior response is still alive inside of *reply, which
|
||||
// is desirable
|
||||
return errNotChanged
|
||||
} else {
|
||||
priorHash = newHash
|
||||
ranOnce = true
|
||||
}
|
||||
|
||||
reply.Index = index
|
||||
reply.Chain = chain
|
||||
|
||||
if chain.IsDefault() {
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
@ -8,6 +9,7 @@ import (
|
|||
|
||||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
|
@ -242,3 +244,116 @@ func TestDiscoveryChainEndpoint_Get(t *testing.T) {
|
|||
require.Equal(t, expect, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
readerCodec := rpcClient(t, s1)
|
||||
writerCodec := rpcClient(t, s1)
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
|
||||
{ // create one unrelated entry
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "unrelated",
|
||||
ConnectTimeout: 33 * time.Second,
|
||||
},
|
||||
}, &out))
|
||||
require.True(t, out)
|
||||
}
|
||||
|
||||
run := func(t *testing.T, dataPrefix string) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var count int
|
||||
|
||||
start := time.Now()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
args := &structs.DiscoveryChainRequest{
|
||||
Name: "web",
|
||||
EvaluateInDatacenter: "dc1",
|
||||
EvaluateInNamespace: "default",
|
||||
EvaluateInPartition: "default",
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
args.QueryOptions.MaxQueryTime = time.Second
|
||||
|
||||
for ctx.Err() == nil {
|
||||
var out structs.DiscoveryChainResponse
|
||||
err := msgpackrpc.CallWithCodec(readerCodec, "DiscoveryChain.Get", &args, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting discovery chain: %w", err)
|
||||
}
|
||||
if !out.Chain.IsDefault() {
|
||||
return fmt.Errorf("expected default chain")
|
||||
}
|
||||
|
||||
t.Log("blocking query index", out.QueryMeta.Index, out.Chain)
|
||||
count++
|
||||
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
var out bool
|
||||
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Datacenter: "dc1",
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: fmt.Sprintf(dataPrefix+"%d", i),
|
||||
},
|
||||
}, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] unexpected error: %w", i, err)
|
||||
}
|
||||
if !out {
|
||||
return fmt.Errorf("[%d] unexpectedly returned false", i)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
|
||||
assertBlockingQueryWakeupCount(t, time.Second, start, count)
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotFound path", func(t *testing.T) {
|
||||
run(t, "other")
|
||||
})
|
||||
|
||||
{ // create one relevant entry
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "web",
|
||||
Protocol: "grpc",
|
||||
},
|
||||
}, &out))
|
||||
require.True(t, out)
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotChanged path", func(t *testing.T) {
|
||||
run(t, "completely-different-other")
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/hashicorp/go-bexpr"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
|
@ -617,6 +618,10 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In
|
|||
}
|
||||
}
|
||||
|
||||
var (
|
||||
priorHash uint64
|
||||
ranOnce bool
|
||||
)
|
||||
return s.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
|
@ -628,6 +633,35 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In
|
|||
|
||||
reply.Index = index
|
||||
reply.Matches = matches
|
||||
|
||||
// Generate a hash of the intentions content driving this response.
|
||||
// Use it to determine if the response is identical to a prior
|
||||
// wakeup.
|
||||
newHash, err := hashstructure_v2.Hash(matches, hashstructure_v2.FormatV2, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||
}
|
||||
|
||||
if ranOnce && priorHash == newHash {
|
||||
priorHash = newHash
|
||||
return errNotChanged
|
||||
} else {
|
||||
priorHash = newHash
|
||||
ranOnce = true
|
||||
}
|
||||
|
||||
hasData := false
|
||||
for _, match := range matches {
|
||||
if len(match) > 0 {
|
||||
hasData = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !hasData {
|
||||
return errNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
@ -8,6 +9,7 @@ import (
|
|||
|
||||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -1742,6 +1744,123 @@ func TestIntentionMatch_good(t *testing.T) {
|
|||
require.Equal(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestIntentionMatch_BlockOnNoChange(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t)
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
readerCodec := rpcClient(t, s1)
|
||||
writerCodec := rpcClient(t, s1)
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
run := func(t *testing.T, dataPrefix string, expectMatches int) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var count int
|
||||
|
||||
start := time.Now()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
args := &structs.IntentionQueryRequest{
|
||||
Datacenter: "dc1",
|
||||
Match: &structs.IntentionQueryMatch{
|
||||
Type: structs.IntentionMatchDestination,
|
||||
Entries: []structs.IntentionMatchEntry{
|
||||
{Name: "bar"},
|
||||
},
|
||||
},
|
||||
}
|
||||
args.QueryOptions.MaxQueryTime = time.Second
|
||||
|
||||
for ctx.Err() == nil {
|
||||
var out structs.IndexedIntentionMatches
|
||||
|
||||
err := msgpackrpc.CallWithCodec(readerCodec, "Intention.Match", args, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting intentions: %w", err)
|
||||
}
|
||||
if len(out.Matches) != 1 {
|
||||
return fmt.Errorf("expected 1 match got %d", len(out.Matches))
|
||||
}
|
||||
if len(out.Matches[0]) != expectMatches {
|
||||
return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0]))
|
||||
}
|
||||
|
||||
t.Log("blocking query index", out.QueryMeta.Index, out.Matches[0])
|
||||
count++
|
||||
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
var out string
|
||||
err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.IntentionOpCreate,
|
||||
Intention: &structs.Intention{
|
||||
// {"default", "*", "default", "baz"}, // shouldn't match
|
||||
SourceNS: "default",
|
||||
SourceName: "*",
|
||||
DestinationNS: "default",
|
||||
DestinationName: fmt.Sprintf(dataPrefix+"%d", i),
|
||||
Action: structs.IntentionActionAllow,
|
||||
},
|
||||
}, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] unexpected error: %w", i, err)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
|
||||
assertBlockingQueryWakeupCount(t, time.Second, start, count)
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotFound path", func(t *testing.T) {
|
||||
run(t, "other", 0)
|
||||
})
|
||||
|
||||
// Create some records
|
||||
{
|
||||
insert := [][]string{
|
||||
{"default", "*", "default", "*"},
|
||||
{"default", "*", "default", "bar"},
|
||||
{"default", "*", "default", "baz"}, // shouldn't match
|
||||
}
|
||||
|
||||
for _, v := range insert {
|
||||
var out string
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Intention.Apply", &structs.IntentionRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.IntentionOpCreate,
|
||||
Intention: &structs.Intention{
|
||||
SourceNS: v[0],
|
||||
SourceName: v[1],
|
||||
DestinationNS: v[2],
|
||||
DestinationName: v[3],
|
||||
Action: structs.IntentionActionAllow,
|
||||
},
|
||||
}, &out))
|
||||
}
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotChanged path", func(t *testing.T) {
|
||||
run(t, "completely-different-other", 2)
|
||||
})
|
||||
}
|
||||
|
||||
// Test matching with ACLs
|
||||
func TestIntentionMatch_acl(t *testing.T) {
|
||||
if testing.Short() {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
|
@ -210,6 +211,10 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl
|
|||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
priorHash uint64
|
||||
ranOnce bool
|
||||
)
|
||||
return m.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
|
@ -224,6 +229,23 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl
|
|||
|
||||
reply.Index, reply.Services = index, services
|
||||
m.srv.filterACLWithAuthorizer(authz, reply)
|
||||
|
||||
// Generate a hash of the intentions content driving this response.
|
||||
// Use it to determine if the response is identical to a prior
|
||||
// wakeup.
|
||||
newHash, err := hashstructure_v2.Hash(services, hashstructure_v2.FormatV2, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||
}
|
||||
|
||||
if ranOnce && priorHash == newHash {
|
||||
priorHash = newHash
|
||||
return errNotChanged
|
||||
} else {
|
||||
priorHash = newHash
|
||||
ranOnce = true
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -2317,6 +2321,115 @@ func TestInternal_IntentionUpstreams(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
_, s1 := testServerWithConfig(t)
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
readerCodec := rpcClient(t, s1)
|
||||
writerCodec := rpcClient(t, s1)
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
{ // ensure it's default deny to start
|
||||
var out bool
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
|
||||
Entry: &structs.ServiceIntentionsConfigEntry{
|
||||
Kind: structs.ServiceIntentions,
|
||||
Name: "*",
|
||||
Sources: []*structs.SourceIntention{
|
||||
{
|
||||
Name: "*",
|
||||
Action: structs.IntentionActionDeny,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, &out))
|
||||
require.True(t, out)
|
||||
}
|
||||
|
||||
run := func(t *testing.T, dataPrefix string, expectServices int) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var count int
|
||||
|
||||
start := time.Now()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
args := &structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "web",
|
||||
}
|
||||
args.QueryOptions.MaxQueryTime = time.Second
|
||||
|
||||
for ctx.Err() == nil {
|
||||
var out structs.IndexedServiceList
|
||||
|
||||
err := msgpackrpc.CallWithCodec(readerCodec, "Internal.IntentionUpstreams", args, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting upstreams: %w", err)
|
||||
}
|
||||
|
||||
if len(out.Services) != expectServices {
|
||||
return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services))
|
||||
}
|
||||
|
||||
t.Log("blocking query index", out.QueryMeta.Index, out.Services)
|
||||
count++
|
||||
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
var out string
|
||||
err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.IntentionOpCreate,
|
||||
Intention: &structs.Intention{
|
||||
SourceName: fmt.Sprintf(dataPrefix+"-src-%d", i),
|
||||
DestinationName: fmt.Sprintf(dataPrefix+"-dst-%d", i),
|
||||
Action: structs.IntentionActionAllow,
|
||||
},
|
||||
}, &out)
|
||||
if err != nil {
|
||||
return fmt.Errorf("[%d] unexpected error: %w", i, err)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, g.Wait())
|
||||
|
||||
assertBlockingQueryWakeupCount(t, time.Second, start, count)
|
||||
}
|
||||
|
||||
runStep(t, "test the errNotFound path", func(t *testing.T) {
|
||||
run(t, "other", 0)
|
||||
})
|
||||
|
||||
// Services:
|
||||
// api and api-proxy on node foo
|
||||
// web and web-proxy on node foo
|
||||
//
|
||||
// Intentions
|
||||
// * -> * (deny) intention
|
||||
// web -> api (allow)
|
||||
registerIntentionUpstreamEntries(t, codec, "")
|
||||
|
||||
runStep(t, "test the errNotChanged path", func(t *testing.T) {
|
||||
run(t, "completely-different-other", 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestInternal_IntentionUpstreams_ACL(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -954,6 +954,19 @@ type blockingQueryResponseMeta interface {
|
|||
// a previous result. errNotFound will never be returned to the caller, it is
|
||||
// converted to nil before returning.
|
||||
//
|
||||
// The query function can return errNotChanged, which is a sentinel error. This
|
||||
// can only be returned on calls AFTER the first call, as it would not be
|
||||
// possible to detect the absence of a change on the first call. Returning
|
||||
// errNotChanged indicates that the query results are identical to the prior
|
||||
// results which allows blockingQuery to keep blocking until the query returns
|
||||
// a real changed result.
|
||||
//
|
||||
// The query function must take care to ensure the actual result of the query
|
||||
// is either left unmodified or explicitly left in a good state before
|
||||
// returning, otherwise when blockingQuery times out it may return an
|
||||
// incomplete or unexpected result. errNotChanged will never be returned to the
|
||||
// caller, it is converted to nil before returning.
|
||||
//
|
||||
// If query function returns any other error, the error is returned to the caller
|
||||
// immediately.
|
||||
//
|
||||
|
@ -993,7 +1006,7 @@ func (s *Server) blockingQuery(
|
|||
var ws memdb.WatchSet
|
||||
err := query(ws, s.fsm.State())
|
||||
s.setQueryMeta(responseMeta, opts.GetToken())
|
||||
if errors.Is(err, errNotFound) {
|
||||
if errors.Is(err, errNotFound) || errors.Is(err, errNotChanged) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
@ -1008,7 +1021,10 @@ func (s *Server) blockingQuery(
|
|||
// decrement the count when the function returns.
|
||||
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))
|
||||
|
||||
var notFound bool
|
||||
var (
|
||||
notFound bool
|
||||
ranOnce bool
|
||||
)
|
||||
|
||||
for {
|
||||
if opts.GetRequireConsistent() {
|
||||
|
@ -1029,17 +1045,23 @@ func (s *Server) blockingQuery(
|
|||
|
||||
err := query(ws, state)
|
||||
s.setQueryMeta(responseMeta, opts.GetToken())
|
||||
|
||||
switch {
|
||||
case errors.Is(err, errNotFound):
|
||||
if notFound {
|
||||
// query result has not changed
|
||||
minQueryIndex = responseMeta.GetIndex()
|
||||
}
|
||||
|
||||
notFound = true
|
||||
case errors.Is(err, errNotChanged):
|
||||
if ranOnce {
|
||||
// query result has not changed
|
||||
minQueryIndex = responseMeta.GetIndex()
|
||||
}
|
||||
case err != nil:
|
||||
return err
|
||||
}
|
||||
ranOnce = true
|
||||
|
||||
if responseMeta.GetIndex() > minQueryIndex {
|
||||
return nil
|
||||
|
@ -1060,7 +1082,10 @@ func (s *Server) blockingQuery(
|
|||
}
|
||||
}
|
||||
|
||||
var errNotFound = fmt.Errorf("no data found for query")
|
||||
var (
|
||||
errNotFound = fmt.Errorf("no data found for query")
|
||||
errNotChanged = fmt.Errorf("data did not change for query")
|
||||
)
|
||||
|
||||
// setQueryMeta is used to populate the QueryMeta data for an RPC call
|
||||
//
|
||||
|
|
|
@ -1681,3 +1681,24 @@ func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.Subsc
|
|||
}
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// assertBlockingQueryWakeupCount is used to assist in assertions for
|
||||
// blockingQuery RPC tests involving the two sentinel errors errNotFound and
|
||||
// errNotChanged.
|
||||
//
|
||||
// Those tests are a bit racy because of the timing of the two goroutines, so
|
||||
// we relax the check for the count to be within a small range.
|
||||
//
|
||||
// The blocking query is going to wake up every interval, so use the elapsed test
|
||||
// time with that known timing value to gauge how many legit wakeups should
|
||||
// happen and then pad it out a smidge.
|
||||
func assertBlockingQueryWakeupCount(t testing.TB, interval time.Duration, start time.Time, gotCount int) {
|
||||
t.Helper()
|
||||
|
||||
const buffer = 2
|
||||
expectedQueries := int(time.Since(start)/interval) + buffer
|
||||
|
||||
if gotCount < 2 || gotCount > expectedQueries {
|
||||
t.Fatalf("expected count to be >= 2 or < %d, got %d", expectedQueries, gotCount)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -818,6 +818,120 @@ func (s *Store) serviceDiscoveryChainTxn(
|
|||
return index, chain, nil
|
||||
}
|
||||
|
||||
func (s *Store) ReadResolvedServiceConfigEntries(
|
||||
ws memdb.WatchSet,
|
||||
serviceName string,
|
||||
entMeta *structs.EnterpriseMeta,
|
||||
upstreamIDs []structs.ServiceID,
|
||||
proxyMode structs.ProxyMode,
|
||||
) (uint64, *configentry.ResolvedServiceConfigSet, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
var res configentry.ResolvedServiceConfigSet
|
||||
|
||||
// The caller will likely calculate this again, but we need to do it here
|
||||
// to determine if we are going to traverse into implicit upstream
|
||||
// definitions.
|
||||
var inferredProxyMode structs.ProxyMode
|
||||
|
||||
index, proxyEntry, err := configEntryTxn(tx, ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
maxIndex := index
|
||||
|
||||
if proxyEntry != nil {
|
||||
var ok bool
|
||||
proxyConf, ok := proxyEntry.(*structs.ProxyConfigEntry)
|
||||
if !ok {
|
||||
return 0, nil, fmt.Errorf("invalid proxy config type %T", proxyEntry)
|
||||
}
|
||||
res.AddProxyDefaults(proxyConf)
|
||||
|
||||
inferredProxyMode = proxyConf.Mode
|
||||
}
|
||||
|
||||
index, serviceEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, serviceName, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
if index > maxIndex {
|
||||
maxIndex = index
|
||||
}
|
||||
|
||||
var serviceConf *structs.ServiceConfigEntry
|
||||
if serviceEntry != nil {
|
||||
var ok bool
|
||||
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
|
||||
if !ok {
|
||||
return 0, nil, fmt.Errorf("invalid service config type %T", serviceEntry)
|
||||
}
|
||||
res.AddServiceDefaults(serviceConf)
|
||||
|
||||
if serviceConf.Mode != structs.ProxyModeDefault {
|
||||
inferredProxyMode = serviceConf.Mode
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
noUpstreamArgs = len(upstreamIDs) == 0
|
||||
|
||||
// Check the args and the resolved value. If it was exclusively set via a config entry, then proxyMode
|
||||
// will never be transparent because the service config request does not use the resolved value.
|
||||
tproxy = proxyMode == structs.ProxyModeTransparent || inferredProxyMode == structs.ProxyModeTransparent
|
||||
)
|
||||
|
||||
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
|
||||
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
|
||||
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
|
||||
if noUpstreamArgs && !tproxy {
|
||||
return maxIndex, &res, nil
|
||||
}
|
||||
|
||||
// First collect all upstreams into a set of seen upstreams.
|
||||
// Upstreams can come from:
|
||||
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
|
||||
// - Implicitly from centralized upstream config in service-defaults
|
||||
seenUpstreams := map[structs.ServiceID]struct{}{}
|
||||
|
||||
for _, sid := range upstreamIDs {
|
||||
if _, ok := seenUpstreams[sid]; !ok {
|
||||
seenUpstreams[sid] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
|
||||
for _, override := range serviceConf.UpstreamConfig.Overrides {
|
||||
if override.Name == "" {
|
||||
continue // skip this impossible condition
|
||||
}
|
||||
seenUpstreams[override.ServiceID()] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
for upstream := range seenUpstreams {
|
||||
index, rawEntry, err := configEntryTxn(tx, ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
if index > maxIndex {
|
||||
maxIndex = index
|
||||
}
|
||||
|
||||
if rawEntry != nil {
|
||||
entry, ok := rawEntry.(*structs.ServiceConfigEntry)
|
||||
if !ok {
|
||||
return 0, nil, fmt.Errorf("invalid service config type %T", rawEntry)
|
||||
}
|
||||
res.AddServiceDefaults(entry)
|
||||
}
|
||||
}
|
||||
|
||||
return maxIndex, &res, nil
|
||||
}
|
||||
|
||||
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for
|
||||
// the provided service name. All relevant config entries will be recursively
|
||||
// fetched and included in the result.
|
||||
|
|
1
go.mod
1
go.mod
|
@ -69,6 +69,7 @@ require (
|
|||
github.com/mitchellh/copystructure v1.0.0
|
||||
github.com/mitchellh/go-testing-interface v1.14.0
|
||||
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.2
|
||||
github.com/mitchellh/mapstructure v1.4.1
|
||||
github.com/mitchellh/pointerstructure v1.2.1
|
||||
github.com/mitchellh/reflectwalk v1.0.1
|
||||
|
|
2
go.sum
2
go.sum
|
@ -396,6 +396,8 @@ github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp
|
|||
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
|
||||
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 h1:hOY53G+kBFhbYFpRVxHl5eS7laP6B1+Cq+Z9Dry1iMU=
|
||||
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
|
||||
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
|
|
Loading…
Reference in New Issue