From 476ea7b04a67ece39b4e0eda3233231c818bcbb8 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 27 Apr 2018 11:24:49 -0700 Subject: [PATCH] agent: start/stop proxies --- agent/agent.go | 71 +++++++++++++++++++++++++++++++++++- agent/agent_endpoint_test.go | 8 ++-- agent/local/proxy.go | 22 ++++++++++- agent/local/state.go | 36 ++++++++++-------- agent/local/state_test.go | 21 +++++++---- agent/proxy/noop.go | 7 ++++ agent/proxy/noop_test.go | 9 +++++ agent/structs/connect.go | 3 ++ 8 files changed, 147 insertions(+), 30 deletions(-) create mode 100644 agent/proxy/noop.go create mode 100644 agent/proxy/noop_test.go diff --git a/agent/agent.go b/agent/agent.go index 9dfe2abead..f70c16379a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -40,6 +40,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" + "github.com/kardianos/osext" "github.com/shirou/gopsutil/host" "golang.org/x/net/http2" ) @@ -1268,6 +1269,11 @@ func (a *Agent) ShutdownAgent() error { chk.Stop() } + // Unload all our proxies so that we stop the running processes. + if err := a.unloadProxies(); err != nil { + a.logger.Printf("[WARN] agent: error stopping managed proxies: %s", err) + } + var err error if a.delegate != nil { err = a.delegate.Shutdown() @@ -2032,19 +2038,58 @@ func (a *Agent) AddProxy(proxy *structs.ConnectManagedProxy, persist bool) error // Lookup the target service token in state if there is one. token := a.State.ServiceToken(proxy.TargetServiceID) + // Determine if we need to default the command + if proxy.ExecMode == structs.ProxyExecModeDaemon && len(proxy.Command) == 0 { + // We use the globally configured default command. If it is empty + // then we need to determine the subcommand for this agent. + cmd := a.config.ConnectProxyDefaultDaemonCommand + if len(cmd) == 0 { + var err error + cmd, err = a.defaultProxyCommand() + if err != nil { + return err + } + } + + proxy.CommandDefault = cmd + } + // Add the proxy to local state first since we may need to assign a port which // needs to be coordinate under state lock. AddProxy will generate the // NodeService for the proxy populated with the allocated (or configured) port // and an ID, but it doesn't add it to the agent directly since that could // deadlock and we may need to coordinate adding it and persisting etc. - proxyService, err := a.State.AddProxy(proxy, token) + proxyState, oldProxy, err := a.State.AddProxy(proxy, token) if err != nil { return err } + proxyService := proxyState.Proxy.ProxyService + + // If we replaced an existing proxy, stop that process. + if oldProxy != nil { + if err := oldProxy.ProxyProcess.Stop(); err != nil { + a.logger.Printf( + "[ERR] error stopping managed proxy, may still be running: %s", + err) + } + } + + // Start the proxy process + if err := proxyState.ProxyProcess.Start(); err != nil { + a.State.RemoveProxy(proxyService.ID) + return fmt.Errorf("error starting managed proxy: %s", err) + } // TODO(banks): register proxy health checks. err = a.AddService(proxyService, nil, persist, token) if err != nil { + // Stop the proxy process if it was started + if err := proxyState.ProxyProcess.Stop(); err != nil { + a.logger.Printf( + "[ERR] error stopping managed proxy, may still be running: %s", + err) + } + // Remove the state too a.State.RemoveProxy(proxyService.ID) return err @@ -2061,15 +2106,37 @@ func (a *Agent) RemoveProxy(proxyID string, persist bool) error { return fmt.Errorf("proxyID missing") } - if err := a.State.RemoveProxy(proxyID); err != nil { + // Remove the proxy from the local state + proxyState, err := a.State.RemoveProxy(proxyID) + if err != nil { return err } + // Stop the process. The proxy implementation is expected to perform + // retries so if this fails then retries have already been performed and + // the most we can do is just error. + if err := proxyState.ProxyProcess.Stop(); err != nil { + return fmt.Errorf("error stopping managed proxy process: %s", err) + } + // TODO(banks): unpersist proxy return nil } +// defaultProxyCommand returns the default Connect managed proxy command. +func (a *Agent) defaultProxyCommand() ([]string, error) { + // Get the path to the current exectuable. This is cached once by the + // library so this is effectively just a variable read. + execPath, err := osext.Executable() + if err != nil { + return nil, err + } + + // "consul connect proxy" default value for managed daemon proxy + return []string{execPath, "connect", "proxy"}, nil +} + func (a *Agent) cancelCheckMonitors(checkID types.CheckID) { // Stop any monitors delete(a.checkReapAfter, checkID) diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 6cff0fa595..26a04dddd6 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -70,7 +70,7 @@ func TestAgent_Services(t *testing.T) { }, TargetServiceID: "mysql", } - _, err := a.State.AddProxy(prxy1, "") + _, _, err := a.State.AddProxy(prxy1, "") require.NoError(t, err) req, _ := http.NewRequest("GET", "/v1/agent/services", nil) @@ -1435,7 +1435,7 @@ func TestAgent_RegisterService_ManagedConnectProxy(t *testing.T) { proxy := a.State.Proxy("web-proxy") require.NotNil(proxy) assert.Equal(structs.ProxyExecModeScript, proxy.Proxy.ExecMode) - assert.Equal("proxy.sh", proxy.Proxy.Command) + assert.Equal([]string{"proxy.sh"}, proxy.Proxy.Command) assert.Equal(args.Connect.Proxy.Config, proxy.Proxy.Config) // Ensure the token was configured @@ -2352,7 +2352,7 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) { ProxyServiceID: "test-proxy", TargetServiceID: "test", TargetServiceName: "test", - ContentHash: "84346af2031659c9", + ContentHash: "365a50cbb9a748b6", ExecMode: "daemon", Command: nil, Config: map[string]interface{}{ @@ -2372,7 +2372,7 @@ func TestAgentConnectProxyConfig_Blocking(t *testing.T) { ur, err := copystructure.Copy(expectedResponse) require.NoError(t, err) updatedResponse := ur.(*api.ConnectProxyConfig) - updatedResponse.ContentHash = "e1e3395f0d00cd41" + updatedResponse.ContentHash = "b5bb0e4a0a58ca25" upstreams := updatedResponse.Config["upstreams"].([]interface{}) upstreams = append(upstreams, map[string]interface{}{ diff --git a/agent/local/proxy.go b/agent/local/proxy.go index 7f004a7abc..37484a32f0 100644 --- a/agent/local/proxy.go +++ b/agent/local/proxy.go @@ -14,12 +14,32 @@ import ( func (s *State) newProxyProcess(p *structs.ConnectManagedProxy, pToken string) (proxy.Proxy, error) { switch p.ExecMode { case structs.ProxyExecModeDaemon: + command := p.Command + if len(command) == 0 { + command = p.CommandDefault + } + + // This should never happen since validation should happen upstream + // but verify it because the alternative is to panic below. + if len(command) == 0 { + return nil, fmt.Errorf("daemon mode managed proxy requires command") + } + + // Build the command to execute. + var cmd exec.Cmd + cmd.Path = command[0] + cmd.Args = command[1:] + + // Build the daemon structure return &proxy.Daemon{ - Command: exec.Command(p.Command), + Command: &cmd, ProxyToken: pToken, Logger: s.logger, }, nil + case structs.ProxyExecModeScript: + return &proxy.Noop{}, nil + default: return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode) } diff --git a/agent/local/state.go b/agent/local/state.go index ccb4d77e1a..ecd3299fd8 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -127,8 +127,8 @@ type ManagedProxy struct { // use service-scoped ACL tokens distributed externally. ProxyToken string - // ManagedProxy is the managed proxy itself that is running. - ManagedProxy proxy.Proxy + // ProxyProcess is the managed proxy itself that is running. + ProxyProcess proxy.Proxy // WatchCh is a close-only chan that is closed when the proxy is removed or // updated. @@ -573,22 +573,26 @@ func (l *State) CriticalCheckStates() map[types.CheckID]*CheckState { // (since that has to do other book keeping). The token passed here is the ACL // token the service used to register itself so must have write on service // record. -func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*structs.NodeService, error) { +// +// AddProxy returns the newly added proxy, any replaced proxy, and an error. +// The second return value (replaced proxy) can be used to determine if +// the process needs to be updated or not. +func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*ManagedProxy, *ManagedProxy, error) { if proxy == nil { - return nil, fmt.Errorf("no proxy") + return nil, nil, fmt.Errorf("no proxy") } // Lookup the local service target := l.Service(proxy.TargetServiceID) if target == nil { - return nil, fmt.Errorf("target service ID %s not registered", + return nil, nil, fmt.Errorf("target service ID %s not registered", proxy.TargetServiceID) } // Get bind info from config cfg, err := proxy.ParseConfig() if err != nil { - return nil, err + return nil, nil, err } // Construct almost all of the NodeService that needs to be registered by the @@ -604,7 +608,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str pToken, err := uuid.GenerateUUID() if err != nil { - return nil, err + return nil, nil, err } // Initialize the managed proxy process. This doesn't start anything, @@ -612,7 +616,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str // caller should call Proxy and use the returned ManagedProxy instance. proxyProcess, err := l.newProxyProcess(proxy, pToken) if err != nil { - return nil, err + return nil, nil, err } // Lock now. We can't lock earlier as l.Service would deadlock and shouldn't @@ -646,7 +650,7 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str } // If no ports left (or auto ports disabled) fail if svc.Port < 1 { - return nil, fmt.Errorf("no port provided for proxy bind_port and none "+ + return nil, nil, fmt.Errorf("no port provided for proxy bind_port and none "+ " left in the allocated range [%d, %d]", l.config.ProxyBindMinPort, l.config.ProxyBindMaxPort) } @@ -654,7 +658,8 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str proxy.ProxyService = svc // All set, add the proxy and return the service - if old, ok := l.managedProxies[svc.ID]; ok { + old, ok := l.managedProxies[svc.ID] + if ok { // Notify watchers of the existing proxy config that it's changing. Note // this is safe here even before the map is updated since we still hold the // state lock and the watcher can't re-read the new config until we return @@ -664,22 +669,23 @@ func (l *State) AddProxy(proxy *structs.ConnectManagedProxy, token string) (*str l.managedProxies[svc.ID] = &ManagedProxy{ Proxy: proxy, ProxyToken: pToken, - ManagedProxy: proxyProcess, + ProxyProcess: proxyProcess, WatchCh: make(chan struct{}), } // No need to trigger sync as proxy state is local only. - return svc, nil + return l.managedProxies[svc.ID], old, nil } // RemoveProxy is used to remove a proxy entry from the local state. -func (l *State) RemoveProxy(id string) error { +// This returns the proxy that was removed. +func (l *State) RemoveProxy(id string) (*ManagedProxy, error) { l.Lock() defer l.Unlock() p := l.managedProxies[id] if p == nil { - return fmt.Errorf("Proxy %s does not exist", id) + return nil, fmt.Errorf("Proxy %s does not exist", id) } delete(l.managedProxies, id) @@ -687,7 +693,7 @@ func (l *State) RemoveProxy(id string) error { close(p.WatchCh) // No need to trigger sync as proxy state is local only. - return nil + return p, nil } // Proxy returns the local proxy state. diff --git a/agent/local/state_test.go b/agent/local/state_test.go index dd887ccb12..f79249a73e 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1684,7 +1684,7 @@ func TestStateProxyManagement(t *testing.T) { p1 := structs.ConnectManagedProxy{ ExecMode: structs.ProxyExecModeDaemon, - Command: "consul connect proxy", + Command: []string{"consul", "connect", "proxy"}, TargetServiceID: "web", } @@ -1710,9 +1710,10 @@ func TestStateProxyManagement(t *testing.T) { require.NoError(err) // Should work now - svc, err := state.AddProxy(&p1, "fake-token") + pstate, err := state.AddProxy(&p1, "fake-token") require.NoError(err) + svc := pstate.Proxy.ProxyService assert.Equal("web-proxy", svc.ID) assert.Equal("web-proxy", svc.Service) assert.Equal(structs.ServiceKindConnectProxy, svc.Kind) @@ -1739,8 +1740,9 @@ func TestStateProxyManagement(t *testing.T) { // Second proxy should claim other port p2 := p1 p2.TargetServiceID = "cache" - svc2, err := state.AddProxy(&p2, "fake-token") + pstate2, err := state.AddProxy(&p2, "fake-token") require.NoError(err) + svc2 := pstate2.Proxy.ProxyService assert.Contains([]int{20000, 20001}, svc2.Port) assert.NotEqual(svc.Port, svc2.Port) @@ -1758,8 +1760,9 @@ func TestStateProxyManagement(t *testing.T) { "bind_port": 1234, "bind_address": "0.0.0.0", } - svc3, err := state.AddProxy(&p3, "fake-token") + pstate3, err := state.AddProxy(&p3, "fake-token") require.NoError(err) + svc3 := pstate3.Proxy.ProxyService require.Equal("0.0.0.0", svc3.Address) require.Equal(1234, svc3.Port) @@ -1771,8 +1774,9 @@ func TestStateProxyManagement(t *testing.T) { require.NotNil(gotP3) var ws memdb.WatchSet ws.Add(gotP3.WatchCh) - svc3, err = state.AddProxy(&p3updated, "fake-token") + pstate3, err = state.AddProxy(&p3updated, "fake-token") require.NoError(err) + svc3 = pstate3.Proxy.ProxyService require.Equal("0.0.0.0", svc3.Address) require.Equal(1234, svc3.Port) gotProxy3 := state.Proxy(svc3.ID) @@ -1782,19 +1786,20 @@ func TestStateProxyManagement(t *testing.T) { "watch should have fired so ws.Watch should not timeout") // Remove one of the auto-assigned proxies - err = state.RemoveProxy(svc2.ID) + _, err = state.RemoveProxy(svc2.ID) require.NoError(err) // Should be able to create a new proxy for that service with the port (it // should have been "freed"). p4 := p2 - svc4, err := state.AddProxy(&p4, "fake-token") + pstate4, err := state.AddProxy(&p4, "fake-token") require.NoError(err) + svc4 := pstate4.Proxy.ProxyService assert.Contains([]int{20000, 20001}, svc2.Port) assert.Equal(svc4.Port, svc2.Port, "should get the same port back that we freed") // Remove a proxy that doesn't exist should error - err = state.RemoveProxy("nope") + _, err = state.RemoveProxy("nope") require.Error(err) assert.Equal(&p4, state.Proxy(p4.ProxyService.ID).Proxy, diff --git a/agent/proxy/noop.go b/agent/proxy/noop.go new file mode 100644 index 0000000000..9b35a24272 --- /dev/null +++ b/agent/proxy/noop.go @@ -0,0 +1,7 @@ +package proxy + +// Noop implements Proxy and does nothing. +type Noop struct{} + +func (p *Noop) Start() error { return nil } +func (p *Noop) Stop() error { return nil } diff --git a/agent/proxy/noop_test.go b/agent/proxy/noop_test.go new file mode 100644 index 0000000000..77513ad29f --- /dev/null +++ b/agent/proxy/noop_test.go @@ -0,0 +1,9 @@ +package proxy + +import ( + "testing" +) + +func TestNoop_impl(t *testing.T) { + var _ Proxy = new(Noop) +} diff --git a/agent/structs/connect.go b/agent/structs/connect.go index 29330a6525..b40091adff 100644 --- a/agent/structs/connect.go +++ b/agent/structs/connect.go @@ -66,6 +66,9 @@ type ConnectManagedProxy struct { // for ProxyExecModeScript. Command []string + // CommandDefault is the default command to execute if Command is empty. + CommandDefault []string `json:"-" hash:"ignore"` + // Config is the arbitrary configuration data provided with the registration. Config map[string]interface{}