consul/agent/hcp/telemetry/otel_exporter_test.go
Ashvitha 828567c62e
[HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration (#18168)
* OTELExporter now uses an EndpointProvider to discover the endpoint

* OTELSink uses a ConfigProvider to obtain filters and labels configuration

* improve tests for otel_sink

* Regex logic is moved into client for a method on the TelemetryConfig object

* Create a telemetry_config_provider and update deps to use it

* Fix conversion

* fix import newline

* Add logger to hcp client and move telemetry_config out of the client.go file

* Add a telemetry_config.go to refactor client.go

* Update deps

* update hcp deps test

* Modify telemetry_config_providers

* Check for nil filters

* PR review updates

* Fix comments and move around pieces

* Fix comments

* Remove context from client struct

* Moved ctx out of sink struct and fixed filters, added a test

* Remove named imports, use errors.New if not formatting

* Remove HCP dependencies in telemetry package

* Add success metric and move lock only to grab the t.cfgHash

* Update hash

* fix nits

* Create an equals method and add tests

* Improve telemetry_config_provider.go tests

* Add race test

* Add missing godoc

* Remove mock for MetricsClient

* Avoid goroutine test panics

* trying to kick CI lint issues by upgrading mod

* improve test code and add hasher for testing

* Use structure logging for filters, fix error constants, and default to allow all regex

* removed hashing and modified logic to simplify

* Improve race test and fix PR feedback by removing hash equals and avoid testing the timer.Ticker logic, and instead unit test

* Ran make go-mod-tidy

* Use errtypes in the test

* Add changelog

* add safety check for exporter endpoint

* remove require.Contains by using error types, fix structure logging, and fix success metric typo in exporter

* Fixed race test to have changing config values

* Send success metric before modifying config

* Avoid the defer and move the success metric under
2023-08-01 17:20:18 -04:00

236 lines
5.7 KiB
Go

package telemetry
import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
// testExportEndpoint is the metrics endpoint URL that the tests in this file
// parse and hand to the mock endpoint provider.
const testExportEndpoint = "https://test.com/v1/metrics"
// mockMetricsClient is a test double for the exporter's metrics client. Its
// export result is fixed up front through the exportErr field.
type mockMetricsClient struct {
	// exportErr is the value ExportMetrics returns; leave it nil to have the
	// export report no error.
	exportErr error
}
// ExportMetrics ignores every argument and returns the error configured on the
// mock, which is nil when none was set.
func (m *mockMetricsClient) ExportMetrics(_ context.Context, _ *metricpb.ResourceMetrics, _ string) error {
	return m.exportErr
}
// mockEndpointProvider is a test double that hands back a fixed endpoint.
type mockEndpointProvider struct {
	// endpoint is returned as-is by GetEndpoint; it may be nil.
	endpoint *url.URL
}

// GetEndpoint returns the endpoint stored on the mock, without copying it.
func (m *mockEndpointProvider) GetEndpoint() *url.URL {
	return m.endpoint
}
// TestTemporality checks that the exporter reports cumulative temporality for
// counter instruments.
func TestTemporality(t *testing.T) {
	t.Parallel()

	exporter := new(OTELExporter)
	got := exporter.Temporality(metric.InstrumentKindCounter)
	require.Equal(t, metricdata.CumulativeTemporality, got)
}
// TestAggregation checks which aggregation the exporter selects for gauge,
// counter and histogram instrument kinds.
func TestAggregation(t *testing.T) {
	t.Parallel()

	type testCase struct {
		kind   metric.InstrumentKind
		expAgg aggregation.Aggregation
	}

	// Bucket boundaries the histogram aggregation is expected to carry.
	histogramBoundaries := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}

	cases := map[string]testCase{
		"gauge": {
			kind:   metric.InstrumentKindObservableGauge,
			expAgg: aggregation.LastValue{},
		},
		"counter": {
			kind:   metric.InstrumentKindCounter,
			expAgg: aggregation.Sum{},
		},
		"histogram": {
			kind: metric.InstrumentKindHistogram,
			expAgg: aggregation.ExplicitBucketHistogram{
				Boundaries: histogramBoundaries,
				NoMinMax:   false,
			},
		},
	}

	for name, tc := range cases {
		tc := tc // capture the range variable for the parallel subtest
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			got := new(OTELExporter).Aggregation(tc.kind)
			require.Equal(t, tc.expAgg, got)
		})
	}
}
// TestExport runs Export against a table of inputs. Cases that leave wantErr
// empty must return no error; the remaining case must return an error whose
// message contains wantErr.
func TestExport(t *testing.T) {
	t.Parallel()

	type testCase struct {
		wantErr  string
		metrics  *metricdata.ResourceMetrics
		client   MetricsClient
		provider EndpointProvider
	}

	cases := map[string]testCase{
		// The provider yields a nil endpoint and no metrics are supplied.
		"earlyReturnWithoutEndpoint": {
			client:   &mockMetricsClient{},
			provider: &mockEndpointProvider{},
		},
		// The resource metrics carry a nil ScopeMetrics slice.
		"earlyReturnWithoutScopeMetrics": {
			client:  &mockMetricsClient{},
			metrics: mutateMetrics(nil),
		},
		// A single scope is present but it contains no metrics.
		"earlyReturnWithoutMetrics": {
			client: &mockMetricsClient{},
			metrics: mutateMetrics([]metricdata.ScopeMetrics{
				{Metrics: []metricdata.Metrics{}},
			}),
		},
		// The client fails, so Export is expected to report an error.
		"errorWithExportFailure": {
			client: &mockMetricsClient{
				exportErr: fmt.Errorf("failed to export metrics."),
			},
			metrics: mutateMetrics([]metricdata.ScopeMetrics{
				{
					Metrics: []metricdata.Metrics{
						{
							Name: "consul.raft.commitTime",
							Data: metricdata.Gauge[float64]{},
						},
					},
				},
			}),
			wantErr: "failed to export metrics",
		},
	}

	for name, tc := range cases {
		tc := tc // capture the range variable for the parallel subtest
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			// Cases that do not bring their own provider get one that serves
			// the shared test endpoint.
			endpointProvider := tc.provider
			if endpointProvider == nil {
				parsed, err := url.Parse(testExportEndpoint)
				require.NoError(t, err)
				endpointProvider = &mockEndpointProvider{endpoint: parsed}
			}

			exporter := NewOTELExporter(tc.client, endpointProvider)
			err := exporter.Export(context.Background(), tc.metrics)

			if tc.wantErr == "" {
				require.NoError(t, err)
				return
			}

			require.Error(t, err)
			require.Contains(t, err.Error(), tc.wantErr)
		})
	}
}
// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted
// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal()
// sets a shared global sink.
func TestExport_CustomMetrics(t *testing.T) {
	for name, tc := range map[string]struct {
		// client is handed to the exporter; it is left nil for the operations
		// whose cases do not set one (shutdown and flush).
		client MetricsClient
		// metricKey is the key of the counter the operation is expected to emit.
		metricKey []string
		// operation selects the exporter method to call: "flush", "shutdown",
		// or anything else for Export.
		operation string
	}{
		"exportSuccessEmitsCustomMetric": {
			client:    &mockMetricsClient{},
			metricKey: internalMetricExportSuccess,
			operation: "export",
		},
		"exportFailureEmitsCustomMetric": {
			client: &mockMetricsClient{
				exportErr: fmt.Errorf("client err"),
			},
			metricKey: internalMetricExportFailure,
			operation: "export",
		},
		"shutdownEmitsCustomMetric": {
			metricKey: internalMetricExporterShutdown,
			operation: "shutdown",
		},
		"forceFlushEmitsCustomMetric": {
			metricKey: internalMetricExporterForceFlush,
			operation: "flush",
		},
	} {
		t.Run(name, func(t *testing.T) {
			// Init global sink.
			serviceName := "test.transform"
			cfg := metrics.DefaultConfig(serviceName)
			cfg.EnableHostname = false

			sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
			_, err := metrics.NewGlobal(cfg, sink)
			require.NoError(t, err)

			// Perform operation that emits metric.
			u, err := url.Parse(testExportEndpoint)
			require.NoError(t, err)

			exp := NewOTELExporter(tc.client, &mockEndpointProvider{
				endpoint: u,
			})

			// The return values are discarded on purpose: the failure case is
			// expected to return an error, and this test only asserts on the
			// counter written to the sink.
			ctx := context.Background()
			switch tc.operation {
			case "flush":
				_ = exp.ForceFlush(ctx)
			case "shutdown":
				_ = exp.Shutdown(ctx)
			default:
				_ = exp.Export(ctx, inputResourceMetrics)
			}

			// Collect sink metrics.
			intervals := sink.Data()
			require.Len(t, intervals, 1)
			key := serviceName + "." + strings.Join(tc.metricKey, ".")

			// Counters holds struct values, so a missing key yields a zero
			// value rather than nil; use the comma-ok form to detect absence.
			sv, ok := intervals[0].Counters[key]
			require.True(t, ok, "counter %q was not emitted", key)

			// Verify the counter was incremented exactly once.
			require.NotNil(t, sv.AggregateSample)
			require.Equal(t, 1, sv.AggregateSample.Count)
		})
	}
}
// TestForceFlush checks that ForceFlush, called with an already-cancelled
// context, returns an error matching context.Canceled.
func TestForceFlush(t *testing.T) {
	t.Parallel()

	cancelledCtx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before the call so the exporter sees a done context

	exporter := new(OTELExporter)
	require.ErrorIs(t, exporter.ForceFlush(cancelledCtx), context.Canceled)
}
// TestShutdown checks that Shutdown, called with an already-cancelled context,
// returns an error matching context.Canceled.
func TestShutdown(t *testing.T) {
	t.Parallel()

	cancelledCtx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before the call so the exporter sees a done context

	exporter := new(OTELExporter)
	require.ErrorIs(t, exporter.Shutdown(cancelledCtx), context.Canceled)
}
// mutateMetrics wraps the given scope metrics in a new ResourceMetrics whose
// resource is the empty resource. The slice is stored as passed, nil included.
func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics {
	rm := new(metricdata.ResourceMetrics)
	rm.Resource = resource.Empty()
	rm.ScopeMetrics = m
	return rm
}