consul/agent/hcp/telemetry_provider_test.go

661 lines
18 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package hcp
import (
"context"
"fmt"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/go-openapi/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/version"
"github.com/hashicorp/go-hclog"
)
const (
testRefreshInterval = 100 * time.Millisecond
testSinkServiceName = "test.telemetry_config_provider"
testRaceWriteSampleCount = 100
testRaceReadSampleCount = 5000
)
var (
// Test constants to verify inmem sink metrics.
testMetricKeyFailure = testSinkServiceName + "." + strings.Join(internalMetricRefreshFailure, ".")
testMetricKeySuccess = testSinkServiceName + "." + strings.Join(internalMetricRefreshSuccess, ".")
)
type testConfig struct {
filters string
endpoint string
labels map[string]string
refreshInterval time.Duration
disabled bool
}
func TestNewTelemetryConfigProvider_DefaultConfig(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Initialize new provider
provider := NewHCPProvider(ctx)
provider.updateConfig(ctx)
// Assert provider has default configuration and metrics processing is disabled.
defaultCfg := &dynamicConfig{
labels: map[string]string{},
filters: client.DefaultMetricFilters,
refreshInterval: defaultTelemetryConfigRefreshInterval,
endpoint: nil,
disabled: true,
}
require.Equal(t, defaultCfg, provider.cfg)
}
func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) {
for name, tc := range map[string]struct {
mockExpect func(*client.MockClient)
metricKey string
initCfg *dynamicConfig
expected *dynamicConfig
expectedInterval time.Duration
skipHCPClient bool
}{
"noChanges": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
labels: map[string]string{
"test_label": "123",
},
filters: "test",
refreshInterval: testRefreshInterval,
}),
metricKey: testMetricKeySuccess,
expectedInterval: testRefreshInterval,
},
"newConfig": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: 2 * time.Second,
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "http://newendpoint/v1/metrics",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: testDynamicCfg(&testConfig{
endpoint: "http://newendpoint/v1/metrics",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
}),
expectedInterval: 2 * time.Second,
metricKey: testMetricKeySuccess,
},
"newConfigMetricsDisabled": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: 2 * time.Second,
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
disabled: true,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: testDynamicCfg(&testConfig{
endpoint: "",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
disabled: true,
}),
metricKey: testMetricKeySuccess,
expectedInterval: 2 * time.Second,
},
"sameConfigInvalidRefreshInterval": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
refreshInterval: 0 * time.Second,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
labels: map[string]string{
"test_label": "123",
},
filters: "test",
refreshInterval: testRefreshInterval,
}),
metricKey: testMetricKeyFailure,
expectedInterval: 0,
},
"sameConfigHCPClientFailure": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
mockExpect: func(m *client.MockClient) {
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("failure"))
},
expected: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
metricKey: testMetricKeyFailure,
expectedInterval: 0,
},
"disableMetrics404": {
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
mockExpect: func(m *client.MockClient) {
err := runtime.NewAPIError("404 failure", nil, 404)
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, err)
},
expected: defaultDisabledCfg(),
metricKey: testMetricKeySuccess,
expectedInterval: defaultTelemetryConfigRefreshInterval,
},
"hcpClientNotConfigured": {
skipHCPClient: true,
initCfg: testDynamicCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
}),
expected: defaultDisabledCfg(),
metricKey: testMetricKeySuccess,
expectedInterval: defaultTelemetryConfigRefreshInterval,
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
sink := initGlobalSink()
var mockClient *client.MockClient
if !tc.skipHCPClient {
mockClient = client.NewMockClient(t)
tc.mockExpect(mockClient)
}
provider := &hcpProviderImpl{
hcpClient: mockClient,
cfg: tc.initCfg,
logger: hclog.NewNullLogger(),
}
newInterval := provider.updateConfig(context.Background())
require.Equal(t, tc.expectedInterval, newInterval)
// Verify endpoint provider returns correct config values.
require.Equal(t, tc.expected.endpoint, provider.GetEndpoint())
require.Equal(t, tc.expected.filters, provider.GetFilters())
require.Equal(t, tc.expected.labels, provider.GetLabels())
require.Equal(t, tc.expected.disabled, provider.IsDisabled())
// Verify count for transform success metric.
interval := sink.Data()[0]
require.NotNil(t, interval, 1)
sv := interval.Counters[tc.metricKey]
assert.NotNil(t, sv.AggregateSample)
require.Equal(t, sv.AggregateSample.Count, 1)
})
}
}
func TestTelemetryConfigProvider_Start(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
provider := NewHCPProvider(ctx)
testUpdateConfigCh := make(chan struct{}, 1)
provider.testUpdateConfigCh = testUpdateConfigCh
// Configure mocks
mockClient := client.NewMockClient(t)
mTelemetryCfg, err := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
})
require.NoError(t, err)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil)
mockHCPCfg := &config.MockCloudCfg{}
// Run provider
go provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
// Expect at least two update config calls to validate provider is running
// and has entered the main run loop
select {
case <-testUpdateConfigCh:
case <-time.After(time.Second):
require.Fail(t, "provider did not attempt to update config in expected time")
}
select {
case <-testUpdateConfigCh:
case <-time.After(time.Millisecond * 500):
require.Fail(t, "provider did not attempt to update config in expected time")
}
mockClient.AssertExpectations(t)
}
func TestTelemetryConfigProvider_MultipleRun(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
provider := NewHCPProvider(ctx)
testUpdateConfigCh := make(chan struct{}, 1)
provider.testUpdateConfigCh = testUpdateConfigCh
// Configure mocks
mockClient := client.NewMockClient(t)
mTelemetryCfg, err := testTelemetryCfg(&testConfig{
refreshInterval: 30 * time.Minute,
})
require.NoError(t, err)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil)
mockHCPCfg := &config.MockCloudCfg{}
// Run provider twice in parallel
go provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
go provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
// Expect only one update config call
select {
case <-testUpdateConfigCh:
case <-time.After(time.Second):
require.Fail(t, "provider did not attempt to update config in expected time")
}
select {
case <-testUpdateConfigCh:
require.Fail(t, "provider unexpectedly updated config")
case <-time.After(time.Second):
}
// Try calling run again, should not update again
provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
select {
case <-testUpdateConfigCh:
require.Fail(t, "provider unexpectedly updated config")
case <-time.After(time.Second):
}
mockClient.AssertExpectations(t)
}
func TestTelemetryConfigProvider_updateHTTPConfig(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
cfg config.CloudConfigurer
}{
"success": {
cfg: &config.MockCloudCfg{},
},
"failsWithoutCloudCfg": {
wantErr: "must provide valid HCP configuration",
cfg: nil,
},
"failsHCPConfig": {
wantErr: "failed to configure telemetry HTTP client",
cfg: config.MockCloudCfg{
ConfigErr: fmt.Errorf("test bad hcp config"),
},
},
"failsBadResource": {
wantErr: "failed set telemetry client headers",
cfg: config.MockCloudCfg{
ResourceErr: fmt.Errorf("test bad resource"),
},
},
} {
t.Run(name, func(t *testing.T) {
provider := NewHCPProvider(context.Background())
err := provider.updateHTTPConfig(test.cfg)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}
require.NoError(t, err)
require.NotNil(t, provider.GetHTTPClient())
expectedHeader := make(http.Header)
expectedHeader.Set("content-type", "application/x-protobuf")
expectedHeader.Set("x-hcp-resource-id", "organization/test-org/project/test-project/test-type/test-id")
expectedHeader.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))
require.Equal(t, expectedHeader, provider.GetHeader())
})
}
}
func TestTelemetryConfigProvider_Stop(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
provider := NewHCPProvider(ctx)
testUpdateConfigCh := make(chan struct{}, 1)
provider.testUpdateConfigCh = testUpdateConfigCh
// Configure mocks
mockClient := client.NewMockClient(t)
mTelemetryCfg, err := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
})
require.NoError(t, err)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil)
mockHCPCfg := &config.MockCloudCfg{}
// Run provider
provider.Start(context.Background(), &HCPProviderCfg{
HCPClient: mockClient,
HCPConfig: mockHCPCfg,
})
// Wait for at least two update config calls to ensure provider is running
// and has entered the main run loop
select {
case <-testUpdateConfigCh:
case <-time.After(time.Second):
require.Fail(t, "provider did not attempt to update config in expected time")
}
select {
case <-testUpdateConfigCh:
case <-time.After(time.Millisecond * 500):
require.Fail(t, "provider did not attempt to update config in expected time")
}
// Stop the provider
provider.Stop()
require.Equal(t, defaultDisabledCfg(), provider.cfg)
select {
case <-testUpdateConfigCh:
require.Fail(t, "provider should not attempt to update config after stop")
case <-time.After(time.Second):
// Success, no updates have happened after stopping
}
mockClient.AssertExpectations(t)
}
// mockRaceClient is a mock HCP client that fetches TelemetryConfig.
// The mock TelemetryConfig returned can be manually updated at any time.
// It manages concurrent read/write access to config with a sync.RWMutex.
type mockRaceClient struct {
client.Client
cfg *client.TelemetryConfig
rw sync.RWMutex
}
// updateCfg acquires a write lock and updates client config to a new value givent a count.
func (m *mockRaceClient) updateCfg(count int) (*client.TelemetryConfig, error) {
m.rw.Lock()
defer m.rw.Unlock()
labels := map[string]string{fmt.Sprintf("label_%d", count): fmt.Sprintf("value_%d", count)}
filters, err := regexp.Compile(fmt.Sprintf("consul_filter_%d", count))
if err != nil {
return nil, err
}
endpoint, err := url.Parse(fmt.Sprintf("http://consul-endpoint-%d.com", count))
if err != nil {
return nil, err
}
cfg := &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Filters: filters,
Endpoint: endpoint,
Labels: labels,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: testRefreshInterval,
},
}
m.cfg = cfg
return cfg, nil
}
// FetchTelemetryConfig returns the current config held by the mockRaceClient.
func (m *mockRaceClient) FetchTelemetryConfig(ctx context.Context) (*client.TelemetryConfig, error) {
m.rw.RLock()
defer m.rw.RUnlock()
return m.cfg, nil
}
func TestTelemetryConfigProvider_Race(t *testing.T) {
//todo(achooo): address flaky test
t.Skip("TODO(flaky): This test fails often in the CI")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
initCfg, err := testTelemetryCfg(&testConfig{
endpoint: "test.com",
filters: "test",
labels: map[string]string{"test_label": "test_value"},
refreshInterval: testRefreshInterval,
})
require.NoError(t, err)
m := &mockRaceClient{
cfg: initCfg,
}
// Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval.
provider := NewHCPProvider(ctx)
err = provider.Start(context.Background(), &HCPProviderCfg{m, config.MockCloudCfg{}})
require.NoError(t, err)
for count := 0; count < testRaceWriteSampleCount; count++ {
// Force a TelemetryConfig value change in the mockRaceClient.
newCfg, err := m.updateCfg(count)
require.NoError(t, err)
// Force provider to obtain new client TelemetryConfig immediately.
// This call is necessary to guarantee TelemetryConfig changes to assert on expected values below.
provider.updateConfig(context.Background())
// Start goroutines to access label configuration.
wg := &sync.WaitGroup{}
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetLabels(), newCfg.MetricsConfig.Labels)
})
// Start goroutines to access endpoint configuration.
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetFilters(), newCfg.MetricsConfig.Filters)
})
// Start goroutines to access filter configuration.
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetEndpoint(), newCfg.MetricsConfig.Endpoint)
})
wg.Wait()
}
}
func kickOff(wg *sync.WaitGroup, count int, provider *hcpProviderImpl, check func(cfgProvider *hcpProviderImpl)) {
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
check(provider)
}()
}
}
// initGlobalSink is a helper function to initialize a Go metrics inmemsink.
func initGlobalSink() *metrics.InmemSink {
cfg := metrics.DefaultConfig(testSinkServiceName)
cfg.EnableHostname = false
sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
metrics.NewGlobal(cfg, sink)
return sink
}
// testDynamicCfg converts testConfig inputs to a dynamicConfig to be used in tests.
func testDynamicCfg(testCfg *testConfig) *dynamicConfig {
filters, _ := regexp.Compile(testCfg.filters)
var endpoint *url.URL
if testCfg.endpoint != "" {
endpoint, _ = url.Parse(testCfg.endpoint)
}
return &dynamicConfig{
endpoint: endpoint,
filters: filters,
labels: testCfg.labels,
refreshInterval: testCfg.refreshInterval,
disabled: testCfg.disabled,
}
}
// testTelemetryCfg converts testConfig inputs to a TelemetryConfig to be used in tests.
func testTelemetryCfg(testCfg *testConfig) (*client.TelemetryConfig, error) {
filters, err := regexp.Compile(testCfg.filters)
if err != nil {
return nil, err
}
var endpoint *url.URL
if testCfg.endpoint != "" {
u, err := url.Parse(testCfg.endpoint)
if err != nil {
return nil, err
}
endpoint = u
}
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: endpoint,
Filters: filters,
Labels: testCfg.labels,
Disabled: testCfg.disabled,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: testCfg.refreshInterval,
},
}, nil
}