Integration test for server rate limiting (#15960)

* rate limit test

* Have tests for the 3 modes

* added assertions for logs and metrics

* add comments to test sections

* add check for rate limit exceeded text in log assertion section.

* fix linting error

* updating test to use KV get and put. move log assertion to the end.

* Adding logging for blocking messages in enforcing mode. Refactoring tests.

* modified test description

* formatting

* Apply suggestions from code review

Co-authored-by: Dan Upton <daniel@floppy.co>

* Update test/integration/consul-container/test/ratelimit/ratelimit_test.go

Co-authored-by: Dhia Ayachi <dhia@hashicorp.com>

* expand log checking so that it ensures logs are there when they are expected to be and absent when they are not.

* add retry on test

* Warn once when the rate limit is exceeded, regardless of enforcing vs permissive mode.

* Update test/integration/consul-container/test/ratelimit/ratelimit_test.go

Co-authored-by: Dan Upton <daniel@floppy.co>

Co-authored-by: Dan Upton <daniel@floppy.co>
Co-authored-by: Dhia Ayachi <dhia@hashicorp.com>
John Murret, 2023-01-19 08:43:33 -07:00 (committed by GitHub)
parent 13da1a5285
commit 794277371f
4 changed files with 258 additions and 5 deletions
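
For reference, the three request_limits modes the new test drives come straight from the -hcl flags in its test cases. Expanded into agent-config form, the stanza looks roughly like this (a sketch limited to the fields the test sets, not the full schema):

    limits {
      request_limits {
        # "disabled", "permissive" (log and metric only), or "enforcing" (reject)
        mode       = "enforcing"

        # The test sets both rates to 0 so that every read and write RPC
        # exceeds the limit and triggers the logs/metrics under test.
        read_rate  = 0
        write_rate = 0
      }
    }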

File 1 of 4

@@ -207,7 +207,7 @@ func (h *Handler) Allow(op Operation) error {
 		// TODO(NET-1382): is this the correct log-level?
 		enforced := l.mode == ModeEnforcing
-		h.logger.Trace("RPC exceeded allowed rate limit",
+		h.logger.Warn("RPC exceeded allowed rate limit",
 			"rpc", op.Name,
 			"source_addr", op.SourceAddr,
 			"limit_type", l.desc,
@@ -230,7 +230,6 @@ func (h *Handler) Allow(op Operation) error {
 		})
 		if enforced {
-			// TODO(NET-1382) - use the logger to print rate limiter logs.
 			if h.leaderStatusProvider.IsLeader() && op.Type == OperationTypeWrite {
 				return ErrRetryLater
 			}

File 2 of 4

@@ -40,6 +40,7 @@ type Config struct {
 	Image   string
 	Version string
 	Cmd     []string
+	LogConsumer testcontainers.LogConsumer

 	// service defaults
 	UseAPIWithTLS bool // TODO
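
The new field accepts anything satisfying testcontainers-go's LogConsumer interface, whose single method is Accept(testcontainers.Log). A minimal in-memory implementation, equivalent to the TestLogConsumer defined in the new test file below, could look like this sketch (memoryLogConsumer is an illustrative name):

    package logsink

    import "github.com/testcontainers/testcontainers-go"

    // memoryLogConsumer buffers every container log line so a test can
    // assert on the collected output later.
    type memoryLogConsumer struct {
    	lines []string
    }

    // Accept satisfies testcontainers.LogConsumer; each call delivers one
    // log entry whose raw bytes are in l.Content.
    func (c *memoryLogConsumer) Accept(l testcontainers.Log) {
    	c.lines = append(c.lines, string(l.Content))
    }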

File 3 of 4

@@ -198,10 +198,14 @@ func NewConsulContainer(ctx context.Context, config Config, network string, inde
 		_ = consulContainer.StopLogProducer()
 	})
+	if config.LogConsumer != nil {
+		consulContainer.FollowOutput(config.LogConsumer)
+	} else {
 		consulContainer.FollowOutput(&LogConsumer{
 			Prefix: opts.name,
 		})
+	}

 	node := &consulContainerNode{
 		config: config,

File 4 of 4: test/integration/consul-container/test/ratelimit/ratelimit_test.go (new file)

@@ -0,0 +1,249 @@
package ratelimit

import (
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/sdk/testutil/retry"
	libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
)

const (
retryableErrorMsg = "Unexpected response code: 429 (rate limit exceeded, try again later or against a different server)"
nonRetryableErrorMsg = "Unexpected response code: 503 (rate limit exceeded for operation that can only be performed by the leader, try again later)"
)

// TestServerRequestRateLimit
// This test validates
// - enforcing mode
//   - read_rate - returns 429 - was blocked and returns retryable error
//   - write_rate - returns 503 - was blocked and is not retryable
// - on each
//   - fires metrics for exceeding
//   - logs for exceeding
func TestServerRequestRateLimit(t *testing.T) {
type action struct {
function func(client *api.Client) error
rateLimitOperation string
rateLimitType string // will become an array of strings
}
type operation struct {
action action
expectedErrorMsg string
expectExceededLog bool
expectMetric bool
}
type testCase struct {
description string
cmd string
operations []operation
}
getKV := action{
function: func(client *api.Client) error {
_, _, err := client.KV().Get("foo", &api.QueryOptions{})
return err
},
rateLimitOperation: "KVS.Get",
rateLimitType: "global/read",
}
putKV := action{
function: func(client *api.Client) error {
_, err := client.KV().Put(&api.KVPair{Key: "foo", Value: []byte("bar")}, &api.WriteOptions{})
return err
},
rateLimitOperation: "KVS.Apply",
rateLimitType: "global/write",
}
testCases := []testCase{
// HTTP & net/RPC
{
description: "HTTP & net/RPC / Mode: disabled - errors: no / exceeded logs: no / metrics: no",
cmd: `-hcl=limits { request_limits { mode = "disabled" read_rate = 0 write_rate = 0 }}`,
operations: []operation{
{
action: putKV,
expectedErrorMsg: "",
expectExceededLog: false,
expectMetric: false,
},
{
action: getKV,
expectedErrorMsg: "",
expectExceededLog: false,
expectMetric: false,
},
},
},
{
description: "HTTP & net/RPC / Mode: permissive - errors: no / exceeded logs: yes / metrics: yes",
cmd: `-hcl=limits { request_limits { mode = "permissive" read_rate = 0 write_rate = 0 }}`,
operations: []operation{
{
action: putKV,
expectedErrorMsg: "",
expectExceededLog: true,
expectMetric: false,
},
{
action: getKV,
expectedErrorMsg: "",
expectExceededLog: true,
expectMetric: false,
},
},
},
{
description: "HTTP & net/RPC / Mode: enforcing - errors: yes / exceeded logs: yes / metrics: yes",
cmd: `-hcl=limits { request_limits { mode = "enforcing" read_rate = 0 write_rate = 0 }}`,
operations: []operation{
{
action: putKV,
expectedErrorMsg: nonRetryableErrorMsg,
expectExceededLog: true,
expectMetric: true,
},
{
action: getKV,
expectedErrorMsg: retryableErrorMsg,
expectExceededLog: true,
expectMetric: true,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
logConsumer := &TestLogConsumer{}
cluster := createCluster(t, tc.cmd, logConsumer)
defer terminate(t, cluster)
client, err := cluster.GetClient(nil, true)
require.NoError(t, err)
// perform actions and validate returned errors to client
for _, op := range tc.operations {
err = op.action.function(client)
if len(op.expectedErrorMsg) > 0 {
require.Error(t, err)
require.Equal(t, op.expectedErrorMsg, err.Error())
} else {
require.NoError(t, err)
}
}
// validate logs and metrics
// doing this in a separate loop so we can perform the actions, let
// metrics and logs accumulate, and then assert on each.
for _, op := range tc.operations {
timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}
retry.RunWith(timer, t, func(r *retry.R) {
// validate metrics
metricsInfo, err := client.Agent().Metrics()
// TODO(NET-1978): currently returns NaN error
// require.NoError(t, err)
if metricsInfo != nil && err == nil {
if op.expectMetric {
checkForMetric(r, metricsInfo, op.action.rateLimitOperation, op.action.rateLimitType)
}
}
// validate logs
// putting this last as there are cases where logs were not yet
// present in the consumer when the assertion was made.
checkLogsForMessage(r, logConsumer.Msgs,
fmt.Sprintf("[WARN] agent.server.rpc-rate-limit: RPC exceeded allowed rate limit: rpc=%s", op.action.rateLimitOperation),
op.action.rateLimitOperation, "exceeded", op.expectExceededLog)
})
}
})
}
}

func checkForMetric(t *retry.R, metricsInfo *api.MetricsInfo, operationName string, expectedLimitType string) {
for _, counter := range metricsInfo.Counters {
if counter.Name == "consul.rate.limit" {
operation, ok := counter.Labels["op"]
require.True(t, ok)
limitType, ok := counter.Labels["limit_type"]
require.True(t, ok)
mode, ok := counter.Labels["mode"]
require.True(t, ok)
if operation == operationName {
require.Equal(t, 2, counter.Count)
require.Equal(t, expectedLimitType, limitType)
require.Equal(t, "disabled", mode)
}
}
}
}

func checkLogsForMessage(t *retry.R, logs []string, msg string, operationName string, logType string, logShouldExist bool) {
found := false
for _, log := range logs {
if strings.Contains(log, msg) {
found = true
break
}
}
require.Equal(t, logShouldExist, found, fmt.Sprintf("%s log check failed for: %s. Log expected: %t", logType, operationName, logShouldExist))
}

func terminate(t *testing.T, cluster *libcluster.Cluster) {
err := cluster.Terminate()
require.NoError(t, err)
}

type TestLogConsumer struct {
Msgs []string
}

func (g *TestLogConsumer) Accept(l testcontainers.Log) {
g.Msgs = append(g.Msgs, string(l.Content))
}

// createCluster builds a single-server cluster with the given extra
// command-line args and wires the provided log consumer into its config.
func createCluster(t *testing.T, cmd string, logConsumer *TestLogConsumer) *libcluster.Cluster {
opts := libcluster.BuildOptions{
InjectAutoEncryption: true,
InjectGossipEncryption: true,
}
ctx := libcluster.NewBuildContext(t, opts)
conf := libcluster.NewConfigBuilder(ctx).ToAgentConfig(t)
conf.LogConsumer = logConsumer
t.Logf("Cluster config:\n%s", conf.JSON)
parsedConfigs := []libcluster.Config{*conf}
cfgs := []libcluster.Config{}
for _, cfg := range parsedConfigs {
// add command
cfg.Cmd = append(cfg.Cmd, cmd)
cfgs = append(cfgs, cfg)
}
cluster, err := libcluster.New(t, cfgs)
require.NoError(t, err)
client, err := cluster.GetClient(nil, true)
require.NoError(t, err)
libcluster.WaitForLeader(t, cluster, client)
libcluster.WaitForMembers(t, client, 1)
return cluster
}
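
Assuming the consul-container harness's usual go-test workflow, the new test should be runnable on its own with something like "go test -run TestServerRequestRateLimit ./test/ratelimit" from the test/integration/consul-container directory; the exact flags (such as the Consul image under test) depend on the harness.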