consul/connect/proxy/listener.go

296 lines
8.4 KiB
Go
Raw Permalink Normal View History

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 13:12:13 +00:00
// SPDX-License-Identifier: BUSL-1.1
package proxy
import (
"context"
"crypto/tls"
"errors"
"net"
"sync"
"sync/atomic"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/connect"
"github.com/hashicorp/consul/ipaddr"
)
const (
publicListenerPrefix = "inbound"
upstreamListenerPrefix = "upstream"
)
// Listener is the implementation of a specific proxy listener. It has pluggable
// Listen and Dial methods to suit public mTLS vs upstream semantics. It handles
// the lifecycle of the listener and all connections opened through it
type Listener struct {
// Service is the connect service instance to use.
Service *connect.Service
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
// listenFunc, dialFunc, and bindAddr are set by type-specific constructors.
listenFunc func() (net.Listener, error)
dialFunc func() (net.Conn, error)
bindAddr string
stopFlag int32
stopChan chan struct{}
// listeningChan is closed when listener is opened successfully. It's really
// only for use in tests where we need to coordinate wait for the Serve
// goroutine to be running before we proceed trying to connect. On my laptop
// this always works out anyway but on constrained VMs and especially docker
// containers (e.g. in CI) we often see the Dial routine win the race and get
// `connection refused`. Retry loops and sleeps are unpleasant workarounds and
// this is cheap and correct.
listeningChan chan struct{}
// listenerLock guards access to the listener field
listenerLock sync.Mutex
listener net.Listener
logger hclog.Logger
// Gauge to track current open connections
activeConns int32
connWG sync.WaitGroup
metricPrefix string
metricLabels []metrics.Label
}
// NewPublicListener returns a Listener setup to listen for public mTLS
// connections and proxy them to the configured local application over TCP.
func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig,
logger hclog.Logger) *Listener {
bindAddr := ipaddr.FormatAddressPort(cfg.BindAddress, cfg.BindPort)
return &Listener{
Service: svc,
listenFunc: func() (net.Listener, error) {
return tls.Listen("tcp", bindAddr, svc.ServerTLSConfig())
},
dialFunc: func() (net.Conn, error) {
return net.DialTimeout("tcp", cfg.LocalServiceAddress,
time.Duration(cfg.LocalConnectTimeoutMs)*time.Millisecond)
},
bindAddr: bindAddr,
stopChan: make(chan struct{}),
listeningChan: make(chan struct{}),
logger: logger.Named(publicListenerPrefix),
metricPrefix: publicListenerPrefix,
// For now we only label ourselves as source - we could fetch the src
// service from cert on each connection and label metrics differently but it
// significaly complicates the active connection tracking here and it's not
// clear that it's very valuable - on aggregate looking at all _outbound_
// connections across all proxies gets you a full picture of src->dst
// traffic. We might expand this later for better debugging of which clients
// are abusing a particular service instance but we'll see how valuable that
// seems for the extra complication of tracking many gauges here.
metricLabels: []metrics.Label{{Name: "dst", Value: svc.Name()}},
}
}
// NewUpstreamListener returns a Listener setup to listen locally for TCP
// connections that are proxied to a discovered Connect service instance.
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
func NewUpstreamListener(svc *connect.Service, client *api.Client,
cfg UpstreamConfig, logger hclog.Logger) *Listener {
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
return newUpstreamListenerWithResolver(svc, cfg,
UpstreamResolverFuncFromClient(client), logger)
}
func newUpstreamListenerWithResolver(svc *connect.Service, cfg UpstreamConfig,
resolverFunc func(UpstreamConfig) (connect.Resolver, error),
logger hclog.Logger) *Listener {
bindAddr := ipaddr.FormatAddressPort(cfg.LocalBindAddress, cfg.LocalBindPort)
return &Listener{
Service: svc,
listenFunc: func() (net.Listener, error) {
return net.Listen("tcp", bindAddr)
},
dialFunc: func() (net.Conn, error) {
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
rf, err := resolverFunc(cfg)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(),
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
cfg.ConnectTimeout())
defer cancel()
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
return svc.Dial(ctx, rf)
},
bindAddr: bindAddr,
stopChan: make(chan struct{}),
listeningChan: make(chan struct{}),
logger: logger.Named(upstreamListenerPrefix),
metricPrefix: upstreamListenerPrefix,
metricLabels: []metrics.Label{
{Name: "src", Value: svc.Name()},
// TODO(banks): namespace support
Add Proxy Upstreams to Service Definition (#4639) * Refactor Service Definition ProxyDestination. This includes: - Refactoring all internal structs used - Updated tests for both deprecated and new input for: - Agent Services endpoint response - Agent Service endpoint response - Agent Register endpoint - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Register - Unmanaged deprecated field - Unmanaged new fields - Managed deprecated upstreams - Managed new - Catalog Services endpoint response - Catalog Node endpoint response - Catalog Service endpoint response - Updated API tests for all of the above too (both deprecated and new forms of register) TODO: - config package changes for on-disk service definitions - proxy config endpoint - built-in proxy support for new fields * Agent proxy config endpoint updated with upstreams * Config file changes for upstreams. * Add upstream opaque config and update all tests to ensure it works everywhere. * Built in proxy working with new Upstreams config * Command fixes and deprecations * Fix key translation, upstream type defaults and a spate of other subtele bugs found with ned to end test scripts... TODO: tests still failing on one case that needs a fix. I think it's key translation for upstreams nested in Managed proxy struct. * Fix translated keys in API registration. ≈ * Fixes from docs - omit some empty undocumented fields in API - Bring back ServiceProxyDestination in Catalog responses to not break backwards compat - this was removed assuming it was only used internally. * Documentation updates for Upstreams in service definition * Fixes for tests broken by many refactors. * Enable travis on f-connect branch in this branch too. * Add consistent Deprecation comments to ProxyDestination uses * Update version number on deprecation notices, and correct upstream datacenter field with explanation in docs
2018-09-12 16:07:47 +00:00
{Name: "dst_type", Value: string(cfg.DestinationType)},
{Name: "dst", Value: cfg.DestinationName},
},
}
}
// Serve runs the listener until it is stopped. It is an error to call Serve
// more than once for any given Listener instance.
func (l *Listener) Serve() error {
// Ensure we mark state closed if we fail before Close is called externally.
defer l.Close()
if atomic.LoadInt32(&l.stopFlag) != 0 {
return errors.New("serve called on a closed listener")
}
listener, err := l.listenFunc()
if err != nil {
return err
}
l.setListener(listener)
close(l.listeningChan)
for {
conn, err := listener.Accept()
if err != nil {
if atomic.LoadInt32(&l.stopFlag) == 1 {
return nil
}
return err
}
l.connWG.Add(1)
go l.handleConn(conn)
}
}
// handleConn is the internal connection handler goroutine.
func (l *Listener) handleConn(src net.Conn) {
defer func() {
// Make sure Listener.Close waits for this conn to be cleaned up.
src.Close()
l.connWG.Done()
}()
dst, err := l.dialFunc()
if err != nil {
l.logger.Error("failed to dial", "error", err)
return
}
// Track active conn now (first function call) and defer un-counting it when
// it closes.
defer l.trackConn()()
// Note no need to defer dst.Close() since conn handles that for us.
conn := NewConn(src, dst)
defer conn.Close()
connStop := make(chan struct{})
// Run another goroutine to copy the bytes.
go func() {
err = conn.CopyBytes()
if err != nil {
l.logger.Error("connection failed", "error", err)
}
close(connStop)
}()
// Periodically copy stats from conn to metrics (to keep metrics calls out of
// the path of every single packet copy). 5 seconds is probably good enough
// resolution - statsd and most others tend to summarize with lower resolution
// anyway and this amortizes the cost more.
var tx, rx uint64
statsT := time.NewTicker(5 * time.Second)
defer statsT.Stop()
reportStats := func() {
newTx, newRx := conn.Stats()
if delta := newTx - tx; delta > 0 {
metrics.IncrCounterWithLabels([]string{l.metricPrefix, "tx_bytes"},
float32(newTx-tx), l.metricLabels)
}
if delta := newRx - rx; delta > 0 {
metrics.IncrCounterWithLabels([]string{l.metricPrefix, "rx_bytes"},
float32(newRx-rx), l.metricLabels)
}
tx, rx = newTx, newRx
}
// Always report final stats for the conn.
defer reportStats()
// Wait for conn to close
for {
select {
case <-connStop:
return
case <-l.stopChan:
return
case <-statsT.C:
reportStats()
}
}
}
// trackConn increments the count of active conns and returns a func() that can
// be deferred on to decrement the counter again on connection close.
func (l *Listener) trackConn() func() {
c := atomic.AddInt32(&l.activeConns, 1)
metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c),
l.metricLabels)
return func() {
c := atomic.AddInt32(&l.activeConns, -1)
metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c),
l.metricLabels)
}
}
// Close terminates the listener and all active connections.
func (l *Listener) Close() error {
// Prevent the listener from being started.
oldFlag := atomic.SwapInt32(&l.stopFlag, 1)
if oldFlag != 0 {
return nil
}
// Stop the current listener and stop accepting new requests.
if listener := l.getListener(); listener != nil {
listener.Close()
}
// Stop outstanding requests.
close(l.stopChan)
// Wait for all conns to close
l.connWG.Wait()
return nil
}
// Wait for the listener to be ready to accept connections.
func (l *Listener) Wait() {
<-l.listeningChan
}
// BindAddr returns the address the listen is bound to.
func (l *Listener) BindAddr() string {
return l.bindAddr
}
func (l *Listener) setListener(listener net.Listener) {
l.listenerLock.Lock()
l.listener = listener
l.listenerLock.Unlock()
}
func (l *Listener) getListener() net.Listener {
l.listenerLock.Lock()
defer l.listenerLock.Unlock()
return l.listener
}