consul/agent/hcp/telemetry_provider_test.go

388 lines
10 KiB
Go
Raw Normal View History

[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 09:12:13 -04:00
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
[HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration (#18168) * OTElExporter now uses an EndpointProvider to discover the endpoint * OTELSink uses a ConfigProvider to obtain filters and labels configuration * improve tests for otel_sink * Regex logic is moved into client for a method on the TelemetryConfig object * Create a telemetry_config_provider and update deps to use it * Fix conversion * fix import newline * Add logger to hcp client and move telemetry_config out of the client.go file * Add a telemetry_config.go to refactor client.go * Update deps * update hcp deps test * Modify telemetry_config_providers * Check for nil filters * PR review updates * Fix comments and move around pieces * Fix comments * Remove context from client struct * Moved ctx out of sink struct and fixed filters, added a test * Remove named imports, use errors.New if not fformatting * Remove HCP dependencies in telemetry package * Add success metric and move lock only to grab the t.cfgHahs * Update hash * fix nits * Create an equals method and add tests * Improve telemetry_config_provider.go tests * Add race test * Add missing godoc * Remove mock for MetricsClient * Avoid goroutine test panics * trying to kick CI lint issues by upgrading mod * imprve test code and add hasher for testing * Use structure logging for filters, fix error constants, and default to allow all regex * removed hashin and modify logic to simplify * Improve race test and fix PR feedback by removing hash equals and avoid testing the timer.Ticker logic, and instead unit test * Ran make go-mod-tidy * Use errtypes in the test * Add changelog * add safety check for exporter endpoint * remove require.Contains by using error types, fix structure logging, and fix success metric typo in exporter * Fixed race test to have changing config values * Send success metric before modifying config * Avoid the defer and move the success metric under
2023-08-01 17:20:18 -04:00
package hcp
import (
"context"
"fmt"
"net/url"
"regexp"
"strings"
"sync"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/hcp/client"
)
// Shared tuning values for the telemetry config provider tests.
const (
	// testSinkServiceName is the service name given to the go-metrics config
	// and therefore the prefix of every metric key the tests look up.
	testSinkServiceName = "test.telemetry_config_provider"

	// testRefreshInterval is the refresh interval used by most test configs.
	testRefreshInterval = 100 * time.Millisecond

	// Race test sizing: how many times the config is rewritten, and how many
	// reader goroutines are started per getter after each rewrite.
	testRaceWriteSampleCount = 100
	testRaceReadSampleCount  = 5000
)
// Keys under which the refresh outcome counters are looked up in the in-memory
// sink: the sink service name, a dot, then the dot-joined metric path.
var (
	// testMetricKeySuccess is the sink key built from internalMetricRefreshSuccess.
	testMetricKeySuccess = testSinkServiceName + "." + strings.Join(internalMetricRefreshSuccess, ".")
	// testMetricKeyFailure is the sink key built from internalMetricRefreshFailure.
	testMetricKeyFailure = testSinkServiceName + "." + strings.Join(internalMetricRefreshFailure, ".")
)
// testConfig is a plain-value description of a telemetry configuration.
// The helpers testDynamicCfg and testTelemetryCfg turn it into the parsed
// forms (compiled regexp, parsed URL) that the code under test consumes.
type testConfig struct {
	// endpoint is the raw URL string, parsed with url.Parse by the helpers.
	endpoint string
	// filters is the raw regular expression, compiled with regexp.Compile
	// by the helpers.
	filters string
	// labels are passed through to the resulting config unchanged.
	labels map[string]string
	// refreshInterval is passed through to the resulting config unchanged.
	refreshInterval time.Duration
}
// TestNewTelemetryConfigProvider checks the constructor's validation of the
// refresh interval: a positive interval yields a provider, and a zero interval
// yields an error mentioning "invalid refresh interval" and a nil provider.
func TestNewTelemetryConfigProvider(t *testing.T) {
	t.Parallel()
	for name, tc := range map[string]struct {
		// testInputs is converted to the TelemetryConfig handed to NewHCPProvider.
		testInputs *testConfig
		// wantErr, when non-empty, is a substring the returned error must contain.
		wantErr string
	}{
		"success": {
			testInputs: &testConfig{
				refreshInterval: 1 * time.Second,
			},
		},
		"failsWithInvalidRefreshInterval": {
			testInputs: &testConfig{
				refreshInterval: 0 * time.Second,
			},
			wantErr: "invalid refresh interval",
		},
	} {
		// Copy the loop variable: the subtests run in parallel and would
		// otherwise share one variable on toolchains older than Go 1.22.
		tc := tc
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			testCfg, err := testTelemetryCfg(tc.testInputs)
			require.NoError(t, err)

			cfgProvider, err := NewHCPProvider(ctx, client.NewMockClient(t), testCfg)
			if tc.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.wantErr)
				require.Nil(t, cfgProvider)
				return
			}

			// Assert on the error first so that an unexpected constructor
			// failure is reported with its message instead of as a bare
			// "expected value not to be nil".
			require.NoError(t, err)
			require.NotNil(t, cfgProvider)
		})
	}
}
// TestTelemetryConfigProviderGetUpdate performs a single getUpdate call against
// a mocked HCP client and verifies two things: the endpoint, filters and labels
// the provider exposes afterwards, and that exactly one sample was counted
// under the expected success or failure metric key in the in-memory sink.
func TestTelemetryConfigProviderGetUpdate(t *testing.T) {
	for name, tc := range map[string]struct {
		// mockExpect registers the FetchTelemetryConfig expectation for the case.
		mockExpect func(*testing.T, *client.MockClient)
		// metricKey is the counter expected to hold exactly one sample.
		metricKey string
		// optsInputs is the configuration the provider starts with.
		optsInputs *testConfig
		// expected is the configuration the provider must expose after getUpdate.
		expected *testConfig
	}{
		"noChanges": {
			optsInputs: &testConfig{
				endpoint: "http://test.com/v1/metrics",
				filters:  "test",
				labels: map[string]string{
					"test_label": "123",
				},
				refreshInterval: testRefreshInterval,
			},
			mockExpect: func(t *testing.T, m *client.MockClient) {
				mockCfg, err := testTelemetryCfg(&testConfig{
					endpoint: "http://test.com/v1/metrics",
					filters:  "test",
					labels: map[string]string{
						"test_label": "123",
					},
					refreshInterval: testRefreshInterval,
				})
				require.NoError(t, err)
				m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
			},
			expected: &testConfig{
				endpoint: "http://test.com/v1/metrics",
				filters:  "test",
				labels: map[string]string{
					"test_label": "123",
				},
				refreshInterval: testRefreshInterval,
			},
			metricKey: testMetricKeySuccess,
		},
		"newConfig": {
			optsInputs: &testConfig{
				endpoint: "http://test.com/v1/metrics",
				filters:  "test",
				labels: map[string]string{
					"test_label": "123",
				},
				refreshInterval: 2 * time.Second,
			},
			mockExpect: func(t *testing.T, m *client.MockClient) {
				mockCfg, err := testTelemetryCfg(&testConfig{
					endpoint: "http://newendpoint/v1/metrics",
					filters:  "consul",
					labels: map[string]string{
						"new_label": "1234",
					},
					refreshInterval: 2 * time.Second,
				})
				require.NoError(t, err)
				m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
			},
			expected: &testConfig{
				endpoint: "http://newendpoint/v1/metrics",
				filters:  "consul",
				labels: map[string]string{
					"new_label": "1234",
				},
				refreshInterval: 2 * time.Second,
			},
			metricKey: testMetricKeySuccess,
		},
		"sameConfigInvalidRefreshInterval": {
			optsInputs: &testConfig{
				endpoint: "http://test.com/v1/metrics",
				filters:  "test",
				labels: map[string]string{
					"test_label": "123",
				},
				refreshInterval: testRefreshInterval,
			},
			mockExpect: func(t *testing.T, m *client.MockClient) {
				mockCfg, err := testTelemetryCfg(&testConfig{
					refreshInterval: 0 * time.Second,
				})
				require.NoError(t, err)
				m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
			},
			expected: &testConfig{
				endpoint: "http://test.com/v1/metrics",
				filters:  "test",
				labels: map[string]string{
					"test_label": "123",
				},
				refreshInterval: testRefreshInterval,
			},
			metricKey: testMetricKeyFailure,
		},
		"sameConfigHCPClientFailure": {
			optsInputs: &testConfig{
				endpoint: "http://test.com/v1/metrics",
				filters:  "test",
				labels: map[string]string{
					"test_label": "123",
				},
				refreshInterval: testRefreshInterval,
			},
			mockExpect: func(_ *testing.T, m *client.MockClient) {
				m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("failure"))
			},
			expected: &testConfig{
				endpoint: "http://test.com/v1/metrics",
				filters:  "test",
				labels: map[string]string{
					"test_label": "123",
				},
				refreshInterval: testRefreshInterval,
			},
			metricKey: testMetricKeyFailure,
		},
	} {
		t.Run(name, func(t *testing.T) {
			// A fresh global sink per case keeps counters from leaking between
			// cases; this is also why the cases are not run in parallel.
			sink := initGlobalSink()
			mockClient := client.NewMockClient(t)
			tc.mockExpect(t, mockClient)

			dynamicCfg, err := testDynamicCfg(tc.optsInputs)
			require.NoError(t, err)

			provider := &hcpProviderImpl{
				hcpClient: mockClient,
				cfg:       dynamicCfg,
			}

			provider.getUpdate(context.Background())

			// Verify the provider returns the expected config values. These
			// checks are independent, so assert is used to report every
			// mismatch rather than only the first one.
			assert.Equal(t, tc.expected.endpoint, provider.GetEndpoint().String())
			assert.Equal(t, tc.expected.filters, provider.GetFilters().String())
			assert.Equal(t, tc.expected.labels, provider.GetLabels())

			// Verify the expected refresh metric was counted exactly once.
			// Each step is a hard requirement: continuing past a missing
			// interval or counter would dereference a nil pointer and panic
			// instead of producing a readable failure.
			intervals := sink.Data()
			require.NotEmpty(t, intervals)
			sv, ok := intervals[0].Counters[tc.metricKey]
			require.True(t, ok, "counter %q was not recorded", tc.metricKey)
			require.NotNil(t, sv.AggregateSample)
			require.Equal(t, 1, sv.AggregateSample.Count)
		})
	}
}
// mockRaceClient is a hand-written HCP client double for the race test.
// It serves whatever TelemetryConfig it currently holds, and that config can
// be swapped at any point while readers are active.
type mockRaceClient struct {
	// Client is embedded so the mock has the full client method set while
	// only FetchTelemetryConfig is overridden in this file.
	// NOTE(review): the tests leave it unset, so calling any other method
	// would presumably hit a nil value — confirm if more methods get used.
	client.Client

	// rw guards cfg: updateCfg takes the write lock, FetchTelemetryConfig
	// takes the read lock.
	rw  sync.RWMutex
	cfg *client.TelemetryConfig
}
// updateCfg swaps the client's config for a new TelemetryConfig whose label,
// filter and endpoint are all derived from count, and returns the new config.
// The write lock is held for the entire call. If the filter fails to compile
// or the endpoint fails to parse, the error is returned and the stored config
// is left as it was.
func (m *mockRaceClient) updateCfg(count int) (*client.TelemetryConfig, error) {
	m.rw.Lock()
	defer m.rw.Unlock()

	labelKey := fmt.Sprintf("label_%d", count)
	labelValue := fmt.Sprintf("value_%d", count)

	re, err := regexp.Compile(fmt.Sprintf("consul_filter_%d", count))
	if err != nil {
		return nil, err
	}

	u, err := url.Parse(fmt.Sprintf("http://consul-endpoint-%d.com", count))
	if err != nil {
		return nil, err
	}

	m.cfg = &client.TelemetryConfig{
		MetricsConfig: &client.MetricsConfig{
			Filters:  re,
			Endpoint: u,
			Labels:   map[string]string{labelKey: labelValue},
		},
		RefreshConfig: &client.RefreshConfig{
			RefreshInterval: testRefreshInterval,
		},
	}
	return m.cfg, nil
}
// FetchTelemetryConfig hands back the config currently stored in the mock.
// The pointer is read under the read lock; ctx is ignored and the error is
// always nil.
func (m *mockRaceClient) FetchTelemetryConfig(ctx context.Context) (*client.TelemetryConfig, error) {
	m.rw.RLock()
	current := m.cfg
	m.rw.RUnlock()

	return current, nil
}
// TestTelemetryConfigProvider_Race exercises concurrent access to the provider.
// For each of testRaceWriteSampleCount rounds it changes the config served by
// the mock client, forces the provider to pick it up, and then starts
// testRaceReadSampleCount goroutines per getter that read the labels, filters
// and endpoint at the same time. It is meant to be run with -race.
func TestTelemetryConfigProvider_Race(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	initCfg, err := testTelemetryCfg(&testConfig{
		endpoint:        "test.com",
		filters:         "test",
		labels:          map[string]string{"test_label": "test_value"},
		refreshInterval: testRefreshInterval,
	})
	require.NoError(t, err)

	m := &mockRaceClient{
		cfg: initCfg,
	}

	// Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval.
	provider, err := NewHCPProvider(ctx, m, m.cfg)
	require.NoError(t, err)

	for count := 0; count < testRaceWriteSampleCount; count++ {
		// Force a TelemetryConfig value change in the mockRaceClient.
		newCfg, err := m.updateCfg(count)
		require.NoError(t, err)

		// Force provider to obtain new client TelemetryConfig immediately.
		// This call is necessary to guarantee TelemetryConfig changes to assert on expected values below.
		provider.getUpdate(context.Background())

		// The checks below run on spawned goroutines, so they use assert
		// rather than require: require stops the test through t.FailNow,
		// which may only be called from the goroutine running the test.
		wg := &sync.WaitGroup{}

		// Start goroutines to access label configuration.
		kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
			assert.Equal(t, newCfg.MetricsConfig.Labels, provider.GetLabels())
		})

		// Start goroutines to access filter configuration.
		kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
			assert.Equal(t, newCfg.MetricsConfig.Filters, provider.GetFilters())
		})

		// Start goroutines to access endpoint configuration.
		kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
			assert.Equal(t, newCfg.MetricsConfig.Endpoint, provider.GetEndpoint())
		})

		wg.Wait()

		// Stop after the first failing round, from the test goroutine, so a
		// real mismatch does not repeat thousands of times per later round.
		if t.Failed() {
			t.FailNow()
		}
	}
}
// kickOff launches count goroutines that each invoke check with provider.
// Every goroutine is registered on wg before it starts and marks itself done
// when check returns; the caller is responsible for calling wg.Wait.
func kickOff(wg *sync.WaitGroup, count int, provider *hcpProviderImpl, check func(cfgProvider *hcpProviderImpl)) {
	runCheck := func() {
		defer wg.Done()
		check(provider)
	}

	for remaining := count; remaining > 0; remaining-- {
		wg.Add(1)
		go runCheck()
	}
}
// initGlobalSink creates an in-memory go-metrics sink, installs it as the
// global metrics sink under testSinkServiceName, and returns it so the caller
// can inspect what was recorded. Hostname prefixing is turned off in the
// metrics config.
func initGlobalSink() *metrics.InmemSink {
	// The same duration is passed for both NewInmemSink arguments.
	const window = 10 * time.Second
	sink := metrics.NewInmemSink(window, window)

	cfg := metrics.DefaultConfig(testSinkServiceName)
	cfg.EnableHostname = false

	// Installing the sink globally replaces whatever an earlier call set up.
	metrics.NewGlobal(cfg, sink)
	return sink
}
// testDynamicCfg builds the dynamicConfig described by testCfg.
// The filters string is compiled as a regular expression first and the
// endpoint string is parsed as a URL second; the first failure is returned
// with a nil config. Labels and refresh interval are copied over as-is.
func testDynamicCfg(testCfg *testConfig) (*dynamicConfig, error) {
	re, compileErr := regexp.Compile(testCfg.filters)
	if compileErr != nil {
		return nil, compileErr
	}

	u, parseErr := url.Parse(testCfg.endpoint)
	if parseErr != nil {
		return nil, parseErr
	}

	dyn := &dynamicConfig{
		RefreshInterval: testCfg.refreshInterval,
		Labels:          testCfg.labels,
		Filters:         re,
		Endpoint:        u,
	}
	return dyn, nil
}
// testTelemetryCfg builds the client.TelemetryConfig described by testCfg.
// The filters string is compiled as a regular expression first and the
// endpoint string is parsed as a URL second; the first failure is returned
// with a nil config. Labels go into MetricsConfig and the refresh interval
// into RefreshConfig, both unchanged.
func testTelemetryCfg(testCfg *testConfig) (*client.TelemetryConfig, error) {
	re, compileErr := regexp.Compile(testCfg.filters)
	if compileErr != nil {
		return nil, compileErr
	}

	u, parseErr := url.Parse(testCfg.endpoint)
	if parseErr != nil {
		return nil, parseErr
	}

	metricsCfg := &client.MetricsConfig{
		Endpoint: u,
		Filters:  re,
		Labels:   testCfg.labels,
	}
	refreshCfg := &client.RefreshConfig{
		RefreshInterval: testCfg.refreshInterval,
	}

	return &client.TelemetryConfig{
		MetricsConfig: metricsCfg,
		RefreshConfig: refreshCfg,
	}, nil
}