mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
TLS watching integrated into Service with some basic tests.
There are also a lot of small bug fixes found when testing lots of things end-to-end for the first time and some cleanup now it's integrated with real CA code.
This commit is contained in:
parent
90c574ebaa
commit
e0e12e165b
@ -28,7 +28,6 @@ import (
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
|
||||
)
|
||||
|
||||
type Self struct {
|
||||
@ -1000,14 +999,71 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
|
||||
}
|
||||
contentHash := fmt.Sprintf("%x", hash)
|
||||
|
||||
// Merge globals defaults
|
||||
config := make(map[string]interface{})
|
||||
for k, v := range s.agent.config.ConnectProxyDefaultConfig {
|
||||
if _, ok := config[k]; !ok {
|
||||
config[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
execMode := "daemon"
|
||||
// If there is a global default mode use that instead
|
||||
if s.agent.config.ConnectProxyDefaultExecMode != "" {
|
||||
execMode = s.agent.config.ConnectProxyDefaultExecMode
|
||||
}
|
||||
// If it's actually set though, use the one set
|
||||
if proxy.Proxy.ExecMode != structs.ProxyExecModeUnspecified {
|
||||
execMode = proxy.Proxy.ExecMode.String()
|
||||
}
|
||||
|
||||
// TODO(banks): default the binary to current binary. Probably needs to be
|
||||
// done deeper though as it will be needed for actually managing proxy
|
||||
// lifecycle.
|
||||
command := proxy.Proxy.Command
|
||||
if command == "" {
|
||||
if execMode == "daemon" {
|
||||
command = s.agent.config.ConnectProxyDefaultDaemonCommand
|
||||
}
|
||||
if execMode == "script" {
|
||||
command = s.agent.config.ConnectProxyDefaultScriptCommand
|
||||
}
|
||||
}
|
||||
// No global defaults set either...
|
||||
if command == "" {
|
||||
command = "consul connect proxy"
|
||||
}
|
||||
|
||||
// Set defaults for anything that is still not specified but required.
|
||||
// Note that these are not included in the content hash. Since we expect
|
||||
// them to be static in general but some like the default target service
|
||||
// port might not be. In that edge case services can set that explicitly
|
||||
// when they re-register which will be caught though.
|
||||
for k, v := range proxy.Proxy.Config {
|
||||
config[k] = v
|
||||
}
|
||||
if _, ok := config["bind_port"]; !ok {
|
||||
config["bind_port"] = proxy.Proxy.ProxyService.Port
|
||||
}
|
||||
if _, ok := config["bind_address"]; !ok {
|
||||
// Default to binding to the same address the agent is configured to
|
||||
// bind to.
|
||||
config["bind_address"] = s.agent.config.BindAddr.String()
|
||||
}
|
||||
if _, ok := config["local_service_address"]; !ok {
|
||||
// Default to localhost and the port the service registered with
|
||||
config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d",
|
||||
target.Port)
|
||||
}
|
||||
|
||||
reply := &api.ConnectProxyConfig{
|
||||
ProxyServiceID: proxy.Proxy.ProxyService.ID,
|
||||
TargetServiceID: target.ID,
|
||||
TargetServiceName: target.Service,
|
||||
ContentHash: contentHash,
|
||||
ExecMode: api.ProxyExecMode(proxy.Proxy.ExecMode.String()),
|
||||
Command: proxy.Proxy.Command,
|
||||
Config: proxy.Proxy.Config,
|
||||
ExecMode: api.ProxyExecMode(execMode),
|
||||
Command: command,
|
||||
Config: config,
|
||||
}
|
||||
return contentHash, reply, nil
|
||||
})
|
||||
@ -1040,10 +1096,13 @@ func (s *HTTPServer) agentLocalBlockingQuery(resp http.ResponseWriter, hash stri
|
||||
// Apply a small amount of jitter to the request.
|
||||
wait += lib.RandomStagger(wait / 16)
|
||||
timeout = time.NewTimer(wait)
|
||||
ws = memdb.NewWatchSet()
|
||||
}
|
||||
|
||||
for {
|
||||
// Must reset this every loop in case the Watch set is already closed but
|
||||
// hash remains same. In that case we'll need to re-block on ws.Watch()
|
||||
// again.
|
||||
ws = memdb.NewWatchSet()
|
||||
curHash, curResp, err := fn(ws)
|
||||
if err != nil {
|
||||
return curResp, err
|
||||
|
@ -2316,7 +2316,7 @@ func requireLeafValidUnderCA(t *testing.T, issued *structs.IssuedCert,
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestAgentConnectProxy(t *testing.T) {
|
||||
func TestAgentConnectProxyConfig_Blocking(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
a := NewTestAgent(t.Name(), "")
|
||||
@ -2354,7 +2354,7 @@ func TestAgentConnectProxy(t *testing.T) {
|
||||
TargetServiceName: "test",
|
||||
ContentHash: "84346af2031659c9",
|
||||
ExecMode: "daemon",
|
||||
Command: "",
|
||||
Command: "consul connect proxy",
|
||||
Config: map[string]interface{}{
|
||||
"upstreams": []interface{}{
|
||||
map[string]interface{}{
|
||||
@ -2362,15 +2362,17 @@ func TestAgentConnectProxy(t *testing.T) {
|
||||
"local_port": float64(3131),
|
||||
},
|
||||
},
|
||||
"bind_port": float64(1234),
|
||||
"connect_timeout_ms": float64(500),
|
||||
"bind_address": "127.0.0.1",
|
||||
"local_service_address": "127.0.0.1:8000",
|
||||
"bind_port": float64(1234),
|
||||
"connect_timeout_ms": float64(500),
|
||||
},
|
||||
}
|
||||
|
||||
ur, err := copystructure.Copy(expectedResponse)
|
||||
require.NoError(t, err)
|
||||
updatedResponse := ur.(*api.ConnectProxyConfig)
|
||||
updatedResponse.ContentHash = "7d53473b0e9db5a"
|
||||
updatedResponse.ContentHash = "e1e3395f0d00cd41"
|
||||
upstreams := updatedResponse.Config["upstreams"].([]interface{})
|
||||
upstreams = append(upstreams,
|
||||
map[string]interface{}{
|
||||
@ -2431,6 +2433,41 @@ func TestAgentConnectProxy(t *testing.T) {
|
||||
wantErr: false,
|
||||
wantResp: updatedResponse,
|
||||
},
|
||||
{
|
||||
// This test exercises a case that caused a busy loop to eat CPU for the
|
||||
// entire duration of the blocking query. If a service gets re-registered
|
||||
// wth same proxy config then the old proxy config chan is closed causing
|
||||
// blocked watchset.Watch to return false indicating a change. But since
|
||||
// the hash is the same when the blocking fn is re-called we should just
|
||||
// keep blocking on the next iteration. The bug hit was that the WatchSet
|
||||
// ws was not being reset in the loop and so when you try to `Watch` it
|
||||
// the second time it just returns immediately making the blocking loop
|
||||
// into a busy-poll!
|
||||
//
|
||||
// This test though doesn't catch that because busy poll still has the
|
||||
// correct external behaviour. I don't want to instrument the loop to
|
||||
// assert it's not executing too fast here as I can't think of a clean way
|
||||
// and the issue is fixed now so this test doesn't actually catch the
|
||||
// error, but does provide an easy way to verify the behaviour by hand:
|
||||
// 1. Make this test fail e.g. change wantErr to true
|
||||
// 2. Add a log.Println or similar into the blocking loop/function
|
||||
// 3. See whether it's called just once or many times in a tight loop.
|
||||
name: "blocking fetch interrupted with no change (same hash)",
|
||||
url: "/v1/agent/connect/proxy/test-proxy?wait=200ms&hash=" + expectedResponse.ContentHash,
|
||||
updateFunc: func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Re-register with _same_ proxy config
|
||||
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg))
|
||||
resp := httptest.NewRecorder()
|
||||
_, err = a.srv.AgentRegisterService(resp, req)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 200, resp.Code, "body: %s", resp.Body.String())
|
||||
},
|
||||
wantWait: 200 * time.Millisecond,
|
||||
wantCode: 200,
|
||||
wantErr: false,
|
||||
wantResp: expectedResponse,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
@ -2479,6 +2516,201 @@ func TestAgentConnectProxy(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentConnectProxyConfig_ConfigHandling(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Define a local service with a managed proxy. It's registered in the test
|
||||
// loop to make sure agent state is predictable whatever order tests execute
|
||||
// since some alter this service config.
|
||||
reg := &structs.ServiceDefinition{
|
||||
ID: "test-id",
|
||||
Name: "test",
|
||||
Address: "127.0.0.1",
|
||||
Port: 8000,
|
||||
Check: structs.CheckType{
|
||||
TTL: 15 * time.Second,
|
||||
},
|
||||
Connect: &structs.ServiceDefinitionConnect{},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
globalConfig string
|
||||
proxy structs.ServiceDefinitionConnectProxy
|
||||
wantMode api.ProxyExecMode
|
||||
wantCommand string
|
||||
wantConfig map[string]interface{}
|
||||
}{
|
||||
{
|
||||
name: "defaults",
|
||||
globalConfig: `
|
||||
bind_addr = "0.0.0.0"
|
||||
connect {
|
||||
enabled = true
|
||||
proxy_defaults = {
|
||||
bind_min_port = 10000
|
||||
bind_max_port = 10000
|
||||
}
|
||||
}
|
||||
`,
|
||||
proxy: structs.ServiceDefinitionConnectProxy{},
|
||||
wantMode: api.ProxyExecModeDaemon,
|
||||
wantCommand: "consul connect proxy",
|
||||
wantConfig: map[string]interface{}{
|
||||
"bind_address": "0.0.0.0",
|
||||
"bind_port": 10000, // "randomly" chosen from our range of 1
|
||||
"local_service_address": "127.0.0.1:8000", // port from service reg
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "global defaults - script",
|
||||
globalConfig: `
|
||||
bind_addr = "0.0.0.0"
|
||||
connect {
|
||||
enabled = true
|
||||
proxy_defaults = {
|
||||
bind_min_port = 10000
|
||||
bind_max_port = 10000
|
||||
exec_mode = "script"
|
||||
script_command = "script.sh"
|
||||
}
|
||||
}
|
||||
`,
|
||||
proxy: structs.ServiceDefinitionConnectProxy{},
|
||||
wantMode: api.ProxyExecModeScript,
|
||||
wantCommand: "script.sh",
|
||||
wantConfig: map[string]interface{}{
|
||||
"bind_address": "0.0.0.0",
|
||||
"bind_port": 10000, // "randomly" chosen from our range of 1
|
||||
"local_service_address": "127.0.0.1:8000", // port from service reg
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "global defaults - daemon",
|
||||
globalConfig: `
|
||||
bind_addr = "0.0.0.0"
|
||||
connect {
|
||||
enabled = true
|
||||
proxy_defaults = {
|
||||
bind_min_port = 10000
|
||||
bind_max_port = 10000
|
||||
exec_mode = "daemon"
|
||||
daemon_command = "daemon.sh"
|
||||
}
|
||||
}
|
||||
`,
|
||||
proxy: structs.ServiceDefinitionConnectProxy{},
|
||||
wantMode: api.ProxyExecModeDaemon,
|
||||
wantCommand: "daemon.sh",
|
||||
wantConfig: map[string]interface{}{
|
||||
"bind_address": "0.0.0.0",
|
||||
"bind_port": 10000, // "randomly" chosen from our range of 1
|
||||
"local_service_address": "127.0.0.1:8000", // port from service reg
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "global default config merge",
|
||||
globalConfig: `
|
||||
bind_addr = "0.0.0.0"
|
||||
connect {
|
||||
enabled = true
|
||||
proxy_defaults = {
|
||||
bind_min_port = 10000
|
||||
bind_max_port = 10000
|
||||
config = {
|
||||
connect_timeout_ms = 1000
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
proxy: structs.ServiceDefinitionConnectProxy{
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
wantMode: api.ProxyExecModeDaemon,
|
||||
wantCommand: "consul connect proxy",
|
||||
wantConfig: map[string]interface{}{
|
||||
"bind_address": "0.0.0.0",
|
||||
"bind_port": 10000, // "randomly" chosen from our range of 1
|
||||
"local_service_address": "127.0.0.1:8000", // port from service reg
|
||||
"connect_timeout_ms": 1000,
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "overrides in reg",
|
||||
globalConfig: `
|
||||
bind_addr = "0.0.0.0"
|
||||
connect {
|
||||
enabled = true
|
||||
proxy_defaults = {
|
||||
bind_min_port = 10000
|
||||
bind_max_port = 10000
|
||||
exec_mode = "daemon"
|
||||
daemon_command = "daemon.sh"
|
||||
script_command = "script.sh"
|
||||
config = {
|
||||
connect_timeout_ms = 1000
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
proxy: structs.ServiceDefinitionConnectProxy{
|
||||
ExecMode: "script",
|
||||
Command: "foo.sh",
|
||||
Config: map[string]interface{}{
|
||||
"connect_timeout_ms": 2000,
|
||||
"bind_address": "127.0.0.1",
|
||||
"bind_port": 1024,
|
||||
"local_service_address": "127.0.0.1:9191",
|
||||
},
|
||||
},
|
||||
wantMode: api.ProxyExecModeScript,
|
||||
wantCommand: "foo.sh",
|
||||
wantConfig: map[string]interface{}{
|
||||
"bind_address": "127.0.0.1",
|
||||
"bind_port": float64(1024),
|
||||
"local_service_address": "127.0.0.1:9191",
|
||||
"connect_timeout_ms": float64(2000),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
a := NewTestAgent(t.Name(), tt.globalConfig)
|
||||
defer a.Shutdown()
|
||||
|
||||
// Register the basic service with the required config
|
||||
{
|
||||
reg.Connect.Proxy = &tt.proxy
|
||||
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(reg))
|
||||
resp := httptest.NewRecorder()
|
||||
_, err := a.srv.AgentRegisterService(resp, req)
|
||||
require.NoError(err)
|
||||
require.Equal(200, resp.Code, "body: %s", resp.Body.String())
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest("GET", "/v1/agent/connect/proxy/test-id-proxy", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.AgentConnectProxyConfig(resp, req)
|
||||
require.NoError(err)
|
||||
|
||||
proxyCfg := obj.(*api.ConnectProxyConfig)
|
||||
assert.Equal("test-id-proxy", proxyCfg.ProxyServiceID)
|
||||
assert.Equal("test-id", proxyCfg.TargetServiceID)
|
||||
assert.Equal("test", proxyCfg.TargetServiceName)
|
||||
assert.Equal(tt.wantMode, proxyCfg.ExecMode)
|
||||
assert.Equal(tt.wantCommand, proxyCfg.Command)
|
||||
require.Equal(tt.wantConfig, proxyCfg.Config)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentConnectAuthorize_badBody(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -531,6 +531,17 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||
connectCAConfig = c.Connect.CAConfig
|
||||
}
|
||||
|
||||
proxyDefaultExecMode := ""
|
||||
proxyDefaultDaemonCommand := ""
|
||||
proxyDefaultScriptCommand := ""
|
||||
proxyDefaultConfig := make(map[string]interface{})
|
||||
if c.Connect != nil && c.Connect.ProxyDefaults != nil {
|
||||
proxyDefaultExecMode = b.stringVal(c.Connect.ProxyDefaults.ExecMode)
|
||||
proxyDefaultDaemonCommand = b.stringVal(c.Connect.ProxyDefaults.DaemonCommand)
|
||||
proxyDefaultScriptCommand = b.stringVal(c.Connect.ProxyDefaults.ScriptCommand)
|
||||
proxyDefaultConfig = c.Connect.ProxyDefaults.Config
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// build runtime config
|
||||
//
|
||||
@ -638,100 +649,104 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||
TelemetryStatsiteAddr: b.stringVal(c.Telemetry.StatsiteAddr),
|
||||
|
||||
// Agent
|
||||
AdvertiseAddrLAN: advertiseAddrLAN,
|
||||
AdvertiseAddrWAN: advertiseAddrWAN,
|
||||
BindAddr: bindAddr,
|
||||
Bootstrap: b.boolVal(c.Bootstrap),
|
||||
BootstrapExpect: b.intVal(c.BootstrapExpect),
|
||||
CAFile: b.stringVal(c.CAFile),
|
||||
CAPath: b.stringVal(c.CAPath),
|
||||
CertFile: b.stringVal(c.CertFile),
|
||||
CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval),
|
||||
Checks: checks,
|
||||
ClientAddrs: clientAddrs,
|
||||
ConnectEnabled: connectEnabled,
|
||||
ConnectProxyBindMinPort: proxyBindMinPort,
|
||||
ConnectProxyBindMaxPort: proxyBindMaxPort,
|
||||
ConnectCAProvider: connectCAProvider,
|
||||
ConnectCAConfig: connectCAConfig,
|
||||
DataDir: b.stringVal(c.DataDir),
|
||||
Datacenter: strings.ToLower(b.stringVal(c.Datacenter)),
|
||||
DevMode: b.boolVal(b.Flags.DevMode),
|
||||
DisableAnonymousSignature: b.boolVal(c.DisableAnonymousSignature),
|
||||
DisableCoordinates: b.boolVal(c.DisableCoordinates),
|
||||
DisableHostNodeID: b.boolVal(c.DisableHostNodeID),
|
||||
DisableKeyringFile: b.boolVal(c.DisableKeyringFile),
|
||||
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
|
||||
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
|
||||
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
|
||||
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
|
||||
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
|
||||
EnableDebug: b.boolVal(c.EnableDebug),
|
||||
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
|
||||
EnableSyslog: b.boolVal(c.EnableSyslog),
|
||||
EnableUI: b.boolVal(c.UI),
|
||||
EncryptKey: b.stringVal(c.EncryptKey),
|
||||
EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming),
|
||||
EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
|
||||
KeyFile: b.stringVal(c.KeyFile),
|
||||
LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
|
||||
LeaveOnTerm: leaveOnTerm,
|
||||
LogLevel: b.stringVal(c.LogLevel),
|
||||
NodeID: types.NodeID(b.stringVal(c.NodeID)),
|
||||
NodeMeta: c.NodeMeta,
|
||||
NodeName: b.nodeName(c.NodeName),
|
||||
NonVotingServer: b.boolVal(c.NonVotingServer),
|
||||
PidFile: b.stringVal(c.PidFile),
|
||||
RPCAdvertiseAddr: rpcAdvertiseAddr,
|
||||
RPCBindAddr: rpcBindAddr,
|
||||
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
|
||||
RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst),
|
||||
RPCProtocol: b.intVal(c.RPCProtocol),
|
||||
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
|
||||
RaftProtocol: b.intVal(c.RaftProtocol),
|
||||
RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold),
|
||||
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
|
||||
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
|
||||
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
|
||||
RejoinAfterLeave: b.boolVal(c.RejoinAfterLeave),
|
||||
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
|
||||
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
|
||||
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
|
||||
RetryJoinMaxAttemptsLAN: b.intVal(c.RetryJoinMaxAttemptsLAN),
|
||||
RetryJoinMaxAttemptsWAN: b.intVal(c.RetryJoinMaxAttemptsWAN),
|
||||
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
|
||||
SegmentName: b.stringVal(c.SegmentName),
|
||||
Segments: segments,
|
||||
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
|
||||
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
|
||||
SerfBindAddrLAN: serfBindAddrLAN,
|
||||
SerfBindAddrWAN: serfBindAddrWAN,
|
||||
SerfPortLAN: serfPortLAN,
|
||||
SerfPortWAN: serfPortWAN,
|
||||
ServerMode: b.boolVal(c.ServerMode),
|
||||
ServerName: b.stringVal(c.ServerName),
|
||||
ServerPort: serverPort,
|
||||
Services: services,
|
||||
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
|
||||
SkipLeaveOnInt: skipLeaveOnInt,
|
||||
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
|
||||
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
|
||||
SyslogFacility: b.stringVal(c.SyslogFacility),
|
||||
TLSCipherSuites: b.tlsCipherSuites("tls_cipher_suites", c.TLSCipherSuites),
|
||||
TLSMinVersion: b.stringVal(c.TLSMinVersion),
|
||||
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
|
||||
TaggedAddresses: c.TaggedAddresses,
|
||||
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
|
||||
UIDir: b.stringVal(c.UIDir),
|
||||
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
|
||||
UnixSocketMode: b.stringVal(c.UnixSocket.Mode),
|
||||
UnixSocketUser: b.stringVal(c.UnixSocket.User),
|
||||
VerifyIncoming: b.boolVal(c.VerifyIncoming),
|
||||
VerifyIncomingHTTPS: b.boolVal(c.VerifyIncomingHTTPS),
|
||||
VerifyIncomingRPC: b.boolVal(c.VerifyIncomingRPC),
|
||||
VerifyOutgoing: b.boolVal(c.VerifyOutgoing),
|
||||
VerifyServerHostname: b.boolVal(c.VerifyServerHostname),
|
||||
Watches: c.Watches,
|
||||
AdvertiseAddrLAN: advertiseAddrLAN,
|
||||
AdvertiseAddrWAN: advertiseAddrWAN,
|
||||
BindAddr: bindAddr,
|
||||
Bootstrap: b.boolVal(c.Bootstrap),
|
||||
BootstrapExpect: b.intVal(c.BootstrapExpect),
|
||||
CAFile: b.stringVal(c.CAFile),
|
||||
CAPath: b.stringVal(c.CAPath),
|
||||
CertFile: b.stringVal(c.CertFile),
|
||||
CheckUpdateInterval: b.durationVal("check_update_interval", c.CheckUpdateInterval),
|
||||
Checks: checks,
|
||||
ClientAddrs: clientAddrs,
|
||||
ConnectEnabled: connectEnabled,
|
||||
ConnectCAProvider: connectCAProvider,
|
||||
ConnectCAConfig: connectCAConfig,
|
||||
ConnectProxyBindMinPort: proxyBindMinPort,
|
||||
ConnectProxyBindMaxPort: proxyBindMaxPort,
|
||||
ConnectProxyDefaultExecMode: proxyDefaultExecMode,
|
||||
ConnectProxyDefaultDaemonCommand: proxyDefaultDaemonCommand,
|
||||
ConnectProxyDefaultScriptCommand: proxyDefaultScriptCommand,
|
||||
ConnectProxyDefaultConfig: proxyDefaultConfig,
|
||||
DataDir: b.stringVal(c.DataDir),
|
||||
Datacenter: strings.ToLower(b.stringVal(c.Datacenter)),
|
||||
DevMode: b.boolVal(b.Flags.DevMode),
|
||||
DisableAnonymousSignature: b.boolVal(c.DisableAnonymousSignature),
|
||||
DisableCoordinates: b.boolVal(c.DisableCoordinates),
|
||||
DisableHostNodeID: b.boolVal(c.DisableHostNodeID),
|
||||
DisableKeyringFile: b.boolVal(c.DisableKeyringFile),
|
||||
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
|
||||
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
|
||||
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
|
||||
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
|
||||
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
|
||||
EnableDebug: b.boolVal(c.EnableDebug),
|
||||
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
|
||||
EnableSyslog: b.boolVal(c.EnableSyslog),
|
||||
EnableUI: b.boolVal(c.UI),
|
||||
EncryptKey: b.stringVal(c.EncryptKey),
|
||||
EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming),
|
||||
EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
|
||||
KeyFile: b.stringVal(c.KeyFile),
|
||||
LeaveDrainTime: b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
|
||||
LeaveOnTerm: leaveOnTerm,
|
||||
LogLevel: b.stringVal(c.LogLevel),
|
||||
NodeID: types.NodeID(b.stringVal(c.NodeID)),
|
||||
NodeMeta: c.NodeMeta,
|
||||
NodeName: b.nodeName(c.NodeName),
|
||||
NonVotingServer: b.boolVal(c.NonVotingServer),
|
||||
PidFile: b.stringVal(c.PidFile),
|
||||
RPCAdvertiseAddr: rpcAdvertiseAddr,
|
||||
RPCBindAddr: rpcBindAddr,
|
||||
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
|
||||
RPCMaxBurst: b.intVal(c.Limits.RPCMaxBurst),
|
||||
RPCProtocol: b.intVal(c.RPCProtocol),
|
||||
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
|
||||
RaftProtocol: b.intVal(c.RaftProtocol),
|
||||
RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold),
|
||||
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
|
||||
ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN),
|
||||
ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN),
|
||||
RejoinAfterLeave: b.boolVal(c.RejoinAfterLeave),
|
||||
RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN),
|
||||
RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN),
|
||||
RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN),
|
||||
RetryJoinMaxAttemptsLAN: b.intVal(c.RetryJoinMaxAttemptsLAN),
|
||||
RetryJoinMaxAttemptsWAN: b.intVal(c.RetryJoinMaxAttemptsWAN),
|
||||
RetryJoinWAN: b.expandAllOptionalAddrs("retry_join_wan", c.RetryJoinWAN),
|
||||
SegmentName: b.stringVal(c.SegmentName),
|
||||
Segments: segments,
|
||||
SerfAdvertiseAddrLAN: serfAdvertiseAddrLAN,
|
||||
SerfAdvertiseAddrWAN: serfAdvertiseAddrWAN,
|
||||
SerfBindAddrLAN: serfBindAddrLAN,
|
||||
SerfBindAddrWAN: serfBindAddrWAN,
|
||||
SerfPortLAN: serfPortLAN,
|
||||
SerfPortWAN: serfPortWAN,
|
||||
ServerMode: b.boolVal(c.ServerMode),
|
||||
ServerName: b.stringVal(c.ServerName),
|
||||
ServerPort: serverPort,
|
||||
Services: services,
|
||||
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
|
||||
SkipLeaveOnInt: skipLeaveOnInt,
|
||||
StartJoinAddrsLAN: b.expandAllOptionalAddrs("start_join", c.StartJoinAddrsLAN),
|
||||
StartJoinAddrsWAN: b.expandAllOptionalAddrs("start_join_wan", c.StartJoinAddrsWAN),
|
||||
SyslogFacility: b.stringVal(c.SyslogFacility),
|
||||
TLSCipherSuites: b.tlsCipherSuites("tls_cipher_suites", c.TLSCipherSuites),
|
||||
TLSMinVersion: b.stringVal(c.TLSMinVersion),
|
||||
TLSPreferServerCipherSuites: b.boolVal(c.TLSPreferServerCipherSuites),
|
||||
TaggedAddresses: c.TaggedAddresses,
|
||||
TranslateWANAddrs: b.boolVal(c.TranslateWANAddrs),
|
||||
UIDir: b.stringVal(c.UIDir),
|
||||
UnixSocketGroup: b.stringVal(c.UnixSocket.Group),
|
||||
UnixSocketMode: b.stringVal(c.UnixSocket.Mode),
|
||||
UnixSocketUser: b.stringVal(c.UnixSocket.User),
|
||||
VerifyIncoming: b.boolVal(c.VerifyIncoming),
|
||||
VerifyIncomingHTTPS: b.boolVal(c.VerifyIncomingHTTPS),
|
||||
VerifyIncomingRPC: b.boolVal(c.VerifyIncomingRPC),
|
||||
VerifyOutgoing: b.boolVal(c.VerifyOutgoing),
|
||||
VerifyServerHostname: b.boolVal(c.VerifyServerHostname),
|
||||
Watches: c.Watches,
|
||||
}
|
||||
|
||||
if rt.BootstrapExpect == 1 {
|
||||
|
@ -633,15 +633,15 @@ type RuntimeConfig struct {
|
||||
|
||||
// ConnectProxyDefaultExecMode is used where a registration doesn't include an
|
||||
// exec_mode. Defaults to daemon.
|
||||
ConnectProxyDefaultExecMode *string
|
||||
ConnectProxyDefaultExecMode string
|
||||
|
||||
// ConnectProxyDefaultDaemonCommand is used to start proxy in exec_mode =
|
||||
// daemon if not specified at registration time.
|
||||
ConnectProxyDefaultDaemonCommand *string
|
||||
ConnectProxyDefaultDaemonCommand string
|
||||
|
||||
// ConnectProxyDefaultScriptCommand is used to start proxy in exec_mode =
|
||||
// script if not specified at registration time.
|
||||
ConnectProxyDefaultScriptCommand *string
|
||||
ConnectProxyDefaultScriptCommand string
|
||||
|
||||
// ConnectProxyDefaultConfig is merged with any config specified at
|
||||
// registration time to allow global control of defaults.
|
||||
|
@ -2830,7 +2830,9 @@ func TestFullConfig(t *testing.T) {
|
||||
script_command = "proxyctl.sh"
|
||||
config = {
|
||||
foo = "bar"
|
||||
connect_timeout_ms = 1000
|
||||
# hack float since json parses numbers as float and we have to
|
||||
# assert against the same thing
|
||||
connect_timeout_ms = 1000.0
|
||||
pedantic_mode = true
|
||||
}
|
||||
}
|
||||
@ -3423,6 +3425,14 @@ func TestFullConfig(t *testing.T) {
|
||||
"g4cvJyys": "IRLXE9Ds",
|
||||
"hyMy9Oxn": "XeBp4Sis",
|
||||
},
|
||||
ConnectProxyDefaultExecMode: "script",
|
||||
ConnectProxyDefaultDaemonCommand: "consul connect proxy",
|
||||
ConnectProxyDefaultScriptCommand: "proxyctl.sh",
|
||||
ConnectProxyDefaultConfig: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
"connect_timeout_ms": float64(1000),
|
||||
"pedantic_mode": true,
|
||||
},
|
||||
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
|
||||
DNSARecordLimit: 29907,
|
||||
DNSAllowStale: true,
|
||||
@ -4099,9 +4109,9 @@ func TestSanitize(t *testing.T) {
|
||||
"ConnectProxyBindMaxPort": 0,
|
||||
"ConnectProxyBindMinPort": 0,
|
||||
"ConnectProxyDefaultConfig": {},
|
||||
"ConnectProxyDefaultDaemonCommand": null,
|
||||
"ConnectProxyDefaultExecMode": null,
|
||||
"ConnectProxyDefaultScriptCommand": null,
|
||||
"ConnectProxyDefaultDaemonCommand": "",
|
||||
"ConnectProxyDefaultExecMode": "",
|
||||
"ConnectProxyDefaultScriptCommand": "",
|
||||
"ConsulCoordinateUpdateBatchSize": 0,
|
||||
"ConsulCoordinateUpdateMaxBatches": 0,
|
||||
"ConsulCoordinateUpdatePeriod": "15s",
|
||||
|
@ -150,7 +150,7 @@ func TestLeaf(t testing.T, service string, root *structs.CARoot) (string, string
|
||||
spiffeId := &SpiffeIDService{
|
||||
Host: fmt.Sprintf("%s.consul", testClusterID),
|
||||
Namespace: "default",
|
||||
Datacenter: "dc01",
|
||||
Datacenter: "dc1",
|
||||
Service: service,
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,7 @@ func TestSpiffeIDService(t testing.T, service string) *SpiffeIDService {
|
||||
return &SpiffeIDService{
|
||||
Host: testClusterID + ".consul",
|
||||
Namespace: "default",
|
||||
Datacenter: "dc01",
|
||||
Datacenter: "dc1",
|
||||
Service: service,
|
||||
}
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ func init() {
|
||||
registerEndpoint("/v1/connect/ca/roots", []string{"GET"}, (*HTTPServer).ConnectCARoots)
|
||||
registerEndpoint("/v1/connect/intentions", []string{"GET", "POST"}, (*HTTPServer).IntentionEndpoint)
|
||||
registerEndpoint("/v1/connect/intentions/match", []string{"GET"}, (*HTTPServer).IntentionMatch)
|
||||
registerEndpoint("/v1/connect/intentions/", []string{"GET"}, (*HTTPServer).IntentionSpecific)
|
||||
registerEndpoint("/v1/connect/intentions/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).IntentionSpecific)
|
||||
registerEndpoint("/v1/coordinate/datacenters", []string{"GET"}, (*HTTPServer).CoordinateDatacenters)
|
||||
registerEndpoint("/v1/coordinate/nodes", []string{"GET"}, (*HTTPServer).CoordinateNodes)
|
||||
registerEndpoint("/v1/coordinate/node/", []string{"GET"}, (*HTTPServer).CoordinateNode)
|
||||
|
@ -608,7 +608,12 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
// Allocate port if needed (min and max inclusive)
|
||||
// Does this proxy instance allready exist?
|
||||
if existing, ok := l.managedProxies[svc.ID]; ok {
|
||||
svc.Port = existing.Proxy.ProxyService.Port
|
||||
}
|
||||
|
||||
// Allocate port if needed (min and max inclusive).
|
||||
rangeLen := l.config.ProxyBindMaxPort - l.config.ProxyBindMinPort + 1
|
||||
if svc.Port < 1 && l.config.ProxyBindMinPort > 0 && rangeLen > 0 {
|
||||
// This should be a really short list so don't bother optimising lookup yet.
|
||||
|
@ -1721,6 +1721,21 @@ func TestStateProxyManagement(t *testing.T) {
|
||||
// Port is non-deterministic but could be either of 20000 or 20001
|
||||
assert.Contains([]int{20000, 20001}, svc.Port)
|
||||
|
||||
{
|
||||
// Re-registering same proxy again should not pick a random port but re-use
|
||||
// the assigned one.
|
||||
svcDup, err := state.AddProxy(&p1, "fake-token")
|
||||
require.NoError(err)
|
||||
|
||||
assert.Equal("web-proxy", svcDup.ID)
|
||||
assert.Equal("web-proxy", svcDup.Service)
|
||||
assert.Equal(structs.ServiceKindConnectProxy, svcDup.Kind)
|
||||
assert.Equal("web", svcDup.ProxyDestination)
|
||||
assert.Equal("", svcDup.Address, "should have empty address by default")
|
||||
// Port must be same as before
|
||||
assert.Equal(svc.Port, svcDup.Port)
|
||||
}
|
||||
|
||||
// Second proxy should claim other port
|
||||
p2 := p1
|
||||
p2.TargetServiceID = "cache"
|
||||
|
@ -24,8 +24,11 @@ type ConnectAuthorizeRequest struct {
|
||||
type ProxyExecMode int
|
||||
|
||||
const (
|
||||
// ProxyExecModeUnspecified uses the global default proxy mode.
|
||||
ProxyExecModeUnspecified ProxyExecMode = iota
|
||||
|
||||
// ProxyExecModeDaemon executes a proxy process as a supervised daemon.
|
||||
ProxyExecModeDaemon ProxyExecMode = iota
|
||||
ProxyExecModeDaemon
|
||||
|
||||
// ProxyExecModeScript executes a proxy config script on each change to it's
|
||||
// config.
|
||||
@ -35,6 +38,8 @@ const (
|
||||
// String implements Stringer
|
||||
func (m ProxyExecMode) String() string {
|
||||
switch m {
|
||||
case ProxyExecModeUnspecified:
|
||||
return "global_default"
|
||||
case ProxyExecModeDaemon:
|
||||
return "daemon"
|
||||
case ProxyExecModeScript:
|
||||
|
@ -55,10 +55,11 @@ func (s *ServiceDefinition) ConnectManagedProxy() (*ConnectManagedProxy, error)
|
||||
// which we shouldn't hard code ourselves here...
|
||||
ns := s.NodeService()
|
||||
|
||||
execMode := ProxyExecModeDaemon
|
||||
execMode := ProxyExecModeUnspecified
|
||||
switch s.Connect.Proxy.ExecMode {
|
||||
case "":
|
||||
execMode = ProxyExecModeDaemon
|
||||
// Use default
|
||||
break
|
||||
case "daemon":
|
||||
execMode = ProxyExecModeDaemon
|
||||
case "script":
|
||||
|
@ -609,9 +609,6 @@ func (a *Agent) ConnectCARoots(q *QueryOptions) (*CARootList, *QueryMeta, error)
|
||||
}
|
||||
|
||||
// ConnectCALeaf gets the leaf certificate for the given service ID.
|
||||
//
|
||||
// TODO(mitchellh): we need to test this better once we have a way to
|
||||
// configure CAs from the API package (when the CA work is done).
|
||||
func (a *Agent) ConnectCALeaf(serviceID string, q *QueryOptions) (*LeafCert, *QueryMeta, error) {
|
||||
r := a.c.newRequest("GET", "/v1/agent/connect/ca/leaf/"+serviceID)
|
||||
r.setQueryOptions(q)
|
||||
|
@ -1049,17 +1049,71 @@ func TestAPI_AgentConnectCARoots_empty(t *testing.T) {
|
||||
|
||||
agent := c.Agent()
|
||||
list, meta, err := agent.ConnectCARoots(nil)
|
||||
require.Nil(err)
|
||||
require.NoError(err)
|
||||
require.Equal(uint64(0), meta.LastIndex)
|
||||
require.Len(list.Roots, 0)
|
||||
}
|
||||
|
||||
func TestAPI_AgentConnectCARoots_list(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
||||
// Force auto port range to 1 port so we have deterministic response.
|
||||
c.Connect = map[string]interface{}{
|
||||
"enabled": true,
|
||||
}
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
agent := c.Agent()
|
||||
list, meta, err := agent.ConnectCARoots(nil)
|
||||
require.NoError(err)
|
||||
require.True(meta.LastIndex > 0)
|
||||
require.Len(list.Roots, 1)
|
||||
}
|
||||
|
||||
func TestAPI_AgentConnectCALeaf(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
||||
// Force auto port range to 1 port so we have deterministic response.
|
||||
c.Connect = map[string]interface{}{
|
||||
"enabled": true,
|
||||
}
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
agent := c.Agent()
|
||||
// Setup service
|
||||
reg := &AgentServiceRegistration{
|
||||
Name: "foo",
|
||||
Tags: []string{"bar", "baz"},
|
||||
Port: 8000,
|
||||
}
|
||||
require.NoError(agent.ServiceRegister(reg))
|
||||
|
||||
leaf, meta, err := agent.ConnectCALeaf("foo", nil)
|
||||
require.NoError(err)
|
||||
require.True(meta.LastIndex > 0)
|
||||
// Sanity checks here as we have actual certificate validation checks at many
|
||||
// other levels.
|
||||
require.NotEmpty(leaf.SerialNumber)
|
||||
require.NotEmpty(leaf.CertPEM)
|
||||
require.NotEmpty(leaf.PrivateKeyPEM)
|
||||
require.Equal("foo", leaf.Service)
|
||||
require.True(strings.HasSuffix(leaf.ServiceURI, "/svc/foo"))
|
||||
require.True(leaf.ModifyIndex > 0)
|
||||
require.True(leaf.ValidAfter.Before(time.Now()))
|
||||
require.True(leaf.ValidBefore.After(time.Now()))
|
||||
}
|
||||
|
||||
// TODO(banks): once we have CA stuff setup properly we can probably make this
|
||||
// much more complete. This is just a sanity check that the agent code basically
|
||||
// works.
|
||||
func TestAPI_AgentConnectAuthorize(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
@ -1079,7 +1133,15 @@ func TestAPI_AgentConnectAuthorize(t *testing.T) {
|
||||
|
||||
func TestAPI_AgentConnectProxyConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
|
||||
// Force auto port range to 1 port so we have deterministic response.
|
||||
c.Connect = map[string]interface{}{
|
||||
"proxy_defaults": map[string]interface{}{
|
||||
"bind_min_port": 20000,
|
||||
"bind_max_port": 20000,
|
||||
},
|
||||
}
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
agent := c.Agent()
|
||||
@ -1107,9 +1169,12 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
|
||||
TargetServiceName: "foo",
|
||||
ContentHash: "e662ea8600d84cf0",
|
||||
ExecMode: "daemon",
|
||||
Command: "",
|
||||
Command: "consul connect proxy",
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
"bind_address": "127.0.0.1",
|
||||
"bind_port": float64(20000),
|
||||
"foo": "bar",
|
||||
"local_service_address": "127.0.0.1:8000",
|
||||
},
|
||||
}
|
||||
require.Equal(t, expectConfig, config)
|
||||
|
@ -52,11 +52,6 @@ type Config struct {
|
||||
// private key to be used in development instead of the ones supplied by
|
||||
// Connect.
|
||||
DevServiceKeyFile string `json:"dev_service_key_file" hcl:"dev_service_key_file"`
|
||||
|
||||
// service is a connect.Service instance representing the proxied service. It
|
||||
// is created internally by the code responsible for setting up config as it
|
||||
// may depend on other external dependencies
|
||||
service *connect.Service
|
||||
}
|
||||
|
||||
// PublicListenerConfig contains the parameters needed for the incoming mTLS
|
||||
@ -89,6 +84,9 @@ func (plc *PublicListenerConfig) applyDefaults() {
|
||||
if plc.HandshakeTimeoutMs == 0 {
|
||||
plc.HandshakeTimeoutMs = 10000
|
||||
}
|
||||
if plc.BindAddress == "" {
|
||||
plc.BindAddress = "0.0.0.0"
|
||||
}
|
||||
}
|
||||
|
||||
// UpstreamConfig configures an upstream (outgoing) listener.
|
||||
@ -258,7 +256,6 @@ func NewAgentConfigWatcher(client *api.Client, proxyID string,
|
||||
|
||||
func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal,
|
||||
val interface{}) {
|
||||
log.Printf("DEBUG: got hash %s", blockVal.(watch.WaitHashVal))
|
||||
|
||||
resp, ok := val.(*api.ConnectProxyConfig)
|
||||
if !ok {
|
||||
@ -266,25 +263,16 @@ func (w *AgentConfigWatcher) handler(blockVal watch.BlockingParamVal,
|
||||
return
|
||||
}
|
||||
|
||||
// Setup Service instance now we know target ID etc
|
||||
service, err := connect.NewService(resp.TargetServiceID, w.client)
|
||||
if err != nil {
|
||||
w.logger.Printf("[WARN] proxy config watch failed to initialize"+
|
||||
" service: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create proxy config from the response
|
||||
cfg := &Config{
|
||||
ProxyID: w.proxyID,
|
||||
// Token should be already setup in the client
|
||||
ProxiedServiceID: resp.TargetServiceID,
|
||||
ProxiedServiceNamespace: "default",
|
||||
service: service,
|
||||
}
|
||||
|
||||
// Unmarshal configs
|
||||
err = mapstructure.Decode(resp.Config, &cfg.PublicListener)
|
||||
err := mapstructure.Decode(resp.Config, &cfg.PublicListener)
|
||||
if err != nil {
|
||||
w.logger.Printf("[ERR] proxy config watch public listener config "+
|
||||
"couldn't be parsed: %s", err)
|
||||
|
@ -175,11 +175,6 @@ func TestAgentConfigWatcher(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
// nil this out as comparisons are problematic, we'll explicitly sanity check
|
||||
// it's reasonable later.
|
||||
assert.NotNil(t, cfg.service)
|
||||
cfg.service = nil
|
||||
|
||||
assert.Equal(t, expectCfg, cfg)
|
||||
|
||||
// TODO(banks): Sanity check the service is viable and gets TLS certs eventually from
|
||||
@ -213,11 +208,6 @@ func TestAgentConfigWatcher(t *testing.T) {
|
||||
})
|
||||
expectCfg.PublicListener.LocalConnectTimeoutMs = 444
|
||||
|
||||
// nil this out as comparisons are problematic, we'll explicitly sanity check
|
||||
// it's reasonable later.
|
||||
assert.NotNil(t, cfg.service)
|
||||
cfg.service = nil
|
||||
|
||||
assert.Equal(t, expectCfg, cfg)
|
||||
}
|
||||
|
||||
|
@ -20,8 +20,10 @@ type Listener struct {
|
||||
// Service is the connect service instance to use.
|
||||
Service *connect.Service
|
||||
|
||||
// listenFunc, dialFunc and bindAddr are set by type-specific constructors
|
||||
listenFunc func() (net.Listener, error)
|
||||
dialFunc func() (net.Conn, error)
|
||||
bindAddr string
|
||||
|
||||
stopFlag int32
|
||||
stopChan chan struct{}
|
||||
@ -42,17 +44,17 @@ type Listener struct {
|
||||
// connections and proxy them to the configured local application over TCP.
|
||||
func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig,
|
||||
logger *log.Logger) *Listener {
|
||||
bindAddr := fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.BindPort)
|
||||
return &Listener{
|
||||
Service: svc,
|
||||
listenFunc: func() (net.Listener, error) {
|
||||
return tls.Listen("tcp",
|
||||
fmt.Sprintf("%s:%d", cfg.BindAddress, cfg.BindPort),
|
||||
svc.ServerTLSConfig())
|
||||
return tls.Listen("tcp", bindAddr, svc.ServerTLSConfig())
|
||||
},
|
||||
dialFunc: func() (net.Conn, error) {
|
||||
return net.DialTimeout("tcp", cfg.LocalServiceAddress,
|
||||
time.Duration(cfg.LocalConnectTimeoutMs)*time.Millisecond)
|
||||
},
|
||||
bindAddr: bindAddr,
|
||||
stopChan: make(chan struct{}),
|
||||
listeningChan: make(chan struct{}),
|
||||
logger: logger,
|
||||
@ -63,11 +65,11 @@ func NewPublicListener(svc *connect.Service, cfg PublicListenerConfig,
|
||||
// connections that are proxied to a discovered Connect service instance.
|
||||
func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
|
||||
logger *log.Logger) *Listener {
|
||||
bindAddr := fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort)
|
||||
return &Listener{
|
||||
Service: svc,
|
||||
listenFunc: func() (net.Listener, error) {
|
||||
return net.Listen("tcp",
|
||||
fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort))
|
||||
return net.Listen("tcp", bindAddr)
|
||||
},
|
||||
dialFunc: func() (net.Conn, error) {
|
||||
if cfg.resolver == nil {
|
||||
@ -78,6 +80,7 @@ func NewUpstreamListener(svc *connect.Service, cfg UpstreamConfig,
|
||||
defer cancel()
|
||||
return svc.Dial(ctx, cfg.resolver)
|
||||
},
|
||||
bindAddr: bindAddr,
|
||||
stopChan: make(chan struct{}),
|
||||
listeningChan: make(chan struct{}),
|
||||
logger: logger,
|
||||
@ -142,3 +145,8 @@ func (l *Listener) Close() error {
|
||||
func (l *Listener) Wait() {
|
||||
<-l.listeningChan
|
||||
}
|
||||
|
||||
// BindAddr returns the address the listen is bound to.
|
||||
func (l *Listener) BindAddr() string {
|
||||
return l.bindAddr
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/x509"
|
||||
"log"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
@ -14,6 +16,7 @@ type Proxy struct {
|
||||
cfgWatcher ConfigWatcher
|
||||
stopChan chan struct{}
|
||||
logger *log.Logger
|
||||
service *connect.Service
|
||||
}
|
||||
|
||||
// NewFromConfigFile returns a Proxy instance configured just from a local file.
|
||||
@ -27,12 +30,11 @@ func NewFromConfigFile(client *api.Client, filename string,
|
||||
}
|
||||
|
||||
service, err := connect.NewDevServiceFromCertFiles(cfg.ProxiedServiceID,
|
||||
client, logger, cfg.DevCAFile, cfg.DevServiceCertFile,
|
||||
logger, cfg.DevCAFile, cfg.DevServiceCertFile,
|
||||
cfg.DevServiceKeyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg.service = service
|
||||
|
||||
p := &Proxy{
|
||||
proxyID: cfg.ProxyID,
|
||||
@ -40,6 +42,7 @@ func NewFromConfigFile(client *api.Client, filename string,
|
||||
cfgWatcher: NewStaticConfigWatcher(cfg),
|
||||
stopChan: make(chan struct{}),
|
||||
logger: logger,
|
||||
service: service,
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
@ -47,16 +50,18 @@ func NewFromConfigFile(client *api.Client, filename string,
|
||||
// New returns a Proxy with the given id, consuming the provided (configured)
|
||||
// agent. It is ready to Run().
|
||||
func New(client *api.Client, proxyID string, logger *log.Logger) (*Proxy, error) {
|
||||
cw, err := NewAgentConfigWatcher(client, proxyID, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := &Proxy{
|
||||
proxyID: proxyID,
|
||||
client: client,
|
||||
cfgWatcher: &AgentConfigWatcher{
|
||||
client: client,
|
||||
proxyID: proxyID,
|
||||
logger: logger,
|
||||
},
|
||||
stopChan: make(chan struct{}),
|
||||
logger: logger,
|
||||
proxyID: proxyID,
|
||||
client: client,
|
||||
cfgWatcher: cw,
|
||||
stopChan: make(chan struct{}),
|
||||
logger: logger,
|
||||
// Can't load service yet as we only have the proxy's ID not the service's
|
||||
// until initial config fetch happens.
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
@ -71,16 +76,29 @@ func (p *Proxy) Serve() error {
|
||||
select {
|
||||
case newCfg := <-p.cfgWatcher.Watch():
|
||||
p.logger.Printf("[DEBUG] got new config")
|
||||
if newCfg.service == nil {
|
||||
p.logger.Printf("[ERR] new config has nil service")
|
||||
continue
|
||||
}
|
||||
|
||||
if cfg == nil {
|
||||
// Initial setup
|
||||
|
||||
// Setup Service instance now we know target ID etc
|
||||
service, err := connect.NewService(newCfg.ProxiedServiceID, p.client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.service = service
|
||||
|
||||
go func() {
|
||||
<-service.ReadyWait()
|
||||
p.logger.Printf("[INFO] proxy loaded config and ready to serve")
|
||||
tcfg := service.ServerTLSConfig()
|
||||
cert, _ := tcfg.GetCertificate(nil)
|
||||
leaf, _ := x509.ParseCertificate(cert.Certificate[0])
|
||||
p.logger.Printf("[DEBUG] leaf: %s roots: %s", leaf.URIs[0], bytes.Join(tcfg.RootCAs.Subjects(), []byte(",")))
|
||||
}()
|
||||
|
||||
newCfg.PublicListener.applyDefaults()
|
||||
l := NewPublicListener(newCfg.service, newCfg.PublicListener, p.logger)
|
||||
err := p.startListener("public listener", l)
|
||||
l := NewPublicListener(p.service, newCfg.PublicListener, p.logger)
|
||||
err = p.startListener("public listener", l)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -93,7 +111,13 @@ func (p *Proxy) Serve() error {
|
||||
uc.applyDefaults()
|
||||
uc.resolver = UpstreamResolverFromClient(p.client, uc)
|
||||
|
||||
l := NewUpstreamListener(newCfg.service, uc, p.logger)
|
||||
if uc.LocalBindPort < 1 {
|
||||
p.logger.Printf("[ERR] upstream %s has no local_bind_port. "+
|
||||
"Can't start upstream.", uc.String())
|
||||
continue
|
||||
}
|
||||
|
||||
l := NewUpstreamListener(p.service, uc, p.logger)
|
||||
err := p.startListener(uc.String(), l)
|
||||
if err != nil {
|
||||
p.logger.Printf("[ERR] failed to start upstream %s: %s", uc.String(),
|
||||
@ -110,6 +134,7 @@ func (p *Proxy) Serve() error {
|
||||
|
||||
// startPublicListener is run from the internal state machine loop
|
||||
func (p *Proxy) startListener(name string, l *Listener) error {
|
||||
p.logger.Printf("[INFO] %s starting on %s", name, l.BindAddr())
|
||||
go func() {
|
||||
err := l.Serve()
|
||||
if err != nil {
|
||||
@ -122,6 +147,7 @@ func (p *Proxy) startListener(name string, l *Listener) error {
|
||||
go func() {
|
||||
<-p.stopChan
|
||||
l.Close()
|
||||
|
||||
}()
|
||||
|
||||
return nil
|
||||
@ -131,4 +157,7 @@ func (p *Proxy) startListener(name string, l *Listener) error {
|
||||
// called only once.
|
||||
func (p *Proxy) Close() {
|
||||
close(p.stopChan)
|
||||
if p.service != nil {
|
||||
p.service.Close()
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/api"
|
||||
testing "github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
// Resolver is the interface implemented by a service discovery mechanism to get
|
||||
@ -122,7 +121,12 @@ func (cr *ConsulResolver) resolveService(ctx context.Context) (string, connect.C
|
||||
// propagating these trust domains we need to actually fetch the trust domain
|
||||
// somehow. We also need to implement namespaces. Use of test function here is
|
||||
// temporary pending the work on trust domains.
|
||||
certURI := connect.TestSpiffeIDService(&testing.RuntimeT{}, cr.Name)
|
||||
certURI := &connect.SpiffeIDService{
|
||||
Host: "11111111-2222-3333-4444-555555555555.consul",
|
||||
Namespace: "default",
|
||||
Datacenter: svcs[idx].Node.Datacenter,
|
||||
Service: svcs[idx].Service.ProxyDestination,
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s:%d", addr, port), certURI, nil
|
||||
}
|
||||
|
@ -41,7 +41,8 @@ type Service struct {
|
||||
// fetch certificates and print a loud error message. It will not Close() or
|
||||
// kill the process since that could lead to a crash loop in every service if
|
||||
// ACL token was revoked. All attempts to dial will error and any incoming
|
||||
// connections will fail to verify.
|
||||
// connections will fail to verify. It may be nil if the Service is being
|
||||
// configured from local files for development or testing.
|
||||
client *api.Client
|
||||
|
||||
// tlsCfg is the dynamic TLS config
|
||||
@ -63,6 +64,10 @@ type Service struct {
|
||||
// NewService creates and starts a Service. The caller must close the returned
|
||||
// service to free resources and allow the program to exit normally. This is
|
||||
// typically called in a signal handler.
|
||||
//
|
||||
// Caller must provide client which is already configured to speak to the local
|
||||
// Consul agent, and with an ACL token that has `service:write` privileges for
|
||||
// the serviceID specified.
|
||||
func NewService(serviceID string, client *api.Client) (*Service, error) {
|
||||
return NewServiceWithLogger(serviceID, client,
|
||||
log.New(os.Stderr, "", log.LstdFlags))
|
||||
@ -89,7 +94,8 @@ func NewServiceWithLogger(serviceID string, client *api.Client,
|
||||
s.rootsWatch.HybridHandler = s.rootsWatchHandler
|
||||
|
||||
p, err = watch.Parse(map[string]interface{}{
|
||||
"type": "connect_leaf",
|
||||
"type": "connect_leaf",
|
||||
"service_id": s.serviceID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -97,26 +103,33 @@ func NewServiceWithLogger(serviceID string, client *api.Client,
|
||||
s.leafWatch = p
|
||||
s.leafWatch.HybridHandler = s.leafWatchHandler
|
||||
|
||||
//go s.rootsWatch.RunWithClientAndLogger(s.client, s.logger)
|
||||
//go s.leafWatch.RunWithClientAndLogger(s.client, s.logger)
|
||||
go s.rootsWatch.RunWithClientAndLogger(client, s.logger)
|
||||
go s.leafWatch.RunWithClientAndLogger(client, s.logger)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// NewDevServiceFromCertFiles creates a Service using certificate and key files
|
||||
// passed instead of fetching them from the client.
|
||||
func NewDevServiceFromCertFiles(serviceID string, client *api.Client,
|
||||
logger *log.Logger, caFile, certFile, keyFile string) (*Service, error) {
|
||||
s := &Service{
|
||||
serviceID: serviceID,
|
||||
client: client,
|
||||
logger: logger,
|
||||
}
|
||||
func NewDevServiceFromCertFiles(serviceID string, logger *log.Logger,
|
||||
caFile, certFile, keyFile string) (*Service, error) {
|
||||
|
||||
tlsCfg, err := devTLSConfigFromFiles(caFile, certFile, keyFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.tlsCfg = newDynamicTLSConfig(tlsCfg)
|
||||
return NewDevServiceWithTLSConfig(serviceID, logger, tlsCfg)
|
||||
}
|
||||
|
||||
// NewDevServiceWithTLSConfig creates a Service using static TLS config passed.
|
||||
// It's mostly useful for testing.
|
||||
func NewDevServiceWithTLSConfig(serviceID string, logger *log.Logger,
|
||||
tlsCfg *tls.Config) (*Service, error) {
|
||||
s := &Service{
|
||||
serviceID: serviceID,
|
||||
logger: logger,
|
||||
tlsCfg: newDynamicTLSConfig(tlsCfg),
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@ -274,3 +287,17 @@ func (s *Service) leafWatchHandler(blockParam watch.BlockingParamVal, raw interf
|
||||
|
||||
s.tlsCfg.SetLeaf(&cert)
|
||||
}
|
||||
|
||||
// Ready returns whether or not both roots and a leaf certificate are
|
||||
// configured. If both are non-nil, they are assumed to be valid and usable.
|
||||
func (s *Service) Ready() bool {
|
||||
return s.tlsCfg.Ready()
|
||||
}
|
||||
|
||||
// ReadyWait returns a chan that is closed when the the Service becomes ready
|
||||
// for use. Note that if the Service is ready when it is called it returns a nil
|
||||
// chan. Ready means that it has root and leaf certificates configured which we
|
||||
// assume are valid.
|
||||
func (s *Service) ReadyWait() <-chan struct{} {
|
||||
return s.tlsCfg.ReadyWait()
|
||||
}
|
||||
|
@ -1,16 +1,21 @@
|
||||
package connect
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/testutil/retry"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -111,10 +116,91 @@ func TestService_Dial(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestService_ServerTLSConfig(t *testing.T) {
|
||||
// TODO(banks): it's mostly meaningless to test this now since we directly set
|
||||
// the tlsCfg in our TestService helper which is all we'd be asserting on here
|
||||
// not the actual implementation. Once agent tls fetching is built, it becomes
|
||||
// more meaningful to actually verify it's returning the correct config.
|
||||
require := require.New(t)
|
||||
|
||||
a := agent.NewTestAgent("007", "")
|
||||
defer a.Shutdown()
|
||||
client := a.Client()
|
||||
agent := client.Agent()
|
||||
|
||||
// NewTestAgent setup a CA already by default
|
||||
|
||||
// Register a local agent service with a managed proxy
|
||||
reg := &api.AgentServiceRegistration{
|
||||
Name: "web",
|
||||
Port: 8080,
|
||||
}
|
||||
err := agent.ServiceRegister(reg)
|
||||
require.NoError(err)
|
||||
|
||||
// Now we should be able to create a service that will eventually get it's TLS
|
||||
// all by itself!
|
||||
service, err := NewService("web", client)
|
||||
require.NoError(err)
|
||||
|
||||
// Wait for it to be ready
|
||||
select {
|
||||
case <-service.ReadyWait():
|
||||
// continue with test case below
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timeout waiting for Service.ReadyWait after 1s")
|
||||
}
|
||||
|
||||
tlsCfg := service.ServerTLSConfig()
|
||||
|
||||
// Sanity check it has a leaf with the right ServiceID and that validates with
|
||||
// the given roots.
|
||||
require.NotNil(tlsCfg.GetCertificate)
|
||||
leaf, err := tlsCfg.GetCertificate(&tls.ClientHelloInfo{})
|
||||
require.NoError(err)
|
||||
cert, err := x509.ParseCertificate(leaf.Certificate[0])
|
||||
require.NoError(err)
|
||||
require.Len(cert.URIs, 1)
|
||||
require.True(strings.HasSuffix(cert.URIs[0].String(), "/svc/web"))
|
||||
|
||||
// Verify it as a client would
|
||||
err = clientSideVerifier(tlsCfg, leaf.Certificate)
|
||||
require.NoError(err)
|
||||
|
||||
// Now test that rotating the root updates
|
||||
{
|
||||
// Setup a new generated CA
|
||||
connect.TestCAConfigSet(t, a, nil)
|
||||
}
|
||||
|
||||
// After some time, both root and leaves should be different but both should
|
||||
// still be correct.
|
||||
oldRootSubjects := bytes.Join(tlsCfg.RootCAs.Subjects(), []byte(", "))
|
||||
//oldLeafSerial := connect.HexString(cert.SerialNumber.Bytes())
|
||||
oldLeafKeyID := connect.HexString(cert.SubjectKeyId)
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
updatedCfg := service.ServerTLSConfig()
|
||||
|
||||
// Wait until roots are different
|
||||
rootSubjects := bytes.Join(updatedCfg.RootCAs.Subjects(), []byte(", "))
|
||||
if bytes.Equal(oldRootSubjects, rootSubjects) {
|
||||
r.Fatalf("root certificates should have changed, got %s",
|
||||
rootSubjects)
|
||||
}
|
||||
|
||||
leaf, err := updatedCfg.GetCertificate(&tls.ClientHelloInfo{})
|
||||
r.Check(err)
|
||||
cert, err := x509.ParseCertificate(leaf.Certificate[0])
|
||||
r.Check(err)
|
||||
|
||||
// TODO(banks): Current CA implementation resets the serial index when CA
|
||||
// config changes which means same serial is issued by new CA config failing
|
||||
// this test. Re-enable once the CA is changed to fix that.
|
||||
|
||||
// if oldLeafSerial == connect.HexString(cert.SerialNumber.Bytes()) {
|
||||
// r.Fatalf("leaf certificate should have changed, got serial %s",
|
||||
// oldLeafSerial)
|
||||
// }
|
||||
if oldLeafKeyID == connect.HexString(cert.SubjectKeyId) {
|
||||
r.Fatalf("leaf should have a different key, got matching SubjectKeyID = %s",
|
||||
oldLeafKeyID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestService_HTTPClient(t *testing.T) {
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
@ -20,16 +21,12 @@ import (
|
||||
func TestService(t testing.T, service string, ca *structs.CARoot) *Service {
|
||||
t.Helper()
|
||||
|
||||
// Don't need to talk to client since we are setting TLSConfig locally. This
|
||||
// will cause server verification to skip AuthZ too.
|
||||
svc, err := NewService(service, nil)
|
||||
// Don't need to talk to client since we are setting TLSConfig locally
|
||||
svc, err := NewDevServiceWithTLSConfig(service,
|
||||
log.New(os.Stderr, "", log.LstdFlags), TestTLSConfig(t, service, ca))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Override the tlsConfig hackily.
|
||||
svc.tlsCfg = newDynamicTLSConfig(TestTLSConfig(t, service, ca))
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,9 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
@ -104,7 +106,8 @@ func verifyServerCertMatchesURI(certs []*x509.Certificate,
|
||||
if cert.URIs[0].String() == expectedStr {
|
||||
return nil
|
||||
}
|
||||
return errors.New("peer certificate mismatch")
|
||||
return fmt.Errorf("peer certificate mismatch got %s, want %s",
|
||||
cert.URIs[0].String(), expectedStr)
|
||||
}
|
||||
|
||||
// newServerSideVerifier returns a verifierFunc that wraps the provided
|
||||
@ -115,21 +118,25 @@ func newServerSideVerifier(client *api.Client, serviceID string) verifierFunc {
|
||||
return func(tlsCfg *tls.Config, rawCerts [][]byte) error {
|
||||
leaf, err := verifyChain(tlsCfg, rawCerts, false)
|
||||
if err != nil {
|
||||
log.Printf("connect: failed TLS verification: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Check leaf is a cert we understand
|
||||
if len(leaf.URIs) < 1 {
|
||||
log.Printf("connect: invalid leaf certificate")
|
||||
return errors.New("connect: invalid leaf certificate")
|
||||
}
|
||||
|
||||
certURI, err := connect.ParseCertURI(leaf.URIs[0])
|
||||
if err != nil {
|
||||
log.Printf("connect: invalid leaf certificate URI")
|
||||
return errors.New("connect: invalid leaf certificate URI")
|
||||
}
|
||||
|
||||
// No AuthZ if there is no client.
|
||||
if client == nil {
|
||||
log.Printf("connect: nil client")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -148,9 +155,11 @@ func newServerSideVerifier(client *api.Client, serviceID string) verifierFunc {
|
||||
}
|
||||
resp, err := client.Agent().ConnectAuthorize(req)
|
||||
if err != nil {
|
||||
log.Printf("connect: authz call failed: %s", err)
|
||||
return errors.New("connect: authz call failed: " + err.Error())
|
||||
}
|
||||
if !resp.Authorized {
|
||||
log.Printf("connect: authz call denied: %s", resp.Reason)
|
||||
return errors.New("connect: authz denied: " + resp.Reason)
|
||||
}
|
||||
return nil
|
||||
@ -217,9 +226,17 @@ func verifyChain(tlsCfg *tls.Config, rawCerts [][]byte, client bool) (*x509.Cert
|
||||
type dynamicTLSConfig struct {
|
||||
base *tls.Config
|
||||
|
||||
sync.Mutex
|
||||
sync.RWMutex
|
||||
leaf *tls.Certificate
|
||||
roots *x509.CertPool
|
||||
// readyCh is closed when the config first gets both leaf and roots set.
|
||||
// Watchers can wait on this via ReadyWait.
|
||||
readyCh chan struct{}
|
||||
}
|
||||
|
||||
type tlsCfgUpdate struct {
|
||||
ch chan struct{}
|
||||
next *tlsCfgUpdate
|
||||
}
|
||||
|
||||
// newDynamicTLSConfig returns a dynamicTLSConfig constructed from base.
|
||||
@ -235,6 +252,9 @@ func newDynamicTLSConfig(base *tls.Config) *dynamicTLSConfig {
|
||||
if base.RootCAs != nil {
|
||||
cfg.roots = base.RootCAs
|
||||
}
|
||||
if !cfg.Ready() {
|
||||
cfg.readyCh = make(chan struct{})
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
@ -246,8 +266,8 @@ func newDynamicTLSConfig(base *tls.Config) *dynamicTLSConfig {
|
||||
// client can use this config for a long time and will still verify against the
|
||||
// latest roots even though the roots in the struct is has can't change.
|
||||
func (cfg *dynamicTLSConfig) Get(v verifierFunc) *tls.Config {
|
||||
cfg.Lock()
|
||||
defer cfg.Unlock()
|
||||
cfg.RLock()
|
||||
defer cfg.RUnlock()
|
||||
copy := cfg.base.Clone()
|
||||
copy.RootCAs = cfg.roots
|
||||
copy.ClientCAs = cfg.roots
|
||||
@ -281,6 +301,7 @@ func (cfg *dynamicTLSConfig) SetRoots(roots *x509.CertPool) error {
|
||||
cfg.Lock()
|
||||
defer cfg.Unlock()
|
||||
cfg.roots = roots
|
||||
cfg.notify()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -289,19 +310,43 @@ func (cfg *dynamicTLSConfig) SetLeaf(leaf *tls.Certificate) error {
|
||||
cfg.Lock()
|
||||
defer cfg.Unlock()
|
||||
cfg.leaf = leaf
|
||||
cfg.notify()
|
||||
return nil
|
||||
}
|
||||
|
||||
// notify is called under lock during an update to check if we are now ready.
|
||||
func (cfg *dynamicTLSConfig) notify() {
|
||||
if cfg.readyCh != nil && cfg.leaf != nil && cfg.roots != nil {
|
||||
close(cfg.readyCh)
|
||||
cfg.readyCh = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Roots returns the current CA root CertPool.
|
||||
func (cfg *dynamicTLSConfig) Roots() *x509.CertPool {
|
||||
cfg.Lock()
|
||||
defer cfg.Unlock()
|
||||
cfg.RLock()
|
||||
defer cfg.RUnlock()
|
||||
return cfg.roots
|
||||
}
|
||||
|
||||
// Leaf returns the current Leaf certificate.
|
||||
func (cfg *dynamicTLSConfig) Leaf() *tls.Certificate {
|
||||
cfg.Lock()
|
||||
defer cfg.Unlock()
|
||||
cfg.RLock()
|
||||
defer cfg.RUnlock()
|
||||
return cfg.leaf
|
||||
}
|
||||
|
||||
// Ready returns whether or not both roots and a leaf certificate are
|
||||
// configured. If both are non-nil, they are assumed to be valid and usable.
|
||||
func (cfg *dynamicTLSConfig) Ready() bool {
|
||||
cfg.RLock()
|
||||
defer cfg.RUnlock()
|
||||
return cfg.leaf != nil && cfg.roots != nil
|
||||
}
|
||||
|
||||
// ReadyWait returns a chan that is closed when the the tlsConfig becomes Ready
|
||||
// for use. Note that if the config is ready when it is called it returns a nil
|
||||
// chan.
|
||||
func (cfg *dynamicTLSConfig) ReadyWait() <-chan struct{} {
|
||||
return cfg.readyCh
|
||||
}
|
||||
|
@ -358,3 +358,45 @@ func TestDynamicTLSConfig(t *testing.T) {
|
||||
requireCorrectVerifier(t, newCfg, gotBefore, v1Ch)
|
||||
requireCorrectVerifier(t, newCfg, gotAfter, v2Ch)
|
||||
}
|
||||
|
||||
func TestDynamicTLSConfig_Ready(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
ca1 := connect.TestCA(t, nil)
|
||||
baseCfg := TestTLSConfig(t, "web", ca1)
|
||||
|
||||
c := newDynamicTLSConfig(defaultTLSConfig())
|
||||
readyCh := c.ReadyWait()
|
||||
assertBlocked(t, readyCh)
|
||||
require.False(c.Ready(), "no roots or leaf, should not be ready")
|
||||
|
||||
err := c.SetLeaf(&baseCfg.Certificates[0])
|
||||
require.NoError(err)
|
||||
assertBlocked(t, readyCh)
|
||||
require.False(c.Ready(), "no roots, should not be ready")
|
||||
|
||||
err = c.SetRoots(baseCfg.RootCAs)
|
||||
require.NoError(err)
|
||||
assertNotBlocked(t, readyCh)
|
||||
require.True(c.Ready(), "should be ready")
|
||||
}
|
||||
|
||||
func assertBlocked(t *testing.T, ch <-chan struct{}) {
|
||||
t.Helper()
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatalf("want blocked chan")
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func assertNotBlocked(t *testing.T, ch <-chan struct{}) {
|
||||
t.Helper()
|
||||
select {
|
||||
case <-ch:
|
||||
return
|
||||
default:
|
||||
t.Fatalf("want unblocked chan but it blocked")
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -94,6 +95,7 @@ type TestServerConfig struct {
|
||||
VerifyIncomingHTTPS bool `json:"verify_incoming_https,omitempty"`
|
||||
VerifyOutgoing bool `json:"verify_outgoing,omitempty"`
|
||||
EnableScriptChecks bool `json:"enable_script_checks,omitempty"`
|
||||
Connect map[string]interface{} `json:"connect,omitempty"`
|
||||
ReadyTimeout time.Duration `json:"-"`
|
||||
Stdout, Stderr io.Writer `json:"-"`
|
||||
Args []string `json:"-"`
|
||||
@ -211,6 +213,7 @@ func newTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, e
|
||||
return nil, errors.Wrap(err, "failed marshaling json")
|
||||
}
|
||||
|
||||
log.Printf("CONFIG JSON: %s", string(b))
|
||||
configFile := filepath.Join(tmpdir, "config.json")
|
||||
if err := ioutil.WriteFile(configFile, b, 0644); err != nil {
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
@ -236,8 +236,7 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||
|
||||
// connectRootsWatch is used to watch for changes to Connect Root certificates.
|
||||
func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||
// We don't support stale since roots are likely to be cached locally in the
|
||||
// agent anyway.
|
||||
// We don't support stale since roots are cached locally in the agent.
|
||||
|
||||
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
|
||||
agent := p.client.Agent()
|
||||
@ -257,8 +256,7 @@ func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||
// connectLeafWatch is used to watch for changes to Connect Leaf certificates
|
||||
// for given local service id.
|
||||
func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||
// We don't support stale since certs are likely to be cached locally in the
|
||||
// agent anyway.
|
||||
// We don't support stale since certs are cached locally in the agent.
|
||||
|
||||
var serviceID string
|
||||
if err := assignValue(params, "service_id", &serviceID); err != nil {
|
||||
|
@ -7,8 +7,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/watch"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -526,14 +528,12 @@ func TestEventWatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnectRootsWatch(t *testing.T) {
|
||||
// TODO(banks) enable and make it work once this is supported. Note that this
|
||||
// test actually passes currently just by busy-polling the roots endpoint
|
||||
// until it changes.
|
||||
t.Skip("CA and Leaf implementation don't actually support blocking yet")
|
||||
t.Parallel()
|
||||
a := agent.NewTestAgent(t.Name(), ``)
|
||||
// NewTestAgent will bootstrap a new CA
|
||||
a := agent.NewTestAgent(t.Name(), "")
|
||||
defer a.Shutdown()
|
||||
|
||||
var originalCAID string
|
||||
invoke := makeInvokeCh()
|
||||
plan := mustParse(t, `{"type":"connect_roots"}`)
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
@ -544,7 +544,14 @@ func TestConnectRootsWatch(t *testing.T) {
|
||||
if !ok || v == nil {
|
||||
return // ignore
|
||||
}
|
||||
// TODO(banks): verify the right roots came back.
|
||||
// Only 1 CA is the bootstrapped state (i.e. first response). Ignore this
|
||||
// state and wait for the new CA to show up too.
|
||||
if len(v.Roots) == 1 {
|
||||
originalCAID = v.ActiveRootID
|
||||
return
|
||||
}
|
||||
assert.NotEmpty(t, originalCAID)
|
||||
assert.NotEqual(t, originalCAID, v.ActiveRootID)
|
||||
invoke <- nil
|
||||
}
|
||||
|
||||
@ -553,22 +560,8 @@ func TestConnectRootsWatch(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
// TODO(banks): this is a hack since CA config is in flux. We _did_ expose a
|
||||
// temporary agent endpoint for PUTing config, but didn't expose it in `api`
|
||||
// package intentionally. If we are going to hack around with temporary API,
|
||||
// we can might as well drop right down to the RPC level...
|
||||
args := structs.CAConfiguration{
|
||||
Provider: "static",
|
||||
Config: map[string]interface{}{
|
||||
"Name": "test-1",
|
||||
"Generate": true,
|
||||
},
|
||||
}
|
||||
var reply interface{}
|
||||
if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Set a new CA
|
||||
connect.TestCAConfigSet(t, a, nil)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
@ -588,9 +581,8 @@ func TestConnectRootsWatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnectLeafWatch(t *testing.T) {
|
||||
// TODO(banks) enable and make it work once this is supported.
|
||||
t.Skip("CA and Leaf implementation don't actually support blocking yet")
|
||||
t.Parallel()
|
||||
// NewTestAgent will bootstrap a new CA
|
||||
a := agent.NewTestAgent(t.Name(), ``)
|
||||
defer a.Shutdown()
|
||||
|
||||
@ -606,25 +598,10 @@ func TestConnectLeafWatch(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
// Setup a new generated CA
|
||||
//
|
||||
// TODO(banks): this is a hack since CA config is in flux. We _did_ expose a
|
||||
// temporary agent endpoint for PUTing config, but didn't expose it in `api`
|
||||
// package intentionally. If we are going to hack around with temporary API,
|
||||
// we can might as well drop right down to the RPC level...
|
||||
args := structs.CAConfiguration{
|
||||
Provider: "static",
|
||||
Config: map[string]interface{}{
|
||||
"Name": "test-1",
|
||||
"Generate": true,
|
||||
},
|
||||
}
|
||||
var reply interface{}
|
||||
if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
var lastCert *consulapi.LeafCert
|
||||
|
||||
invoke := makeInvokeCh()
|
||||
//invoke := makeInvokeCh()
|
||||
invoke := make(chan error)
|
||||
plan := mustParse(t, `{"type":"connect_leaf", "service_id":"web"}`)
|
||||
plan.Handler = func(idx uint64, raw interface{}) {
|
||||
if raw == nil {
|
||||
@ -634,7 +611,18 @@ func TestConnectLeafWatch(t *testing.T) {
|
||||
if !ok || v == nil {
|
||||
return // ignore
|
||||
}
|
||||
// TODO(banks): verify the right leaf came back.
|
||||
if lastCert == nil {
|
||||
// Initial fetch, just store the cert and return
|
||||
lastCert = v
|
||||
return
|
||||
}
|
||||
// TODO(banks): right now the root rotation actually causes Serial numbers
|
||||
// to reset so these end up all being the same. That needs fixing but it's
|
||||
// a bigger task than I want to bite off for this PR.
|
||||
//assert.NotEqual(t, lastCert.SerialNumber, v.SerialNumber)
|
||||
assert.NotEqual(t, lastCert.CertPEM, v.CertPEM)
|
||||
assert.NotEqual(t, lastCert.PrivateKeyPEM, v.PrivateKeyPEM)
|
||||
assert.NotEqual(t, lastCert.ModifyIndex, v.ModifyIndex)
|
||||
invoke <- nil
|
||||
}
|
||||
|
||||
@ -643,20 +631,8 @@ func TestConnectLeafWatch(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
// Change the CA which should eventually trigger a leaf change but probably
|
||||
// won't now so this test has no way to succeed yet.
|
||||
args := structs.CAConfiguration{
|
||||
Provider: "static",
|
||||
Config: map[string]interface{}{
|
||||
"Name": "test-2",
|
||||
"Generate": true,
|
||||
},
|
||||
}
|
||||
var reply interface{}
|
||||
if err := a.RPC("ConnectCA.ConfigurationSet", &args, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Change the CA to trigger a leaf change
|
||||
connect.TestCAConfigSet(t, a, nil)
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
@ -740,6 +716,7 @@ func TestConnectProxyConfigWatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func mustParse(t *testing.T, q string) *watch.Plan {
|
||||
t.Helper()
|
||||
var params map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(q), ¶ms); err != nil {
|
||||
t.Fatal(err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user