agent: start/stop proxies

This commit is contained in:
Mitchell Hashimoto 2018-04-27 11:24:49 -07:00
parent fbfc6fce66
commit 476ea7b04a
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
8 changed files with 147 additions and 30 deletions

View File

@ -40,6 +40,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/kardianos/osext"
"github.com/shirou/gopsutil/host"
"golang.org/x/net/http2"
)
@ -1268,6 +1269,11 @@ func (a *Agent) ShutdownAgent() error {
chk.Stop()
}
// Unload all our proxies so that we stop the running processes.
if err := a.unloadProxies(); err != nil {
a.logger.Printf("[WARN] agent: error stopping managed proxies: %s", err)
}
var err error
if a.delegate != nil {
err = a.delegate.Shutdown()
@ -2032,19 +2038,58 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error
// Lookup the target service token in state if there is one.
token := a.State.ServiceToken(proxy.TargetServiceID)
// Determine if we need to default the command
if proxy.ExecMode == structs.ProxyExecModeDaemon && len(proxy.Command) == 0 {
// We use the globally configured default command. If it is empty
// then we need to determine the subcommand for this agent.
cmd := a.config.ConnectProxyDefaultDaemonCommand
if len(cmd) == 0 {
var err error
cmd, err = a.defaultProxyCommand()
if err != nil {
return err
}
}
proxy.CommandDefault = cmd
}
// Add the proxy to local state first since we may need to assign a port which
// needs to be coordinate under state lock. AddProxy will generate the
// NodeService for the proxy populated with the allocated (or configured) port
// and an ID, but it doesn't add it to the agent directly since that could
// deadlock and we may need to coordinate adding it and persisting etc.
proxyService, err := a.State.AddProxy(proxy, token)
proxyState, oldProxy, err := a.State.AddProxy(proxy, token)
if err != nil {
return err
}
proxyService := proxyState.Proxy.ProxyService
// If we replaced an existing proxy, stop that process.
if oldProxy != nil {
if err := oldProxy.ProxyProcess.Stop(); err != nil {
a.logger.Printf(
"[ERR] error stopping managed proxy, may still be running: %s",
err)
}
}
// Start the proxy process
if err := proxyState.ProxyProcess.Start(); err != nil {
a.State.RemoveProxy(proxyService.ID)
return fmt.Errorf("error starting managed proxy: %s", err)
}
// TODO(banks): register proxy health checks.
err = a.AddService(proxyService, nil, persist, token)
if err != nil {
// Stop the proxy process if it was started
if err := proxyState.ProxyProcess.Stop(); err != nil {
a.logger.Printf(
"[ERR] error stopping managed proxy, may still be running: %s",
err)
}
// Remove the state too
a.State.RemoveProxy(proxyService.ID)
return err
@ -2061,15 +2106,37 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error {
return fmt.Errorf("proxyID missing")
}
if err := a.State.RemoveProxy(proxyID); err != nil {
// Remove the proxy from the local state
proxyState, err := a.State.RemoveProxy(proxyID)
if err != nil {
return err
}
// Stop the process. The proxy implementation is expected to perform
// retries so if this fails then retries have already been performed and
// the most we can do is just error.
if err := proxyState.ProxyProcess.Stop(); err != nil {
return fmt.Errorf("error stopping managed proxy process: %s", err)
}
// TODO(banks): unpersist proxy
return nil
}
// defaultProxyCommand returns the default Connect managed proxy command.
func (a *Agent) defaultProxyCommand() ([]string, error) {
// Get the path to the current exectuable. This is cached once by the
// library so this is effectively just a variable read.
execPath, err := osext.Executable()
if err != nil {
return nil, err
}
// "consul connect proxy" default value for managed daemon proxy
return []string{execPath, "connect", "proxy"}, nil
}
func (a *Agent) cancelCheckMonitors(checkID types.CheckID) {
// Stop any monitors
delete(a.checkReapAfter, checkID)

View File

@ -70,7 +70,7 @@ func TestAgent_Services(t *testing.T) {
},
TargetServiceID: "mysql",
}
_, err := a.State.AddProxy(prxy1, "")
_, _, err := a.State.AddProxy(prxy1, "")
require.NoError(t, err)
req, _ := http.NewRequest("GET", "/v1/agent/services", nil)
@ -1435,7 +1435,7 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) {
proxy := a.State.Proxy("web-proxy")
require.NotNil(proxy)
assert.Equal(structs.ProxyExecModeScript, proxy.Proxy.ExecMode)
assert.Equal("proxy.sh", proxy.Proxy.Command)
assert.Equal([]string{"proxy.sh"}, proxy.Proxy.Command)
assert.Equal(args.Connect.Proxy.Config, proxy.Proxy.Config)
// Ensure the token was configured
@ -2352,7 +2352,7 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) {
ProxyServiceID: "test-proxy",
TargetServiceID: "test",
TargetServiceName: "test",
ContentHash: "84346af2031659c9",
ContentHash: "365a50cbb9a748b6",
ExecMode: "daemon",
Command: nil,
Config: map[string]interface{}{
@ -2372,7 +2372,7 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) {
ur, err := copystructure.Copy(expectedResponse)
require.NoError(t, err)
updatedResponse := ur.(*api.ConnectProxyConfig)
updatedResponse.ContentHash = "e1e3395f0d00cd41"
updatedResponse.ContentHash = "b5bb0e4a0a58ca25"
upstreams := updatedResponse.Config["upstreams"].([]interface{})
upstreams = append(upstreams,
map[string]interface{}{

View File

@ -14,12 +14,32 @@ import (
func (s *State) newProxyProcess(p *structs.ConnectManagedProxy, pToken string) (proxy.Proxy, error) {
switch p.ExecMode {
case structs.ProxyExecModeDaemon:
command := p.Command
if len(command) == 0 {
command = p.CommandDefault
}
// This should never happen since validation should happen upstream
// but verify it because the alternative is to panic below.
if len(command) == 0 {
return nil, fmt.Errorf("daemon mode managed proxy requires command")
}
// Build the command to execute.
var cmd exec.Cmd
cmd.Path = command[0]
cmd.Args = command[1:]
// Build the daemon structure
return &proxy.Daemon{
Command: exec.Command(p.Command),
Command: &cmd,
ProxyToken: pToken,
Logger: s.logger,
}, nil
case structs.ProxyExecModeScript:
return &proxy.Noop{}, nil
default:
return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
}

View File

@ -127,8 +127,8 @@ type ManagedProxy struct {
// use service-scoped ACL tokens distributed externally.
ProxyToken string
// ManagedProxy is the managed proxy itself that is running.
ManagedProxy proxy.Proxy
// ProxyProcess is the managed proxy itself that is running.
ProxyProcess proxy.Proxy
// WatchCh is a close-only chan that is closed when the proxy is removed or
// updated.
@ -573,22 +573,26 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState {
// (since that has to do other book keeping). The token passed here is the ACL
// token the service used to register itself so must have write on service
// record.
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*structs.NodeService, error) {
//
// AddProxy returns the newly added proxy, any replaced proxy, and an error.
// The second return value (replaced proxy) can be used to determine if
// the process needs to be updated or not.
func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, *ManagedProxy, error) {
if proxy == nil {
return nil, fmt.Errorf("no proxy")
return nil, nil, fmt.Errorf("no proxy")
}
// Lookup the local service
target := l.Service(proxy.TargetServiceID)
if target == nil {
return nil, fmt.Errorf("target service ID %s not registered",
return nil, nil, fmt.Errorf("target service ID %s not registered",
proxy.TargetServiceID)
}
// Get bind info from config
cfg, err := proxy.ParseConfig()
if err != nil {
return nil, err
return nil, nil, err
}
// Construct almost all of the NodeService that needs to be registered by the
@ -604,7 +608,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
pToken, err := uuid.GenerateUUID()
if err != nil {
return nil, err
return nil, nil, err
}
// Initialize the managed proxy process. This doesn't start anything,
@ -612,7 +616,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
// caller should call Proxy and use the returned ManagedProxy instance.
proxyProcess, err := l.newProxyProcess(proxy, pToken)
if err != nil {
return nil, err
return nil, nil, err
}
// Lock now. We can't lock earlier as l.Service would deadlock and shouldn't
@ -646,7 +650,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
}
// If no ports left (or auto ports disabled) fail
if svc.Port < 1 {
return nil, fmt.Errorf("no port provided for proxy bind_port and none "+
return nil, nil, fmt.Errorf("no port provided for proxy bind_port and none "+
" left in the allocated range [%d, %d]", l.config.ProxyBindMinPort,
l.config.ProxyBindMaxPort)
}
@ -654,7 +658,8 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
proxy.ProxyService = svc
// All set, add the proxy and return the service
if old, ok := l.managedProxies[svc.ID]; ok {
old, ok := l.managedProxies[svc.ID]
if ok {
// Notify watchers of the existing proxy config that it's changing. Note
// this is safe here even before the map is updated since we still hold the
// state lock and the watcher can't re-read the new config until we return
@ -664,22 +669,23 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str
l.managedProxies[svc.ID] = &ManagedProxy{
Proxy: proxy,
ProxyToken: pToken,
ManagedProxy: proxyProcess,
ProxyProcess: proxyProcess,
WatchCh: make(chan struct{}),
}
// No need to trigger sync as proxy state is local only.
return svc, nil
return l.managedProxies[svc.ID], old, nil
}
// RemoveProxy is used to remove a proxy entry from the local state.
func (l *State) RemoveProxy(id string) error {
// This returns the proxy that was removed.
func (l *State) RemoveProxy(id string) (*ManagedProxy, error) {
l.Lock()
defer l.Unlock()
p := l.managedProxies[id]
if p == nil {
return fmt.Errorf("Proxy %s does not exist", id)
return nil, fmt.Errorf("Proxy %s does not exist", id)
}
delete(l.managedProxies, id)
@ -687,7 +693,7 @@ func (l *State) RemoveProxy(id string) error {
close(p.WatchCh)
// No need to trigger sync as proxy state is local only.
return nil
return p, nil
}
// Proxy returns the local proxy state.

View File

@ -1684,7 +1684,7 @@ func TestStateProxyManagement(t *testing.T) {
p1 := structs.ConnectManagedProxy{
ExecMode: structs.ProxyExecModeDaemon,
Command: "consul connect proxy",
Command: []string{"consul", "connect", "proxy"},
TargetServiceID: "web",
}
@ -1710,9 +1710,10 @@ func TestStateProxyManagement(t *testing.T) {
require.NoError(err)
// Should work now
svc, err := state.AddProxy(&p1, "fake-token")
pstate, err := state.AddProxy(&p1, "fake-token")
require.NoError(err)
svc := pstate.Proxy.ProxyService
assert.Equal("web-proxy", svc.ID)
assert.Equal("web-proxy", svc.Service)
assert.Equal(structs.ServiceKindConnectProxy, svc.Kind)
@ -1739,8 +1740,9 @@ func TestStateProxyManagement(t *testing.T) {
// Second proxy should claim other port
p2 := p1
p2.TargetServiceID = "cache"
svc2, err := state.AddProxy(&p2, "fake-token")
pstate2, err := state.AddProxy(&p2, "fake-token")
require.NoError(err)
svc2 := pstate2.Proxy.ProxyService
assert.Contains([]int{20000, 20001}, svc2.Port)
assert.NotEqual(svc.Port, svc2.Port)
@ -1758,8 +1760,9 @@ func TestStateProxyManagement(t *testing.T) {
"bind_port": 1234,
"bind_address": "0.0.0.0",
}
svc3, err := state.AddProxy(&p3, "fake-token")
pstate3, err := state.AddProxy(&p3, "fake-token")
require.NoError(err)
svc3 := pstate3.Proxy.ProxyService
require.Equal("0.0.0.0", svc3.Address)
require.Equal(1234, svc3.Port)
@ -1771,8 +1774,9 @@ func TestStateProxyManagement(t *testing.T) {
require.NotNil(gotP3)
var ws memdb.WatchSet
ws.Add(gotP3.WatchCh)
svc3, err = state.AddProxy(&p3updated, "fake-token")
pstate3, err = state.AddProxy(&p3updated, "fake-token")
require.NoError(err)
svc3 = pstate3.Proxy.ProxyService
require.Equal("0.0.0.0", svc3.Address)
require.Equal(1234, svc3.Port)
gotProxy3 := state.Proxy(svc3.ID)
@ -1782,19 +1786,20 @@ func TestStateProxyManagement(t *testing.T) {
"watch should have fired so ws.Watch should not timeout")
// Remove one of the auto-assigned proxies
err = state.RemoveProxy(svc2.ID)
_, err = state.RemoveProxy(svc2.ID)
require.NoError(err)
// Should be able to create a new proxy for that service with the port (it
// should have been "freed").
p4 := p2
svc4, err := state.AddProxy(&p4, "fake-token")
pstate4, err := state.AddProxy(&p4, "fake-token")
require.NoError(err)
svc4 := pstate4.Proxy.ProxyService
assert.Contains([]int{20000, 20001}, svc2.Port)
assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed")
// Remove a proxy that doesn't exist should error
err = state.RemoveProxy("nope")
_, err = state.RemoveProxy("nope")
require.Error(err)
assert.Equal(&p4, state.Proxy(p4.ProxyService.ID).Proxy,

7
agent/proxy/noop.go Normal file
View File

@ -0,0 +1,7 @@
package proxy
// Noop implements Proxy and does nothing.
type Noop struct{}
func (p *Noop) Start() error { return nil }
func (p *Noop) Stop() error { return nil }

9
agent/proxy/noop_test.go Normal file
View File

@ -0,0 +1,9 @@
package proxy
import (
"testing"
)
func TestNoop_impl(t *testing.T) {
var _ Proxy = new(Noop)
}

View File

@ -66,6 +66,9 @@ type ConnectManagedProxy struct {
// for ProxyExecModeScript.
Command []string
// CommandDefault is the default command to execute if Command is empty.
CommandDefault []string `json:"-" hash:"ignore"`
// Config is the arbitrary configuration data provided with the registration.
Config map[string]interface{}