mirror of https://github.com/status-im/consul.git
register TCP check for managed proxies
This commit is contained in:
parent
280f14d64c
commit
17789d4fe3
|
@ -2123,8 +2123,23 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
|
||||||
}
|
}
|
||||||
proxyService := proxyState.Proxy.ProxyService
|
proxyService := proxyState.Proxy.ProxyService
|
||||||
|
|
||||||
// TODO(banks): register proxy health checks.
|
// Register proxy TCP check. The built in proxy doesn't listen publically
|
||||||
err = a.AddService(proxyService, nil, persist, token)
|
// until it's loaded certs so this ensures we won't route traffic until it's
|
||||||
|
// ready.
|
||||||
|
proxyCfg, err := a.applyProxyConfigDefaults(proxyState.Proxy)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
chkTypes := []*structs.CheckType{
|
||||||
|
&structs.CheckType{
|
||||||
|
Name: "Connect Proxy Listening",
|
||||||
|
TCP: fmt.Sprintf("%s:%d", proxyCfg["bind_address"],
|
||||||
|
proxyCfg["bind_port"]),
|
||||||
|
Interval: 10 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = a.AddService(proxyService, chkTypes, persist, token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Remove the state too
|
// Remove the state too
|
||||||
a.State.RemoveProxy(proxyService.ID)
|
a.State.RemoveProxy(proxyService.ID)
|
||||||
|
@ -2138,6 +2153,56 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// applyProxyConfigDefaults takes a *structs.ConnectManagedProxy and returns
|
||||||
|
// it's Config map merged with any defaults from the Agent's config. It would be
|
||||||
|
// nicer if this were defined as a method on structs.ConnectManagedProxy but we
|
||||||
|
// can't do that because ot the import cycle it causes with agent/config.
|
||||||
|
func (a *Agent) applyProxyConfigDefaults(p *structs.ConnectManagedProxy) (map[string]interface{}, error) {
|
||||||
|
if p == nil || p.ProxyService == nil {
|
||||||
|
// Should never happen but protect from panic
|
||||||
|
return nil, fmt.Errorf("invalid proxy state")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup the target service
|
||||||
|
target := a.State.Service(p.TargetServiceID)
|
||||||
|
if target == nil {
|
||||||
|
// Can happen during deregistration race between proxy and scheduler.
|
||||||
|
return nil, fmt.Errorf("unknown target service ID: %s", p.TargetServiceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge globals defaults
|
||||||
|
config := make(map[string]interface{})
|
||||||
|
for k, v := range a.config.ConnectProxyDefaultConfig {
|
||||||
|
if _, ok := config[k]; !ok {
|
||||||
|
config[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy config from the proxy
|
||||||
|
for k, v := range p.Config {
|
||||||
|
config[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set defaults for anything that is still not specified but required.
|
||||||
|
// Note that these are not included in the content hash. Since we expect
|
||||||
|
// them to be static in general but some like the default target service
|
||||||
|
// port might not be. In that edge case services can set that explicitly
|
||||||
|
// when they re-register which will be caught though.
|
||||||
|
if _, ok := config["bind_port"]; !ok {
|
||||||
|
config["bind_port"] = p.ProxyService.Port
|
||||||
|
}
|
||||||
|
if _, ok := config["bind_address"]; !ok {
|
||||||
|
// Default to binding to the same address the agent is configured to
|
||||||
|
// bind to.
|
||||||
|
config["bind_address"] = a.config.BindAddr.String()
|
||||||
|
}
|
||||||
|
if _, ok := config["local_service_address"]; !ok {
|
||||||
|
// Default to localhost and the port the service registered with
|
||||||
|
config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d", target.Port)
|
||||||
|
}
|
||||||
|
return config, nil
|
||||||
|
}
|
||||||
|
|
||||||
// applyProxyDefaults modifies the given proxy by applying any configured
|
// applyProxyDefaults modifies the given proxy by applying any configured
|
||||||
// defaults, such as the default execution mode, command, etc.
|
// defaults, such as the default execution mode, command, etc.
|
||||||
func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error {
|
func (a *Agent) applyProxyDefaults(proxy *structs.ConnectManagedProxy) error {
|
||||||
|
|
|
@ -1033,34 +1033,10 @@ func (s *HTTPServer) AgentConnectProxyConfig(resp http.ResponseWriter, req *http
|
||||||
}
|
}
|
||||||
contentHash := fmt.Sprintf("%x", hash)
|
contentHash := fmt.Sprintf("%x", hash)
|
||||||
|
|
||||||
// Merge globals defaults
|
// Set defaults
|
||||||
config := make(map[string]interface{})
|
config, err := s.agent.applyProxyConfigDefaults(proxy.Proxy)
|
||||||
for k, v := range s.agent.config.ConnectProxyDefaultConfig {
|
if err != nil {
|
||||||
if _, ok := config[k]; !ok {
|
return "", nil, err
|
||||||
config[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set defaults for anything that is still not specified but required.
|
|
||||||
// Note that these are not included in the content hash. Since we expect
|
|
||||||
// them to be static in general but some like the default target service
|
|
||||||
// port might not be. In that edge case services can set that explicitly
|
|
||||||
// when they re-register which will be caught though.
|
|
||||||
for k, v := range proxy.Proxy.Config {
|
|
||||||
config[k] = v
|
|
||||||
}
|
|
||||||
if _, ok := config["bind_port"]; !ok {
|
|
||||||
config["bind_port"] = proxy.Proxy.ProxyService.Port
|
|
||||||
}
|
|
||||||
if _, ok := config["bind_address"]; !ok {
|
|
||||||
// Default to binding to the same address the agent is configured to
|
|
||||||
// bind to.
|
|
||||||
config["bind_address"] = s.agent.config.BindAddr.String()
|
|
||||||
}
|
|
||||||
if _, ok := config["local_service_address"]; !ok {
|
|
||||||
// Default to localhost and the port the service registered with
|
|
||||||
config["local_service_address"] = fmt.Sprintf("127.0.0.1:%d",
|
|
||||||
target.Port)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only merge in telemetry config from agent if the requested is
|
// Only merge in telemetry config from agent if the requested is
|
||||||
|
|
|
@ -2577,6 +2577,11 @@ func TestAgent_AddProxy(t *testing.T) {
|
||||||
script_command = ["bar", "foo"]
|
script_command = ["bar", "foo"]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ports {
|
||||||
|
proxy_min_port = 20000
|
||||||
|
proxy_max_port = 20000
|
||||||
|
}
|
||||||
`)
|
`)
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
@ -2590,6 +2595,7 @@ func TestAgent_AddProxy(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
desc string
|
desc string
|
||||||
proxy, wantProxy *structs.ConnectManagedProxy
|
proxy, wantProxy *structs.ConnectManagedProxy
|
||||||
|
wantTCPCheck string
|
||||||
wantErr bool
|
wantErr bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
@ -2606,7 +2612,7 @@ func TestAgent_AddProxy(t *testing.T) {
|
||||||
wantErr: true,
|
wantErr: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "basic proxy adding, unregistered service",
|
desc: "basic proxy adding, registered service",
|
||||||
proxy: &structs.ConnectManagedProxy{
|
proxy: &structs.ConnectManagedProxy{
|
||||||
ExecMode: structs.ProxyExecModeDaemon,
|
ExecMode: structs.ProxyExecModeDaemon,
|
||||||
Command: []string{"consul", "connect", "proxy"},
|
Command: []string{"consul", "connect", "proxy"},
|
||||||
|
@ -2656,6 +2662,21 @@ func TestAgent_AddProxy(t *testing.T) {
|
||||||
},
|
},
|
||||||
wantErr: false,
|
wantErr: false,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "managed proxy with custom bind port",
|
||||||
|
proxy: &structs.ConnectManagedProxy{
|
||||||
|
ExecMode: structs.ProxyExecModeDaemon,
|
||||||
|
Command: []string{"consul", "connect", "proxy"},
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": "bar",
|
||||||
|
"bind_address": "127.10.10.10",
|
||||||
|
"bind_port": 1234,
|
||||||
|
},
|
||||||
|
TargetServiceID: "web",
|
||||||
|
},
|
||||||
|
wantTCPCheck: "127.10.10.10:1234",
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
@ -2677,6 +2698,26 @@ func TestAgent_AddProxy(t *testing.T) {
|
||||||
}
|
}
|
||||||
wantProxy.ProxyService = got.Proxy.ProxyService
|
wantProxy.ProxyService = got.Proxy.ProxyService
|
||||||
require.Equal(wantProxy, got.Proxy)
|
require.Equal(wantProxy, got.Proxy)
|
||||||
|
|
||||||
|
// Ensure a TCP check was created for the service.
|
||||||
|
gotCheck := a.State.Check("service:web-proxy")
|
||||||
|
require.NotNil(gotCheck)
|
||||||
|
require.Equal("Connect Proxy Listening", gotCheck.Name)
|
||||||
|
|
||||||
|
// Confusingly, a.State.Check("service:web-proxy") will return the state
|
||||||
|
// but it's Definition field will be empty. This appears to be expected
|
||||||
|
// when adding Checks as part of `AddService`. Notice how `AddService`
|
||||||
|
// tests in this file don't assert on that state but instead look at the
|
||||||
|
// agent's check state directly to ensure the right thing was registered.
|
||||||
|
// We'll do the same for now.
|
||||||
|
gotTCP, ok := a.checkTCPs["service:web-proxy"]
|
||||||
|
require.True(ok)
|
||||||
|
wantTCPCheck := tt.wantTCPCheck
|
||||||
|
if wantTCPCheck == "" {
|
||||||
|
wantTCPCheck = "127.0.0.1:20000"
|
||||||
|
}
|
||||||
|
require.Equal(wantTCPCheck, gotTCP.TCP)
|
||||||
|
require.Equal(10*time.Second, gotTCP.Interval)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -360,7 +360,6 @@ func (p *Daemon) Close() error {
|
||||||
// If we're already stopped or never started, then no problem.
|
// If we're already stopped or never started, then no problem.
|
||||||
if p.stopped || p.process == nil {
|
if p.stopped || p.process == nil {
|
||||||
p.stopped = true
|
p.stopped = true
|
||||||
p.lock.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue