consul/agent/hcp/client/metrics_client.go
Ashvitha 828567c62e
[HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration (#18168)
* OTElExporter now uses an EndpointProvider to discover the endpoint

* OTELSink uses a ConfigProvider to obtain filters and labels configuration

* improve tests for otel_sink

* Regex logic is moved into client for a method on the TelemetryConfig object

* Create a telemetry_config_provider and update deps to use it

* Fix conversion

* fix import newline

* Add logger to hcp client and move telemetry_config out of the client.go file

* Add a telemetry_config.go to refactor client.go

* Update deps

* update hcp deps test

* Modify telemetry_config_providers

* Check for nil filters

* PR review updates

* Fix comments and move around pieces

* Fix comments

* Remove context from client struct

* Moved ctx out of sink struct and fixed filters, added a test

* Remove named imports, use errors.New if not formatting

* Remove HCP dependencies in telemetry package

* Add success metric and move lock only to grab the t.cfgHash

* Update hash

* fix nits

* Create an equals method and add tests

* Improve telemetry_config_provider.go tests

* Add race test

* Add missing godoc

* Remove mock for MetricsClient

* Avoid goroutine test panics

* trying to kick CI lint issues by upgrading mod

* improve test code and add hasher for testing

* Use structure logging for filters, fix error constants, and default to allow all regex

* removed hashing and modify logic to simplify

* Improve race test and fix PR feedback by removing hash equals and avoid testing the timer.Ticker logic, and instead unit test

* Ran make go-mod-tidy

* Use errtypes in the test

* Add changelog

* add safety check for exporter endpoint

* remove require.Contains by using error types, fix structure logging, and fix success metric typo in exporter

* Fixed race test to have changing config values

* Send success metric before modifying config

* Avoid the defer and move the success metric under
2023-08-01 17:20:18 -04:00

168 lines
5.0 KiB
Go

package client
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-retryablehttp"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/hashicorp/hcp-sdk-go/resource"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/consul/version"
)
const (
	// Timeout applied to the underlying HTTP client.
	defaultStreamTimeout = 15 * time.Second

	// Retry settings.
	// TODO: Eventually, we'd like to configure these values dynamically.
	defaultRetryWaitMin = time.Second
	defaultRetryWaitMax = 15 * time.Second
	// defaultRetryMax is 0, which turns retries off until dynamic configuration is possible.
	// This circumvents spikes in load that may cause or exacerbate server-side issues for now.
	defaultRetryMax = 0

	// defaultErrRespBodyLength is the maximum length of the response body reported
	// when exporting metrics fails; anything beyond it is truncated.
	defaultErrRespBodyLength = 100
)
// CloudConfig represents the cloud config for TLS, abstracted in an interface for easy testing.
//
// HCPConfig returns the HCP SDK configuration; this file uses it for the API TLS
// configuration and as the OAuth2 token source of the HTTP client.
// Resource returns the HCP resource whose string form is sent in the
// "x-hcp-resource-id" header of every export request.
type CloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
Resource() (resource.Resource, error)
}
// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle.
// It also holds default HTTP headers to add to export requests.
type otlpClient struct {
// client is the retryable HTTP client used to send export requests.
client *retryablehttp.Client
// header holds the default headers; ExportMetrics assigns the pointed-to
// header map to each outgoing request.
header *http.Header
}
// NewMetricsClient returns a configured MetricsClient.
// The current implementation uses otlpClient to provide retry functionality.
//
// ctx is used to obtain the logger (via hclog.FromContext) handed to the HTTP client.
// cfg supplies the HCP configuration for the HTTP client and the HCP resource that
// identifies this agent in the default request headers.
//
// An error is returned when cfg or ctx is nil, when the HTTP client cannot be
// configured, or when the resource cannot be obtained. Underlying errors are wrapped
// with %w so callers can inspect them with errors.Is / errors.As.
func NewMetricsClient(ctx context.Context, cfg CloudConfig) (telemetry.MetricsClient, error) {
	if cfg == nil {
		return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
	}

	if ctx == nil {
		return nil, fmt.Errorf("failed to init telemetry client: provide a valid context")
	}

	logger := hclog.FromContext(ctx)

	c, err := newHTTPClient(cfg, logger)
	if err != nil {
		return nil, fmt.Errorf("failed to init telemetry client: %w", err)
	}

	r, err := cfg.Resource()
	if err != nil {
		return nil, fmt.Errorf("failed to init telemetry client: %w", err)
	}

	// Default headers attached to every export request.
	header := make(http.Header)
	header.Set("content-type", "application/x-protobuf")
	header.Set("x-hcp-resource-id", r.String())
	header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))

	return &otlpClient{
		client: c,
		header: &header,
	}, nil
}
// newHTTPClient builds the retryable HTTP client used to export metrics.
// It returns the error from cloudCfg.HCPConfig unchanged when the HCP
// configuration cannot be obtained.
func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
	hcpCfg, err := cloudCfg.HCPConfig()
	if err != nil {
		return nil, err
	}

	// Pooled transport using the TLS configuration of the HCP API.
	base := cleanhttp.DefaultPooledTransport()
	base.TLSClientConfig = hcpCfg.APITLSConfig()

	httpClient := &http.Client{
		Timeout: defaultStreamTimeout,
		// The OAuth2 transport wraps the TLS transport and takes hcpCfg as its token source.
		Transport: &oauth2.Transport{
			Source: hcpCfg,
			Base:   base,
		},
	}

	return &retryablehttp.Client{
		HTTPClient:   httpClient,
		Logger:       logger.Named("hcp_telemetry_client"),
		RetryMax:     defaultRetryMax,
		RetryWaitMin: defaultRetryWaitMin,
		RetryWaitMax: defaultRetryWaitMax,
		CheckRetry:   retryablehttp.DefaultRetryPolicy,
		Backoff:      retryablehttp.DefaultBackoff,
	}, nil
}
// ExportMetrics is the only method MetricsClient exposes; it sends OTLP metrics to the given HCP endpoint.
// The endpoint is a parameter because it can change during the periodic refresh of the CCM telemetry config:
// the same client is re-used and only the target of each request differs.
//
// protoMetrics is wrapped in an ExportMetricsServiceRequest, marshalled and POSTed with the
// client's default headers. An error is returned when marshalling, building or sending the request,
// or reading the response fails, and when the response status is not 200 OK; in that case the
// error carries the status code and a truncated copy of the response body.
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
	payload, err := proto.Marshal(&colmetricpb.ExportMetricsServiceRequest{
		ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
	})
	if err != nil {
		return fmt.Errorf("failed to marshal the request: %w", err)
	}

	httpReq, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(payload))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	httpReq.Header = *o.header

	resp, err := o.client.Do(httpReq.WithContext(ctx))
	if err != nil {
		return fmt.Errorf("failed to post metrics: %w", err)
	}
	defer resp.Body.Close()

	// The body is always read in full, even on success.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read body: %w", err)
	}

	if resp.StatusCode == http.StatusOK {
		return nil
	}

	return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, truncate(string(respBody), defaultErrRespBodyLength))
}
// truncate shortens text to at most width characters (runes) and appends "..."
// when something was cut off. Text that already fits is returned unchanged.
func truncate(text string, width uint) string {
	// A string never holds more runes than bytes, so this cheap check settles the
	// common case without converting. Comparing as uint also avoids a negative
	// int when width exceeds the int range.
	if uint(len(text)) <= width {
		return text
	}

	r := []rune(text)
	// Multi-byte text can exceed width in bytes while still fitting in runes;
	// slicing r[:width] in that case would reach past len(r).
	if uint(len(r)) <= width {
		return text
	}

	return string(r[:width]) + "..."
}