Continue working through proxy and agent

Rework/listeners, rename makeListener

Refactor, tests pass

Signed-off-by: Mark Anderson <manderson@hashicorp.com>
This commit is contained in:
Mark Anderson 2021-03-26 13:00:44 -07:00
parent 8b1217417a
commit 06f0f79218
19 changed files with 160 additions and 47 deletions

View File

@ -399,7 +399,7 @@ func TestAgent_Service(t *testing.T) {
Service: "web-sidecar-proxy",
Port: 8000,
Proxy: expectProxy.ToAPI(),
ContentHash: "eb557bc310d4f8a0",
ContentHash: "35ad6dd5b1ff8d18",
Weights: api.AgentWeights{
Passing: 1,
Warning: 1,
@ -413,7 +413,7 @@ func TestAgent_Service(t *testing.T) {
// Copy and modify
updatedResponse := *expectedResponse
updatedResponse.Port = 9999
updatedResponse.ContentHash = "d61c11f438c7eb02"
updatedResponse.ContentHash = "8e407e299ec9eba"
// Simple response for non-proxy service registered in TestAgent config
expectWebResponse := &api.AgentService{

View File

@ -14,6 +14,7 @@ import (
"reflect"
"regexp"
"sort"
"strconv"
"strings"
"time"
@ -1707,7 +1708,7 @@ func (b *builder) upstreamsVal(v []Upstream) structs.Upstreams {
LocalBindAddress: stringVal(u.LocalBindAddress),
LocalBindPort: intVal(u.LocalBindPort),
LocalBindSocketPath: stringVal(u.LocalBindSocketPath),
LocalBindSocketMode: uint32Val(u.LocalBindSocketMode),
LocalBindSocketMode: b.unixPermissionsVal("local_bind_socket_mode", u.LocalBindSocketMode),
Config: u.Config,
MeshGateway: b.meshGatewayConfVal(u.MeshGateway),
}
@ -1894,6 +1895,21 @@ func uint64Val(v *uint64) uint64 {
return *v
}
// Expect an octal permissions string, e.g. 0644
func (b *builder) unixPermissionsVal(name string, v *string) uint32 {
if v == nil {
return 0
}
if strings.HasPrefix(*v, "0") {
if mode, err := strconv.ParseUint(*v, 0, 32); err == nil {
return uint32(mode)
}
}
b.err = multierror.Append(b.err, fmt.Errorf("%s: invalid mode: %s", name, *v))
return 0
}
func (b *builder) portVal(name string, v *int) int {
if v == nil || *v <= 0 {
return -1

View File

@ -516,7 +516,7 @@ type Upstream struct {
// These are exclusive with LocalBindAddress/LocalBindPort
LocalBindSocketPath *string `mapstructure:"local_bind_socket_path"`
LocalBindSocketMode *uint32 `mapstructure:"local_bind_socket_mode"`
LocalBindSocketMode *string `mapstructure:"local_bind_socket_mode"`
// Config is an opaque config that is specific to the proxy process being run.
// It can be used to pass arbitrary configuration for this specific upstream

View File

@ -2590,6 +2590,11 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
{
"destination_name": "db",
"local_bind_port": 7000
},
{
"destination_name": "db2",
"local_bind_socket_path": "/tmp/socketpath",
"local_bind_socket_mode": "0644"
}
]
}
@ -2631,6 +2636,11 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
destination_name = "db"
local_bind_port = 7000
},
{
destination_name = "db2",
local_bind_socket_path = "/tmp/socketpath",
local_bind_socket_mode = "0644"
}
]
}
}
@ -2675,6 +2685,12 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
DestinationName: "db",
LocalBindPort: 7000,
},
structs.Upstream{
DestinationType: "service",
DestinationName: "db2",
LocalBindSocketPath: "/tmp/socketpath",
LocalBindSocketMode: 0644,
},
},
},
Weights: &structs.Weights{

View File

@ -578,6 +578,8 @@ services = [
destination_name = "KSd8HsRl"
local_bind_port = 11884
local_bind_address = "127.24.88.0"
local_bind_socket_path = "/foo/bar/upstream"
local_bind_socket_mode = "0600"
},
]
expose {

View File

@ -561,6 +561,7 @@
"destination_service_name": "6L6BVfgH",
"local_service_address": "127.0.0.2",
"local_service_port": 23759,
"local_service_socket_path": "/foo/bar/local",
"expose": {
"checks": true,
"paths": [
@ -589,7 +590,9 @@
"destination_namespace": "9nakw0td",
"destination_type": "prepared_query",
"local_bind_address": "127.24.88.0",
"local_bind_port": 11884
"local_bind_port": 11884,
"local_bind_socket_path": "/foo/bar/upstream",
"local_bind_socket_mode": "0600"
}
]
}

View File

@ -235,6 +235,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
upstreams[2].Identifier(): &upstreams[2],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
@ -290,6 +291,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
upstreams[2].Identifier(): &upstreams[2],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},

View File

@ -1709,6 +1709,8 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
return nil
}
// Note: Ingress gateways are always bound to ports and never unix sockets.
// This means LocalBindPort is the only possibility
func makeUpstream(g *structs.GatewayService) structs.Upstream {
upstream := structs.Upstream{
DestinationName: g.Service.Name,

View File

@ -410,8 +410,11 @@ func (u *Upstream) Validate() error {
return fmt.Errorf("upstream destination name cannot be a wildcard")
}
if u.LocalBindPort == 0 && !u.CentrallyConfigured {
return fmt.Errorf("upstream local bind port cannot be zero")
if u.LocalBindPort == 0 && u.LocalBindSocketPath == "" && !u.CentrallyConfigured {
return fmt.Errorf("upstream local bind port or local socket path must be defined and nonzero")
}
if u.LocalBindPort != 0 && u.LocalBindSocketPath != "" && !u.CentrallyConfigured {
return fmt.Errorf("only one of upstream local bind port or local socket path can be defined and nonzero")
}
return nil

View File

@ -1189,17 +1189,13 @@ func (s *NodeService) Validate() error {
}
upstreamKeys[uk] = struct{}{}
addr := u.LocalBindAddress
if addr == "" {
addr = "127.0.0.1"
}
addr = net.JoinHostPort(addr, fmt.Sprintf("%d", u.LocalBindPort))
addr := u.UpstreamAddressToString()
// Centrally configured upstreams will fail this check if there are multiple because they do not have an address/port.
// Only consider non-centrally configured upstreams in this check since those are the ones we create listeners for.
if _, ok := bindAddrs[addr]; ok && !u.CentrallyConfigured {
result = multierror.Append(result, fmt.Errorf(
"upstreams cannot contain duplicates by local bind address and port; %q is specified twice", addr))
"upstreams cannot contain duplicates by local bind address and port or unix path; %q is specified twice", addr))
continue
}
bindAddrs[addr] = struct{}{}

View File

@ -176,6 +176,16 @@ var expectedFieldConfigUpstreams bexpr.FieldConfigurations = bexpr.FieldConfigur
CoerceFn: bexpr.CoerceInt,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual},
},
"LocalBindSocketPath": &bexpr.FieldConfiguration{
StructFieldName: "LocalBindSocketPath",
CoerceFn: bexpr.CoerceString,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual, bexpr.MatchIn, bexpr.MatchNotIn, bexpr.MatchMatches, bexpr.MatchNotMatches},
},
"LocalBindSocketMode": &bexpr.FieldConfiguration{
StructFieldName: "LocalBindSocketMode",
CoerceFn: bexpr.CoerceInt,
SupportedOperations: []bexpr.MatchOperator{bexpr.MatchEqual, bexpr.MatchNotEqual},
},
"MeshGateway": &bexpr.FieldConfiguration{
StructFieldName: "MeshGateway",
SubFields: expectedFieldConfigMeshGatewayConfig,

View File

@ -734,7 +734,19 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
LocalBindPort: 0,
}}
},
"upstream local bind port cannot be zero",
"upstream local bind port or local socket path must be defined and nonzero",
},
{
"connect-proxy: upstream bind port and path defined",
func(x *NodeService) {
x.Proxy.Upstreams = Upstreams{{
DestinationType: UpstreamDestTypeService,
DestinationName: "foo",
LocalBindPort: 1,
LocalBindSocketPath: "/tmp/socket",
}}
},
"only one of upstream local bind port or local socket path can be defined and nonzero",
},
{
"connect-proxy: Upstreams almost-but-not-quite-duplicated in various ways",

View File

@ -32,6 +32,11 @@ func TestUpstreams(t testing.T) Upstreams {
LocalBindPort: 8181,
LocalBindAddress: "127.10.10.10",
},
{
DestinationName: "upstream_socket",
LocalBindSocketPath: "/tmp/upstream.sock",
LocalBindSocketMode: "0700",
},
}
}

View File

@ -57,7 +57,6 @@ func (s *ResourceGenerator) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapsh
// listenersFromSnapshotConnectProxy returns the "listeners" for a connect proxy service
func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
resources := make([]proto.Message, 1)
var err error
// Configure inbound listener.
@ -77,7 +76,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
port = cfgSnap.Proxy.TransparentProxy.OutboundListenerPort
}
outboundListener = makeListener(OutboundListenerName, "127.0.0.1", port, envoy_core_v3.TrafficDirection_OUTBOUND)
outboundListener = makePortListener(OutboundListenerName, "127.0.0.1", port, envoy_core_v3.TrafficDirection_OUTBOUND)
outboundListener.FilterChains = make([]*envoy_listener_v3.FilterChain, 0)
outboundListener.ListenerFilters = []*envoy_listener_v3.ListenerFilter{
{
@ -103,13 +102,8 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
continue
}
// Generate the upstream listeners for when they are explicitly set with a local bind port
if outboundListener == nil || (upstreamCfg != nil && upstreamCfg.LocalBindPort != 0) {
address := "127.0.0.1"
if upstreamCfg.LocalBindAddress != "" {
address = upstreamCfg.LocalBindAddress
}
// Generate the upstream listeners for when they are explicitly set with a local bind port or socket path
if outboundListener == nil || (upstreamCfg != nil && upstreamCfg.HasLocalPortOrSocket()) {
filterChain, err := s.makeUpstreamFilterChainForDiscoveryChain(
id,
"",
@ -123,7 +117,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
return nil, err
}
upstreamListener := makeListener(id, address, upstreamCfg.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener := makeListener(id, upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{
filterChain,
}
@ -243,12 +237,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
}
address := "127.0.0.1"
if u.LocalBindAddress != "" {
address = u.LocalBindAddress
}
upstreamListener := makeListener(id, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener := makeListener(id, u, envoy_core_v3.TrafficDirection_OUTBOUND)
filterChain, err := s.makeUpstreamFilterChainForDiscoveryChain(
id,
@ -499,7 +488,7 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap
resources = append(resources, upstreamListener)
} else {
// If multiple upstreams share this port, make a special listener for the protocol.
listener := makeListener(listenerKey.Protocol, address, listenerKey.Port, envoy_core_v3.TrafficDirection_OUTBOUND)
listener := makePortListener(listenerKey.Protocol, address, listenerKey.Port, envoy_core_v3.TrafficDirection_OUTBOUND)
opts := listenerFilterOpts{
useRDS: true,
protocol: listenerKey.Protocol,
@ -546,7 +535,15 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap
// changes them, we actually create a whole new listener on the new address and
// port. Envoy should take care of closing the old one once it sees it's no
// longer in the config.
func makeListener(name, addr string, port int, trafficDirection envoy_core_v3.TrafficDirection) *envoy_listener_v3.Listener {
func makeListener(name string, upstream *structs.Upstream, trafficDirection envoy_core_v3.TrafficDirection) *envoy_listener_v3.Listener {
if upstream.LocalBindPort == 0 && upstream.LocalBindSocketPath != "" {
return makePipeListener(name, upstream.LocalBindSocketPath, upstream.LocalBindSocketMode, trafficDirection)
}
return makePortListenerWithDefault(name, upstream.LocalBindAddress, upstream.LocalBindPort, trafficDirection)
}
func makePortListener(name, addr string, port int, trafficDirection envoy_core_v3.TrafficDirection) *envoy_listener_v3.Listener {
return &envoy_listener_v3.Listener{
Name: fmt.Sprintf("%s:%s:%d", name, addr, port),
Address: makeAddress(addr, port),
@ -554,6 +551,22 @@ func makeListener(name, addr string, port int, trafficDirection envoy_core_v3.Tr
}
}
func makePortListenerWithDefault(name, addr string, port int, trafficDirection envoy_core_v3.TrafficDirection) *envoy_listener_v3.Listener {
if addr == "" {
addr = "127.0.0.1"
}
return makePortListener(name, addr, port, trafficDirection)
}
// TODO markan INVESTIGATE sanitizing path name (path.filepath) clean/validate. (Maybe check if sanitizer alters things, then fail)
func makePipeListener(name, path string, mode uint32, trafficDirection envoy_core_v3.TrafficDirection) *envoy_listener_v3.Listener {
return &envoy_listener_v3.Listener{
Name: fmt.Sprintf("%s:%s", name, path),
Address: makePipeAddress(path, mode),
TrafficDirection: trafficDirection,
}
}
// makeListenerFromUserConfig returns the listener config decoded from an
// arbitrary proto3 json format string or an error if it's invalid.
//
@ -755,7 +768,7 @@ func (s *ResourceGenerator) makeInboundListener(cfgSnap *proxycfg.ConfigSnapshot
port = cfg.BindPort
}
l = makeListener(name, addr, port, envoy_core_v3.TrafficDirection_INBOUND)
l = makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_INBOUND)
filterOpts := listenerFilterOpts{
protocol: cfg.Protocol,
@ -833,7 +846,7 @@ func (s *ResourceGenerator) makeExposedCheckListener(cfgSnap *proxycfg.ConfigSna
strippedPath := r.ReplaceAllString(path.Path, "")
listenerName := fmt.Sprintf("exposed_path_%s", strippedPath)
l := makeListener(listenerName, addr, path.ListenerPort, envoy_core_v3.TrafficDirection_INBOUND)
l := makePortListener(listenerName, addr, path.ListenerPort, envoy_core_v3.TrafficDirection_INBOUND)
filterName := fmt.Sprintf("exposed_path_filter_%s_%d", strippedPath, path.ListenerPort)
@ -898,7 +911,7 @@ func (s *ResourceGenerator) makeTerminatingGatewayListener(
name, addr string,
port int,
) (*envoy_listener_v3.Listener, error) {
l := makeListener(name, addr, port, envoy_core_v3.TrafficDirection_INBOUND)
l := makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_INBOUND)
tlsInspector, err := makeTLSInspectorListenerFilter()
if err != nil {
@ -1088,7 +1101,7 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
},
}
l := makeListener(name, addr, port, envoy_core_v3.TrafficDirection_UNSPECIFIED)
l := makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_UNSPECIFIED)
l.ListenerFilters = []*envoy_listener_v3.ListenerFilter{tlsInspector}
// TODO (mesh-gateway) - Do we need to create clusters for all the old trust domains as well?
@ -1275,12 +1288,15 @@ func (s *ResourceGenerator) makeUpstreamListenerForDiscoveryChain(
cfgSnap *proxycfg.ConfigSnapshot,
tlsContext *envoy_tls_v3.DownstreamTlsContext,
) (proto.Message, error) {
if address == "" {
address = "127.0.0.1"
}
upstreamID := u.Identifier()
l := makeListener(upstreamID, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
// Best understanding is this only makes sense for port listeners....
if u.LocalBindSocketPath != "" {
return nil, fmt.Errorf("makeUpstreamListenerForDiscoveryChain not supported for unix domain sockets %s %+v",
address, u)
}
upstreamID := u.Identifier()
l := makePortListenerWithDefault(upstreamID, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
cfg := s.getAndModifyUpstreamConfigForListener(upstreamID, u, chain)
if cfg.EnvoyListenerJSON != "" {
return makeListenerFromUserConfig(cfg.EnvoyListenerJSON)

View File

@ -38,6 +38,17 @@ func createResponse(typeURL string, version, nonce string, resources []proto.Mes
return resp, nil
}
func makePipeAddress(path string, mode uint32) *envoy_core_v3.Address {
return &envoy_core_v3.Address{
Address: &envoy_core_v3.Address_Pipe{
Pipe: &envoy_core_v3.Pipe{
Path: path,
Mode: mode,
},
},
}
}
func makeAddress(ip string, port int) *envoy_core_v3.Address {
return &envoy_core_v3.Address{
Address: &envoy_core_v3.Address_SocketAddress{

View File

@ -15,6 +15,10 @@ import (
//
// The syntax of the value is "name:addr" where addr can be "port" or
// "host:port". Examples: "db:8181", "db:127.0.0.10:8282", etc.
//
// This would need to be extended for Unix Domain Sockets; how does this get handled
// addr: path & path:mode is ambigous; maybe path alone could be sorted by checking for a numeric port
// but iffy
type FlagUpstreams map[string]proxy.UpstreamConfig
func (f *FlagUpstreams) String() string {

View File

@ -315,9 +315,16 @@ func (c *cmd) configWatcher(client *api.Client) (proxyImpl.ConfigWatcher, error)
for _, k := range upstreamKeys {
config := c.upstreams[k]
addr := config.LocalBindSocketPath
if addr == "" {
addr = fmt.Sprintf(
"%s:%d",
config.LocalBindAddress, config.LocalBindPort)
}
c.UI.Info(fmt.Sprintf(
" Upstream: %s => %s:%d",
k, config.LocalBindAddress, config.LocalBindPort))
" Upstream: %s => %s",
k, addr))
upstreams = append(upstreams, config)
}

View File

@ -98,6 +98,7 @@ func (uc *UpstreamConfig) ConnectTimeout() time.Duration {
return 10000 * time.Millisecond
}
// markan START TOMORROW HERE; discover where this is applied
// applyDefaults sets zero-valued params to a sane default.
func (uc *UpstreamConfig) applyDefaults() {
if uc.DestinationType == "" {
@ -106,7 +107,7 @@ func (uc *UpstreamConfig) applyDefaults() {
if uc.DestinationNamespace == "" {
uc.DestinationNamespace = "default"
}
if uc.LocalBindAddress == "" {
if uc.LocalBindAddress == "" && uc.LocalBindSocketPath == "" {
uc.LocalBindAddress = "127.0.0.1"
}
}
@ -114,7 +115,14 @@ func (uc *UpstreamConfig) applyDefaults() {
// String returns a string that uniquely identifies the Upstream. Used for
// identifying the upstream in log output and map keys.
func (uc *UpstreamConfig) String() string {
return fmt.Sprintf("%s:%d->%s:%s/%s", uc.LocalBindAddress, uc.LocalBindPort,
// TODO markan upfactor
addr := uc.LocalBindSocketPath
if addr == "" {
addr = fmt.Sprintf(
"%s:%d",
uc.LocalBindAddress, uc.LocalBindPort)
}
return fmt.Sprintf("%s->%s:%s/%s", addr,
uc.DestinationType, uc.DestinationNamespace, uc.DestinationName)
}

View File

@ -103,8 +103,8 @@ func (p *Proxy) Serve() error {
for _, uc := range newCfg.Upstreams {
uc.applyDefaults()
if uc.LocalBindPort < 1 {
p.logger.Error("upstream has no local_bind_port. "+
if uc.LocalBindPort < 1 || uc.LocalBindSocketPath == "" {
p.logger.Error("upstream has no local_bind_port or local_bind_socket_path. "+
"Can't start upstream.", "upstream", uc.String())
continue
}