Added connect proxy config and local agent state setup on boot.

This commit is contained in:
Paul Banks 2018-04-16 16:00:20 +01:00 committed by Mitchell Hashimoto
parent 88541bba17
commit e6071051cf
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
10 changed files with 911 additions and 10 deletions

View File

@ -246,6 +246,8 @@ func LocalConfig(cfg *config.RuntimeConfig) local.Config {
NodeID: cfg.NodeID,
NodeName: cfg.NodeName,
TaggedAddresses: map[string]string{},
ProxyBindMinPort: cfg.ConnectProxyBindMinPort,
ProxyBindMaxPort: cfg.ConnectProxyBindMaxPort,
}
for k, v := range cfg.TaggedAddresses {
lc.TaggedAddresses[k] = v
@ -328,6 +330,9 @@ func (a *Agent) Start() error {
if err := a.loadServices(c); err != nil {
return err
}
if err := a.loadProxies(c); err != nil {
return err
}
if err := a.loadChecks(c); err != nil {
return err
}
@ -1973,6 +1978,58 @@ func (a *Agent) RemoveCheck(checkID types.CheckID, persist bool) error {
return nil
}
// AddProxy adds a new local Connect Proxy instance to be managed by the agent.
//
// It REQUIRES that the service that is being proxied is already present in the
// local state. Note that this is only used for agent-managed proxies so we can
// ensure that we always make this true. For externally managed and registered
// proxies we explicitly allow the proxy to be registered first to make
// bootstrap ordering of a new service simpler but the same is not true here
// since this is only ever called when setting up a _managed_ proxy which was
// registered as part of a service registration either from config or HTTP API
// call.
func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error {
// Lookup the target service token in state if there is one.
token := a.State.ServiceToken(proxy.TargetServiceID)
// Add the proxy to local state first since we may need to assign a port which
// needs to be coordinate under state lock. AddProxy will generate the
// NodeService for the proxy populated with the allocated (or configured) port
// and an ID, but it doesn't add it to the agent directly since that could
// deadlock and we may need to coordinate adding it and persisting etc.
proxyService, err := a.State.AddProxy(proxy, token)
if err != nil {
return err
}
// TODO(banks): register proxy health checks.
err = a.AddService(proxyService, nil, persist, token)
if err != nil {
// Remove the state too
a.State.RemoveProxy(proxyService.ID)
return err
}
// TODO(banks): persist some of the local proxy state (not the _proxy_ token).
return nil
}
// RemoveProxy stops and removes a local proxy instance.
func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
// Validate proxyID
if proxyID == "" {
return fmt.Errorf("proxyID missing")
}
if err := a.State.RemoveProxy(proxyID); err != nil {
return err
}
// TODO(banks): unpersist proxy
return nil
}
func (a *Agent) cancelCheckMonitors(checkID types.CheckID) {
// Stop any monitors
delete(a.checkReapAfter, checkID)
@ -2366,6 +2423,25 @@ func (a *Agent) unloadChecks() error {
return nil
}
// loadProxies will load connect proxy definitions from configuration and
// persisted definitions on disk, and load them into the local agent.
func (a *Agent) loadProxies(conf *config.RuntimeConfig) error {
for _, proxy := range conf.ConnectProxies {
if err := a.AddProxy(proxy, false); err != nil {
return fmt.Errorf("failed adding proxy: %s", err)
}
}
// TODO(banks): persist proxy state and re-load it here?
return nil
}
// unloadProxies will deregister all proxies known to the local agent.
func (a *Agent) unloadProxies() error {
// TODO(banks): implement me
return nil
}
// snapshotCheckState is used to snapshot the current state of the health
// checks. This is done before we reload our checks, so that we can properly
// restore into the same state.
@ -2514,6 +2590,9 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
if err := a.loadServices(newCfg); err != nil {
return fmt.Errorf("Failed reloading services: %s", err)
}
if err := a.loadProxies(newCfg); err != nil {
return fmt.Errorf("Failed reloading proxies: %s", err)
}
if err := a.loadChecks(newCfg); err != nil {
return fmt.Errorf("Failed reloading checks: %s", err)
}

View File

@ -15,6 +15,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
@ -2235,3 +2237,103 @@ func TestAgent_reloadWatchesHTTPS(t *testing.T) {
t.Fatalf("bad: %s", err)
}
}
func TestAgent_AddProxy(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), `
node_name = "node1"
`)
defer a.Shutdown()
// Register a target service we can use
reg := &structs.NodeService{
Service: "web",
Port: 8080,
}
require.NoError(t, a.AddService(reg, nil, false, ""))
tests := []struct {
desc string
proxy *structs.ConnectManagedProxy
wantErr bool
}{
{
desc: "basic proxy adding, unregistered service",
proxy: &structs.ConnectManagedProxy{
ExecMode: structs.ProxyExecModeDaemon,
Command: "consul connect proxy",
Config: map[string]interface{}{
"foo": "bar",
},
TargetServiceID: "db", // non-existent service.
},
// Target service must be registered.
wantErr: true,
},
{
desc: "basic proxy adding, unregistered service",
proxy: &structs.ConnectManagedProxy{
ExecMode: structs.ProxyExecModeDaemon,
Command: "consul connect proxy",
Config: map[string]interface{}{
"foo": "bar",
},
TargetServiceID: "web",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
require := require.New(t)
err := a.AddProxy(tt.proxy, false)
if tt.wantErr {
require.Error(err)
return
}
require.NoError(err)
// Test the ID was created as we expect.
got := a.State.Proxy("web-proxy")
require.Equal(tt.proxy, got)
})
}
}
func TestAgent_RemoveProxy(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), `
node_name = "node1"
`)
defer a.Shutdown()
require := require.New(t)
// Register a target service we can use
reg := &structs.NodeService{
Service: "web",
Port: 8080,
}
require.NoError(a.AddService(reg, nil, false, ""))
// Add a proxy for web
pReg := &structs.ConnectManagedProxy{
TargetServiceID: "web",
}
require.NoError(a.AddProxy(pReg, false))
// Test the ID was created as we expect.
gotProxy := a.State.Proxy("web-proxy")
require.Equal(pReg, gotProxy)
err := a.RemoveProxy("web-proxy", false)
require.NoError(err)
gotProxy = a.State.Proxy("web-proxy")
require.Nil(gotProxy)
// Removing invalid proxy should be an error
err = a.RemoveProxy("foobar", false)
require.Error(err)
}

View File

@ -322,8 +322,15 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
}
var services []*structs.ServiceDefinition
var proxies []*structs.ConnectManagedProxy
for _, service := range c.Services {
services = append(services, b.serviceVal(&service))
// Register any connect proxies requested
if proxy := b.connectManagedProxyVal(&service); proxy != nil {
proxies = append(proxies, proxy)
}
// TODO(banks): support connect-native registrations (v.Connect.Enabled ==
// true)
}
if c.Service != nil {
services = append(services, b.serviceVal(c.Service))
@ -520,6 +527,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
consulRaftHeartbeatTimeout := b.durationVal("consul.raft.heartbeat_timeout", c.Consul.Raft.HeartbeatTimeout) * time.Duration(performanceRaftMultiplier)
consulRaftLeaderLeaseTimeout := b.durationVal("consul.raft.leader_lease_timeout", c.Consul.Raft.LeaderLeaseTimeout) * time.Duration(performanceRaftMultiplier)
// Connect proxy defaults.
proxyBindMinPort, proxyBindMaxPort := b.connectProxyPortRange(c.Connect)
// ----------------------------------------------------------------
// build runtime config
//
@ -638,6 +648,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval),
Checks: checks,
ClientAddrs: clientAddrs,
ConnectProxies: proxies,
ConnectProxyBindMinPort: proxyBindMinPort,
ConnectProxyBindMaxPort: proxyBindMaxPort,
DataDir: b.stringVal(c.DataDir),
Datacenter: strings.ToLower(b.stringVal(c.Datacenter)),
DevMode: b.boolVal(b.Flags.DevMode),
@ -1010,6 +1023,75 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition {
}
}
func (b *Builder) connectManagedProxyVal(v *ServiceDefinition) *structs.ConnectManagedProxy {
if v.Connect == nil || v.Connect.Proxy == nil {
return nil
}
p := v.Connect.Proxy
targetID := b.stringVal(v.ID)
if targetID == "" {
targetID = b.stringVal(v.Name)
}
execMode := structs.ProxyExecModeDaemon
if p.ExecMode != nil {
switch *p.ExecMode {
case "daemon":
execMode = structs.ProxyExecModeDaemon
case "script":
execMode = structs.ProxyExecModeScript
default:
b.err = multierror.Append(fmt.Errorf(
"service[%s]: invalid connect proxy exec_mode: %s", targetID,
*p.ExecMode))
return nil
}
}
return &structs.ConnectManagedProxy{
ExecMode: execMode,
Command: b.stringVal(p.Command),
Config: p.Config,
// ProxyService will be setup when the agent registers the configured
// proxies and starts them etc. We could do it here but we may need to do
// things like probe the OS for a free port etc. And we have enough info to
// resolve all this later.
ProxyService: nil,
TargetServiceID: targetID,
}
}
func (b *Builder) connectProxyPortRange(v *Connect) (int, int) {
// Choose this default range just because. There are zero "safe" ranges that
// don't have something somewhere that uses them which is why this is
// configurable. We rely on the host not having any of these ports for non
// agent managed proxies. I went with 20k because I know of at least one
// super-common server memcached that defaults to the 10k range.
start := 20000
end := 20256 // 256 proxies on a host is enough for anyone ;)
if v == nil || v.ProxyDefaults == nil {
return start, end
}
min, max := v.ProxyDefaults.BindMinPort, v.ProxyDefaults.BindMaxPort
if min == nil && max == nil {
return start, end
}
// If either was set show a warning if the overall range was invalid
if min == nil || max == nil || *max < *min {
b.warn("Connect proxy_defaults bind_min_port and bind_max_port must both "+
"be set with max >= min. To disable automatic port allocation set both "+
"to 0. Using default range %d..%d.", start, end)
return start, end
}
return *min, *max
}
func (b *Builder) boolVal(v *bool) bool {
if v == nil {
return false

View File

@ -159,6 +159,7 @@ type Config struct {
CheckUpdateInterval *string `json:"check_update_interval,omitempty" hcl:"check_update_interval" mapstructure:"check_update_interval"`
Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
ClientAddr *string `json:"client_addr,omitempty" hcl:"client_addr" mapstructure:"client_addr"`
Connect *Connect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"`
DNS DNS `json:"dns_config,omitempty" hcl:"dns_config" mapstructure:"dns_config"`
DNSDomain *string `json:"domain,omitempty" hcl:"domain" mapstructure:"domain"`
DNSRecursors []string `json:"recursors,omitempty" hcl:"recursors" mapstructure:"recursors"`
@ -324,6 +325,7 @@ type ServiceDefinition struct {
Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
Token *string `json:"token,omitempty" hcl:"token" mapstructure:"token"`
EnableTagOverride *bool `json:"enable_tag_override,omitempty" hcl:"enable_tag_override" mapstructure:"enable_tag_override"`
Connect *ServiceConnect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"`
}
type CheckDefinition struct {
@ -349,6 +351,47 @@ type CheckDefinition struct {
DeregisterCriticalServiceAfter *string `json:"deregister_critical_service_after,omitempty" hcl:"deregister_critical_service_after" mapstructure:"deregister_critical_service_after"`
}
// ServiceConnect is the connect block within a service registration
type ServiceConnect struct {
// TODO(banks) add way to specify that the app is connect-native
// Proxy configures a connect proxy instance for the service
Proxy *ServiceConnectProxy `json:"proxy,omitempty" hcl:"proxy" mapstructure:"proxy"`
}
type ServiceConnectProxy struct {
Command *string `json:"command,omitempty" hcl:"command" mapstructure:"command"`
ExecMode *string `json:"exec_mode,omitempty" hcl:"exec_mode" mapstructure:"exec_mode"`
Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"`
}
// Connect is the agent-global connect configuration.
type Connect struct {
// Enabled opts the agent into connect. It should be set on all clients and
// servers in a cluster for correct connect operation. TODO(banks) review that.
Enabled bool `json:"enabled,omitempty" hcl:"enabled" mapstructure:"enabled"`
ProxyDefaults *ConnectProxyDefaults `json:"proxy_defaults,omitempty" hcl:"proxy_defaults" mapstructure:"proxy_defaults"`
}
// ConnectProxyDefaults is the agent-global connect proxy configuration.
type ConnectProxyDefaults struct {
// BindMinPort, BindMaxPort are the inclusive lower and upper bounds on the
// port range allocated to the agent to assign to connect proxies that have no
// bind_port specified.
BindMinPort *int `json:"bind_min_port,omitempty" hcl:"bind_min_port" mapstructure:"bind_min_port"`
BindMaxPort *int `json:"bind_max_port,omitempty" hcl:"bind_max_port" mapstructure:"bind_max_port"`
// ExecMode is used where a registration doesn't include an exec_mode.
// Defaults to daemon.
ExecMode *string `json:"exec_mode,omitempty" hcl:"exec_mode" mapstructure:"exec_mode"`
// DaemonCommand is used to start proxy in exec_mode = daemon if not specified
// at registration time.
DaemonCommand *string `json:"daemon_command,omitempty" hcl:"daemon_command" mapstructure:"daemon_command"`
// ScriptCommand is used to start proxy in exec_mode = script if not specified
// at registration time.
ScriptCommand *string `json:"script_command,omitempty" hcl:"script_command" mapstructure:"script_command"`
// Config is merged into an Config specified at registration time.
Config map[string]interface{} `json:"config,omitempty" hcl:"config" mapstructure:"config"`
}
type DNS struct {
AllowStale *bool `json:"allow_stale,omitempty" hcl:"allow_stale" mapstructure:"allow_stale"`
ARecordLimit *int `json:"a_record_limit,omitempty" hcl:"a_record_limit" mapstructure:"a_record_limit"`

View File

@ -616,6 +616,41 @@ type RuntimeConfig struct {
// flag: -client string
ClientAddrs []*net.IPAddr
// ConnectEnabled opts the agent into connect. It should be set on all clients
// and servers in a cluster for correct connect operation. TODO(banks) review
// that.
ConnectEnabled bool
// ConnectProxies is a list of configured proxies taken from the "connect"
// block of service registrations.
ConnectProxies []*structs.ConnectManagedProxy
// ConnectProxyBindMinPort is the inclusive start of the range of ports
// allocated to the agent for starting proxy listeners on where no explicit
// port is specified.
ConnectProxyBindMinPort int
// ConnectProxyBindMaxPort is the inclusive end of the range of ports
// allocated to the agent for starting proxy listeners on where no explicit
// port is specified.
ConnectProxyBindMaxPort int
// ConnectProxyDefaultExecMode is used where a registration doesn't include an
// exec_mode. Defaults to daemon.
ConnectProxyDefaultExecMode *string
// ConnectProxyDefaultDaemonCommand is used to start proxy in exec_mode =
// daemon if not specified at registration time.
ConnectProxyDefaultDaemonCommand *string
// ConnectProxyDefaultScriptCommand is used to start proxy in exec_mode =
// script if not specified at registration time.
ConnectProxyDefaultScriptCommand *string
// ConnectProxyDefaultConfig is merged with any config specified at
// registration time to allow global control of defaults.
ConnectProxyDefaultConfig map[string]interface{}
// DNSAddrs contains the list of TCP and UDP addresses the DNS server will
// bind to. If the DNS endpoint is disabled (ports.dns <= 0) the list is
// empty.

View File

@ -2353,6 +2353,21 @@ func TestFullConfig(t *testing.T) {
],
"check_update_interval": "16507s",
"client_addr": "93.83.18.19",
"connect": {
"enabled": true,
"proxy_defaults": {
"bind_min_port": 2000,
"bind_max_port": 3000,
"exec_mode": "script",
"daemon_command": "consul connect proxy",
"script_command": "proxyctl.sh",
"config": {
"foo": "bar",
"connect_timeout_ms": 1000,
"pedantic_mode": true
}
}
},
"data_dir": "` + dataDir + `",
"datacenter": "rzo029wg",
"disable_anonymous_signature": true,
@ -2613,7 +2628,16 @@ func TestFullConfig(t *testing.T) {
"ttl": "11222s",
"deregister_critical_service_after": "68482s"
}
]
],
"connect": {
"proxy": {
"exec_mode": "daemon",
"command": "awesome-proxy",
"config": {
"foo": "qux"
}
}
}
}
],
"session_ttl_min": "26627s",
@ -2786,6 +2810,21 @@ func TestFullConfig(t *testing.T) {
]
check_update_interval = "16507s"
client_addr = "93.83.18.19"
connect {
enabled = true
proxy_defaults {
bind_min_port = 2000
bind_max_port = 3000
exec_mode = "script"
daemon_command = "consul connect proxy"
script_command = "proxyctl.sh"
config = {
foo = "bar"
connect_timeout_ms = 1000
pedantic_mode = true
}
}
}
data_dir = "` + dataDir + `"
datacenter = "rzo029wg"
disable_anonymous_signature = true
@ -3047,6 +3086,15 @@ func TestFullConfig(t *testing.T) {
deregister_critical_service_after = "68482s"
}
]
connect {
proxy {
exec_mode = "daemon"
command = "awesome-proxy"
config = {
foo = "qux"
}
}
}
}
]
session_ttl_min = "26627s"
@ -3355,8 +3403,23 @@ func TestFullConfig(t *testing.T) {
DeregisterCriticalServiceAfter: 13209 * time.Second,
},
},
CheckUpdateInterval: 16507 * time.Second,
ClientAddrs: []*net.IPAddr{ipAddr("93.83.18.19")},
CheckUpdateInterval: 16507 * time.Second,
ClientAddrs: []*net.IPAddr{ipAddr("93.83.18.19")},
ConnectProxies: []*structs.ConnectManagedProxy{
{
ExecMode: structs.ProxyExecModeDaemon,
Command: "awesome-proxy",
Config: map[string]interface{}{
"foo": "qux", // Overriden by service
// Note globals are not merged here but on rendering to the proxy
// endpoint. That's because proxies can be added later too so merging
// at config time is redundant if we have to do it later anyway.
},
TargetServiceID: "MRHVMZuD",
},
},
ConnectProxyBindMinPort: 2000,
ConnectProxyBindMaxPort: 3000,
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
DNSARecordLimit: 29907,
DNSAllowStale: true,
@ -4018,6 +4081,14 @@ func TestSanitize(t *testing.T) {
}
],
"ClientAddrs": [],
"ConnectEnabled": false,
"ConnectProxies": [],
"ConnectProxyBindMaxPort": 0,
"ConnectProxyBindMinPort": 0,
"ConnectProxyDefaultConfig": {},
"ConnectProxyDefaultDaemonCommand": null,
"ConnectProxyDefaultExecMode": null,
"ConnectProxyDefaultScriptCommand": null,
"ConsulCoordinateUpdateBatchSize": 0,
"ConsulCoordinateUpdateMaxBatches": 0,
"ConsulCoordinateUpdatePeriod": "15s",
@ -4150,9 +4221,11 @@ func TestSanitize(t *testing.T) {
"Checks": [],
"EnableTagOverride": false,
"ID": "",
"Kind": "",
"Meta": {},
"Name": "foo",
"Port": 0,
"ProxyDestination": "",
"Tags": [],
"Token": "hidden"
}

View File

@ -3,6 +3,7 @@ package local
import (
"fmt"
"log"
"math/rand"
"reflect"
"strconv"
"strings"
@ -10,6 +11,8 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
@ -27,6 +30,8 @@ type Config struct {
NodeID types.NodeID
NodeName string
TaggedAddresses map[string]string
ProxyBindMinPort int
ProxyBindMaxPort int
}
// ServiceState describes the state of a service record.
@ -107,6 +112,21 @@ type rpc interface {
RPC(method string, args interface{}, reply interface{}) error
}
// ManagedProxy represents the local state for a registered proxy instance.
type ManagedProxy struct {
Proxy *structs.ConnectManagedProxy
// ProxyToken is a special local-only security token that grants the bearer
// access to the proxy's config as well as allowing it to request certificates
// on behalf of the TargetService. Certain connect endpoints will validate
// against this token and if it matches will then use the TargetService.Token
// to actually authenticate the upstream RPC on behalf of the service. This
// token is passed securely to the proxy process via ENV vars and should never
// be exposed any other way. Unmanaged proxies will never see this and need to
// use service-scoped ACL tokens distributed externally.
ProxyToken string
}
// State is used to represent the node's services,
// and checks. We use it to perform anti-entropy with the
// catalog representation
@ -150,17 +170,28 @@ type State struct {
// tokens contains the ACL tokens
tokens *token.Store
// managedProxies is a map of all manged connect proxies registered locally on
// this agent. This is NOT kept in sync with servers since it's agent-local
// config only. Proxy instances have separate service registrations in the
// services map above which are kept in sync via anti-entropy. Un-managed
// proxies (that registered themselves separately from the service
// registration) do not appear here as the agent doesn't need to manage their
// process nor config. The _do_ still exist in services above though as
// services with Kind == connect-proxy.
managedProxies map[string]*ManagedProxy
}
// NewLocalState creates a new local state for the agent.
// NewState creates a new local state for the agent.
func NewState(c Config, lg *log.Logger, tokens *token.Store) *State {
l := &State{
config: c,
logger: lg,
services: make(map[string]*ServiceState),
checks: make(map[types.CheckID]*CheckState),
metadata: make(map[string]string),
tokens: tokens,
config: c,
logger: lg,
services: make(map[string]*ServiceState),
checks: make(map[types.CheckID]*CheckState),
metadata: make(map[string]string),
tokens: tokens,
managedProxies: make(map[string]*ManagedProxy),
}
l.SetDiscardCheckOutput(c.DiscardCheckOutput)
return l
@ -529,6 +560,142 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
return m
}
// AddProxy is used to add a connect proxy entry to the local state. This
// assumes the proxy's NodeService is already registered via Agent.AddService
// (since that has to do other book keeping). The token passed here is the ACL
// token the service used to register itself so must have write on service
// record.
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*structs.NodeService, error) {
if proxy == nil {
return nil, fmt.Errorf("no proxy")
}
// Lookup the local service
target := l.Service(proxy.TargetServiceID)
if target == nil {
return nil, fmt.Errorf("target service ID %s not registered",
proxy.TargetServiceID)
}
// Get bind info from config
cfg, err := proxy.ParseConfig()
if err != nil {
return nil, err
}
// Construct almost all of the NodeService that needs to be registered by the
// caller outside of the lock.
svc := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: target.ID + "-proxy",
Service: target.ID + "-proxy",
ProxyDestination: target.Service,
Address: cfg.BindAddress,
Port: cfg.BindPort,
}
pToken, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
// Lock now. We can't lock earlier as l.Service would deadlock and shouldn't
// anyway to minimise the critical section.
l.Lock()
defer l.Unlock()
// Allocate port if needed (min and max inclusive)
rangeLen := l.config.ProxyBindMaxPort - l.config.ProxyBindMinPort + 1
if svc.Port < 1 && l.config.ProxyBindMinPort > 0 && rangeLen > 0 {
// This should be a really short list so don't bother optimising lookup yet.
OUTER:
for _, offset := range rand.Perm(rangeLen) {
p := l.config.ProxyBindMinPort + offset
// See if this port was already allocated to another proxy
for _, other := range l.managedProxies {
if other.Proxy.ProxyService.Port == p {
// allready taken, skip to next random pick in the range
continue OUTER
}
}
// We made it through all existing proxies without a match so claim this one
svc.Port = p
break
}
}
// If no ports left (or auto ports disabled) fail
if svc.Port < 1 {
return nil, fmt.Errorf("no port provided for proxy bind_port and none "+
" left in the allocated range [%d, %d]", l.config.ProxyBindMinPort,
l.config.ProxyBindMaxPort)
}
proxy.ProxyService = svc
// All set, add the proxy and return the service
l.managedProxies[svc.ID] = &ManagedProxy{
Proxy: proxy,
ProxyToken: pToken,
}
// No need to trigger sync as proxy state is local only.
return svc, nil
}
// RemoveProxy is used to remove a proxy entry from the local state.
func (l *State) RemoveProxy(id string) error {
l.Lock()
defer l.Unlock()
p := l.managedProxies[id]
if p == nil {
return fmt.Errorf("Proxy %s does not exist", id)
}
delete(l.managedProxies, id)
// No need to trigger sync as proxy state is local only.
return nil
}
// Proxy returns the local proxy state.
func (l *State) Proxy(id string) *structs.ConnectManagedProxy {
l.RLock()
defer l.RUnlock()
p := l.managedProxies[id]
if p == nil {
return nil
}
return p.Proxy
}
// Proxies returns the locally registered proxies.
func (l *State) Proxies() map[string]*structs.ConnectManagedProxy {
l.RLock()
defer l.RUnlock()
m := make(map[string]*structs.ConnectManagedProxy)
for id, p := range l.managedProxies {
m[id] = p.Proxy
}
return m
}
// ProxyToken returns the local proxy token for a given proxy. Note this is not
// an ACL token so it won't fallback to using the agent-configured default ACL
// token. If the proxy doesn't exist an error is returned, otherwise the token
// is guaranteed to exist.
func (l *State) ProxyToken(id string) (string, error) {
l.RLock()
defer l.RUnlock()
p := l.managedProxies[id]
if p == nil {
return "", fmt.Errorf("proxy %s not registered", id)
}
return p.ProxyToken, nil
}
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func (l *State) Metadata() map[string]string {

View File

@ -3,10 +3,14 @@ package local_test
import (
"errors"
"fmt"
"log"
"os"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/local"
@ -1664,3 +1668,128 @@ func checksInSync(state *local.State, wantChecks int) error {
}
return nil
}
func TestStateProxyManagement(t *testing.T) {
t.Parallel()
state := local.NewState(local.Config{
ProxyPortRangeStart: 20000,
ProxyPortRangeEnd: 20002,
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
// Stub state syncing
state.TriggerSyncChanges = func() {}
p1 := structs.ConnectManagedProxy{
ExecMode: structs.ProxyExecModeDaemon,
Command: "consul connect proxy",
TargetServiceID: "web",
}
require := require.New(t)
assert := assert.New(t)
_, err := state.AddProxy(&p1, "fake-token")
require.Error(err, "should fail as the target service isn't registered")
// Sanity check done, lets add a couple of target services to the state
err = state.AddService(&structs.NodeService{
Service: "web",
}, "fake-token-web")
require.NoError(err)
err = state.AddService(&structs.NodeService{
Service: "cache",
}, "fake-token-cache")
require.NoError(err)
require.NoError(err)
err = state.AddService(&structs.NodeService{
Service: "db",
}, "fake-token-db")
require.NoError(err)
// Should work now
svc, err := state.AddProxy(&p1, "fake-token")
require.NoError(err)
assert.Equal("web-proxy", svc.ID)
assert.Equal("web-proxy", svc.Service)
assert.Equal(structs.ServiceKindConnectProxy, svc.Kind)
assert.Equal("web", svc.ProxyDestination)
assert.Equal("", svc.Address, "should have empty address by default")
// Port is non-deterministic but could be either of 20000 or 20001
assert.Contains([]int{20000, 20001}, svc.Port)
// Second proxy should claim other port
p2 := p1
p2.TargetServiceID = "cache"
svc2, err := state.AddProxy(&p2, "fake-token")
require.NoError(err)
assert.Contains([]int{20000, 20001}, svc2.Port)
assert.NotEqual(svc.Port, svc2.Port)
// Just saving this for later...
p2Token, err := state.ProxyToken(svc2.ID)
require.NoError(err)
// Third proxy should fail as all ports are used
p3 := p1
p3.TargetServiceID = "db"
_, err = state.AddProxy(&p3, "fake-token")
require.Error(err)
// But if we set a port explicitly it should be OK
p3.Config = map[string]interface{}{
"bind_port": 1234,
"bind_address": "0.0.0.0",
}
svc3, err := state.AddProxy(&p3, "fake-token")
require.NoError(err)
require.Equal("0.0.0.0", svc3.Address)
require.Equal(1234, svc3.Port)
// Remove one of the auto-assigned proxies
err = state.RemoveProxy(svc2.ID)
require.NoError(err)
// Should be able to create a new proxy for that service with the port (it
// should have been "freed").
p4 := p2
svc4, err := state.AddProxy(&p4, "fake-token")
require.NoError(err)
assert.Contains([]int{20000, 20001}, svc2.Port)
assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed")
// Remove a proxy that doesn't exist should error
err = state.RemoveProxy("nope")
require.Error(err)
assert.Equal(&p4, state.Proxy(p4.ProxyService.ID),
"should fetch the right proxy details")
assert.Nil(state.Proxy("nope"))
proxies := state.Proxies()
assert.Len(proxies, 3)
assert.Equal(&p1, proxies[svc.ID])
assert.Equal(&p4, proxies[svc4.ID])
assert.Equal(&p3, proxies[svc3.ID])
tokens := make([]string, 4)
tokens[0], err = state.ProxyToken(svc.ID)
require.NoError(err)
// p2 not registered anymore but lets make sure p4 got a new token when it
// re-registered with same ID.
tokens[1] = p2Token
tokens[2], err = state.ProxyToken(svc3.ID)
require.NoError(err)
tokens[3], err = state.ProxyToken(svc4.ID)
require.NoError(err)
// Quick check all are distinct
for i := 0; i < len(tokens)-1; i++ {
assert.Len(tokens[i], 36) // Sanity check for UUIDish thing.
for j := i + 1; j < len(tokens); j++ {
assert.NotEqual(tokens[i], tokens[j], "tokens for proxy %d and %d match",
i+1, j+1)
}
}
}

View File

@ -1,5 +1,9 @@
package structs
import (
"github.com/mitchellh/mapstructure"
)
// ConnectAuthorizeRequest is the structure of a request to authorize
// a connection.
type ConnectAuthorizeRequest struct {
@ -15,3 +19,75 @@ type ConnectAuthorizeRequest struct {
ClientCertURI string
ClientCertSerial string
}
// ProxyExecMode encodes the mode for running a managed connect proxy.
type ProxyExecMode int
const (
// ProxyExecModeDaemon executes a proxy process as a supervised daemon.
ProxyExecModeDaemon ProxyExecMode = iota
// ProxyExecModeScript executes a proxy config script on each change to it's
// config.
ProxyExecModeScript
)
// ConnectManagedProxy represents the agent-local state for a configured proxy
// instance. This is never stored or sent to the servers and is only used to
// store the config for the proxy that the agent needs to track. For now it's
// really generic with only the fields the agent needs to act on defined while
// the rest of the proxy config is passed as opaque bag of attributes to support
// arbitrary config params for third-party proxy integrations. "External"
// proxies by definition register themselves and manage their own config
// externally so are never represented in agent state.
type ConnectManagedProxy struct {
// ExecMode is one of daemon or script.
ExecMode ProxyExecMode
// Command is the command to execute. Empty defaults to self-invoking the same
// consul binary with proxy subcomand for ProxyExecModeDaemon and is an error
// for ProxyExecModeScript.
Command string
// Config is the arbitrary configuration data provided with the registration.
Config map[string]interface{}
// ProxyService is a pointer to the local proxy's service record for
// convenience. The proxies ID and name etc. can be read from there. It may be
// nil if the agent is starting up and hasn't registered the service yet.
ProxyService *NodeService
// TargetServiceID is the ID of the target service on the localhost. It may
// not exist yet since bootstrapping is allowed to happen in either order.
TargetServiceID string
}
// ConnectManagedProxyConfig represents the parts of the proxy config the agent
// needs to understand. It's bad UX to make the user specify these separately
// just to make parsing simpler for us so this encapsulates the fields in
// ConnectManagedProxy.Config that we care about. They are all optoinal anyway
// and this is used to decode them with mapstructure.
type ConnectManagedProxyConfig struct {
BindAddress string `mapstructure:"bind_address"`
BindPort int `mapstructure:"bind_port"`
}
// ParseConfig attempts to read the fields we care about from the otherwise
// opaque config map. They are all optional but it may fail if one is specified
// but an invalid value.
func (p *ConnectManagedProxy) ParseConfig() (*ConnectManagedProxyConfig, error) {
var cfg ConnectManagedProxyConfig
d, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
ErrorUnused: false,
WeaklyTypedInput: true, // allow string port etc.
Result: &cfg,
})
if err != nil {
return nil, err
}
err = d.Decode(p.Config)
if err != nil {
return nil, err
}
return &cfg, nil
}

View File

@ -0,0 +1,115 @@
package structs
import (
"reflect"
"testing"
)
func TestConnectManagedProxy_ParseConfig(t *testing.T) {
tests := []struct {
name string
config map[string]interface{}
want *ConnectManagedProxyConfig
wantErr bool
}{
{
name: "empty",
config: nil,
want: &ConnectManagedProxyConfig{},
wantErr: false,
},
{
name: "specified",
config: map[string]interface{}{
"bind_address": "127.0.0.1",
"bind_port": 1234,
},
want: &ConnectManagedProxyConfig{
BindAddress: "127.0.0.1",
BindPort: 1234,
},
wantErr: false,
},
{
name: "stringy port",
config: map[string]interface{}{
"bind_address": "127.0.0.1",
"bind_port": "1234",
},
want: &ConnectManagedProxyConfig{
BindAddress: "127.0.0.1",
BindPort: 1234,
},
wantErr: false,
},
{
name: "empty addr",
config: map[string]interface{}{
"bind_address": "",
"bind_port": "1234",
},
want: &ConnectManagedProxyConfig{
BindAddress: "",
BindPort: 1234,
},
wantErr: false,
},
{
name: "empty port",
config: map[string]interface{}{
"bind_address": "127.0.0.1",
"bind_port": "",
},
want: nil,
wantErr: true,
},
{
name: "junk address",
config: map[string]interface{}{
"bind_address": 42,
"bind_port": "",
},
want: nil,
wantErr: true,
},
{
name: "zero port, missing addr",
config: map[string]interface{}{
"bind_port": 0,
},
want: &ConnectManagedProxyConfig{
BindPort: 0,
},
wantErr: false,
},
{
name: "extra fields present",
config: map[string]interface{}{
"bind_port": 1234,
"flamingos": true,
"upstream": []map[string]interface{}{
{"foo": "bar"},
},
},
want: &ConnectManagedProxyConfig{
BindPort: 1234,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &ConnectManagedProxy{
Config: tt.config,
}
got, err := p.ParseConfig()
if (err != nil) != tt.wantErr {
t.Errorf("ConnectManagedProxy.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ConnectManagedProxy.ParseConfig() = %v, want %v", got, tt.want)
}
})
}
}