425 lines
12 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package otelaccesslogging
import (
"fmt"
"strconv"
"strings"
"time"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_extensions_access_loggers_grpc_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/grpc/v3"
envoy_upstreams_http_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/api"
cmn "github.com/hashicorp/consul/envoyextensions/extensioncommon"
"github.com/hashicorp/go-multierror"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
const (
LocalAccessLogClusterName = "local_access_log"
localhost = "localhost"
localhostIPv4 = "127.0.0.1"
localhostIPv6 = "::1"
)
type AccessLog struct {
LogName string
GrpcService *GrpcService
BufferFlushInterval *time.Duration
BufferSizeBytes uint32
FilterStateObjectsToLog []string
RetryPolicy *RetryPolicy
Body any
Attributes map[string]any
ResourceAttributes map[string]any
}
func (a *AccessLog) normalize() {
if a.GrpcService != nil {
a.GrpcService.normalize()
}
if a.RetryPolicy != nil {
a.RetryPolicy.normalize()
}
}
func (a *AccessLog) validate() error {
a.normalize()
if a.GrpcService == nil {
return fmt.Errorf("missing GrpcService")
}
var resultErr error
var field string
var validate func() error
field = "GrpcService"
validate = a.GrpcService.validate
if err := validate(); err != nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to validate Config.%s: %w", field, err))
}
return resultErr
}
func (a *AccessLog) envoyGrpcService(cfg *cmn.RuntimeConfig) (*envoy_core_v3.GrpcService, error) {
target := a.GrpcService.Target
clusterName, err := a.getClusterName(cfg, target)
if err != nil {
return nil, err
}
var initialMetadata []*envoy_core_v3.HeaderValue
for _, meta := range a.GrpcService.InitialMetadata {
initialMetadata = append(initialMetadata, meta.toEnvoy())
}
return &envoy_core_v3.GrpcService{
TargetSpecifier: &envoy_core_v3.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &envoy_core_v3.GrpcService_EnvoyGrpc{
ClusterName: clusterName,
Authority: a.GrpcService.Authority,
},
},
Timeout: target.timeoutDurationPB(),
InitialMetadata: initialMetadata,
}, nil
}
// getClusterName returns the name of the cluster for the OpenTelemetry access logging service.
// If the extension is configured with an upstream OpenTelemetry access logging service then the name of the cluster for
// that upstream is returned. If the extension is configured with a URI, the only allowed host is `localhost`
// and the extension will insert a new cluster with the name "local_access_log", so we use that name.
func (a *AccessLog) getClusterName(cfg *cmn.RuntimeConfig, target *Target) (string, error) {
var err error
clusterName := LocalAccessLogClusterName
if target.isService() {
if clusterName, err = target.clusterName(cfg); err != nil {
return "", err
}
}
return clusterName, nil
}
// toEnvoyCluster returns an Envoy cluster for connecting to the OpenTelemetry access logging service.
// If the extension is configured with the OpenTelemetry access logging service locally via the URI set to localhost,
// this func will return a new cluster definition that will allow the proxy to connect to the OpenTelemetry access logging
// service running on localhost on the configured port.
//
// If the extension is configured with the OpenTelemetry access logging service as an upstream there is no need to insert
// a new cluster so this method returns nil.
func (a *AccessLog) toEnvoyCluster(_ *cmn.RuntimeConfig) (*envoy_cluster_v3.Cluster, error) {
target := a.GrpcService.Target
// If the target is an upstream we do not need to create a cluster. We will use the cluster of the upstream.
if target.isService() {
return nil, nil
}
host, port, err := target.addr()
if err != nil {
return nil, err
}
clusterType := &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_STATIC}
if host == localhost {
// If the host is "localhost" use a STRICT_DNS cluster type to perform DNS lookup.
clusterType = &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_STRICT_DNS}
}
var typedExtProtoOpts map[string]*anypb.Any
httpProtoOpts := &envoy_upstreams_http_v3.HttpProtocolOptions{
UpstreamProtocolOptions: &envoy_upstreams_http_v3.HttpProtocolOptions_ExplicitHttpConfig_{
ExplicitHttpConfig: &envoy_upstreams_http_v3.HttpProtocolOptions_ExplicitHttpConfig{
ProtocolConfig: &envoy_upstreams_http_v3.HttpProtocolOptions_ExplicitHttpConfig_Http2ProtocolOptions{},
},
},
}
httpProtoOptsAny, err := anypb.New(httpProtoOpts)
if err != nil {
return nil, err
}
typedExtProtoOpts = make(map[string]*anypb.Any)
typedExtProtoOpts["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"] = httpProtoOptsAny
return &envoy_cluster_v3.Cluster{
Name: LocalAccessLogClusterName,
ClusterDiscoveryType: clusterType,
ConnectTimeout: target.timeoutDurationPB(),
LoadAssignment: &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: LocalAccessLogClusterName,
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
{
LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_endpoint_v3.Endpoint{
Address: &envoy_core_v3.Address{
Address: &envoy_core_v3.Address_SocketAddress{
SocketAddress: &envoy_core_v3.SocketAddress{
Address: host,
PortSpecifier: &envoy_core_v3.SocketAddress_PortValue{
PortValue: uint32(port),
},
},
},
},
},
},
}},
},
},
},
TypedExtensionProtocolOptions: typedExtProtoOpts,
}, nil
}
func (a *AccessLog) toEnvoyCommonGrpcAccessLogConfig(cfg *cmn.RuntimeConfig) (*envoy_extensions_access_loggers_grpc_v3.CommonGrpcAccessLogConfig, error) {
config := &envoy_extensions_access_loggers_grpc_v3.CommonGrpcAccessLogConfig{
LogName: a.LogName,
BufferSizeBytes: wrapperspb.UInt32(a.BufferSizeBytes),
FilterStateObjectsToLog: a.FilterStateObjectsToLog,
TransportApiVersion: envoy_core_v3.ApiVersion_V3,
}
if a.BufferFlushInterval != nil {
config.BufferFlushInterval = durationpb.New(*a.BufferFlushInterval)
}
if a.RetryPolicy != nil {
config.GrpcStreamRetryPolicy = a.RetryPolicy.toEnvoy()
}
grpcSvc, err := a.envoyGrpcService(cfg)
if err != nil {
return nil, err
}
config.GrpcService = grpcSvc
return config, nil
}
type GrpcService struct {
Target *Target
Authority string
InitialMetadata []*HeaderValue
}
func (v *GrpcService) normalize() {
if v == nil {
return
}
v.Target.normalize()
}
func (v *GrpcService) validate() error {
var resultErr error
if v == nil {
return resultErr
}
if v.Target == nil {
resultErr = multierror.Append(resultErr, fmt.Errorf("GrpcService.Target must be set"))
}
if err := v.Target.validate(); err != nil {
resultErr = multierror.Append(resultErr, err)
}
return resultErr
}
type HeaderValue struct {
Key string
Value string
}
func (h *HeaderValue) toEnvoy() *envoy_core_v3.HeaderValue {
if h == nil {
return nil
}
return &envoy_core_v3.HeaderValue{Key: h.Key, Value: h.Value}
}
type Target struct {
Service api.CompoundServiceName
URI string
Timeout string
timeout *time.Duration
host string
port int
}
// addr returns the host and port for the target when the target is a URI.
// It returns a non-nil error if the target is not a URI.
func (t Target) addr() (string, int, error) {
if !t.isURI() {
return "", 0, fmt.Errorf("target is not configured with a URI, set Target.URI")
}
return t.host, t.port, nil
}
// clusterName returns the cluster name for the target when the target is an upstream service.
// It searches through the upstreams in the provided runtime configuration and returns the name
// of the cluster for the first upstream service that matches the target service.
// It returns a non-nil error if a matching cluster is not found or if the target is not an
// upstream service.
func (t Target) clusterName(cfg *cmn.RuntimeConfig) (string, error) {
if !t.isService() {
return "", fmt.Errorf("target is not configured with an upstream service, set Target.Service")
}
for service, upstream := range cfg.Upstreams {
if service == t.Service {
for sni := range upstream.SNIs {
return sni, nil
}
}
}
return "", fmt.Errorf("no upstream definition found for service %q", t.Service.Name)
}
func (t Target) isService() bool {
return t.Service.Name != ""
}
func (t Target) isURI() bool {
return t.URI != ""
}
func (t *Target) normalize() {
if t == nil {
return
}
t.Service.Namespace = acl.NamespaceOrDefault(t.Service.Namespace)
t.Service.Partition = acl.PartitionOrDefault(t.Service.Partition)
}
// timeoutDurationPB returns the target's timeout as a *durationpb.Duration.
// It returns nil if the timeout has not been explicitly set.
func (t *Target) timeoutDurationPB() *durationpb.Duration {
if t == nil || t.timeout == nil {
return nil
}
return durationpb.New(*t.timeout)
}
func (t *Target) validate() error {
var err, resultErr error
if t == nil {
return resultErr
}
if t.isURI() == t.isService() {
resultErr = multierror.Append(resultErr, fmt.Errorf("exactly one of Target.Service or Target.URI must be set"))
}
if t.isURI() {
t.host, t.port, err = parseAddr(t.URI)
if err == nil {
switch t.host {
case localhost, localhostIPv4, localhostIPv6:
default:
resultErr = multierror.Append(resultErr,
fmt.Errorf("invalid host for Target.URI %q: expected %q, %q, or %q", t.URI, localhost, localhostIPv4, localhostIPv6))
}
} else {
resultErr = multierror.Append(resultErr, fmt.Errorf("invalid format for Target.URI %q: expected host:port", t.URI))
}
}
if t.Timeout != "" {
if d, err := time.ParseDuration(t.Timeout); err == nil {
t.timeout = &d
} else {
resultErr = multierror.Append(resultErr, fmt.Errorf("failed to parse Target.Timeout %q as a duration: %w", t.Timeout, err))
}
}
return resultErr
}
type RetryPolicy struct {
RetryBackOff *RetryBackOff
NumRetries uint32
}
func (r *RetryPolicy) normalize() {
if r == nil {
return
}
r.RetryBackOff.normalize()
}
func (r *RetryPolicy) toEnvoy() *envoy_core_v3.RetryPolicy {
if r == nil {
return nil
}
return &envoy_core_v3.RetryPolicy{
RetryBackOff: r.RetryBackOff.toEnvoy(),
NumRetries: wrapperspb.UInt32(r.NumRetries),
}
}
type RetryBackOff struct {
BaseInterval *time.Duration
MaxInterval *time.Duration
}
func (v *RetryBackOff) normalize() {
if v == nil {
return
}
if v.BaseInterval == nil {
v.BaseInterval = new(time.Duration)
*v.BaseInterval = time.Second
}
if v.MaxInterval == nil {
v.MaxInterval = new(time.Duration)
*v.MaxInterval = time.Second * 30
}
}
func (r *RetryBackOff) toEnvoy() *envoy_core_v3.BackoffStrategy {
if r == nil {
return nil
}
return &envoy_core_v3.BackoffStrategy{
BaseInterval: durationpb.New(*r.BaseInterval),
MaxInterval: durationpb.New(*r.MaxInterval),
}
}
func parseAddr(s string) (host string, port int, err error) {
// Strip the protocol if one was provided
if _, addr, hasProto := strings.Cut(s, "://"); hasProto {
s = addr
}
idx := strings.LastIndex(s, ":")
switch idx {
case -1, len(s) - 1:
err = fmt.Errorf("invalid input format %q: expected host:port", s)
case 0:
host = localhost
port, err = strconv.Atoi(s[idx+1:])
default:
host = s[:idx]
port, err = strconv.Atoi(s[idx+1:])
}
return
}