mirror of https://github.com/status-im/consul.git
Abandon daemonize for simpler solution (preserving history):
Reverts: - bdb274852ae469c89092d6050697c0ff97178465 - 2c689179c4f61c11f0016214c0fc127a0b813bfe - d62e25c4a7ab753914b6baccd66f88ffd10949a3 - c727ffbcc98e3e0bf41e1a7bdd40169bd2d22191 - 31b4d18933fd0acbe157e28d03ad59c2abf9a1fb - 85c3f8df3eabc00f490cd392213c3b928a85aa44
This commit is contained in:
parent
a2fe604191
commit
cdc7cfaa36
|
@ -366,7 +366,6 @@ func (a *Agent) Start() error {
|
|||
a.proxyManager = proxy.NewManager()
|
||||
a.proxyManager.State = a.State
|
||||
a.proxyManager.Logger = a.logger
|
||||
a.proxyManager.DisableDetach = a.config.ConnectDisableDetachedDaemons
|
||||
if a.config.DataDir != "" {
|
||||
// DataDir is required for all non-dev mode agents, but we want
|
||||
// to allow setting the data dir for demos and so on for the agent,
|
||||
|
@ -1320,8 +1319,11 @@ func (a *Agent) ShutdownAgent() error {
|
|||
}
|
||||
|
||||
// Stop the proxy manager
|
||||
// NOTE(mitchellh): we use Kill for now to kill the processes since
|
||||
// the local state isn't snapshotting meaning the proxy tokens are
|
||||
// regenerated each time forcing the processes to restart anyways.
|
||||
if a.proxyManager != nil {
|
||||
if err := a.proxyManager.Close(); err != nil {
|
||||
if err := a.proxyManager.Kill(); err != nil {
|
||||
a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
|
|
@ -621,16 +621,6 @@ type RuntimeConfig struct {
|
|||
// that.
|
||||
ConnectEnabled bool
|
||||
|
||||
// ConnectDisableDetachedDaemons is not exposed publically and is meant for
|
||||
// testing where having processes outlive the test is inconvenient. It also
|
||||
// allows tests outside of the `agent/proxy` package to ignore the unpleasant
|
||||
// details of self-executing the test binary in order to correctly detach a
|
||||
// process. It's set to true by default in TestAgent and setting it to false
|
||||
// in any test requires several hoops to be jumped through to allow the test
|
||||
// binary to behave as a daemonizer and for the agent to be configured to use
|
||||
// the right invocation of the binary for it.
|
||||
ConnectDisableDetachedDaemons bool
|
||||
|
||||
// ConnectProxyBindMinPort is the inclusive start of the range of ports
|
||||
// allocated to the agent for starting proxy listeners on where no explicit
|
||||
// port is specified.
|
||||
|
|
|
@ -4199,7 +4199,6 @@ func TestSanitize(t *testing.T) {
|
|||
"ClientAddrs": [],
|
||||
"ConnectCAConfig": {},
|
||||
"ConnectCAProvider": "",
|
||||
"ConnectDisableDetachedDaemons": false,
|
||||
"ConnectEnabled": false,
|
||||
"ConnectProxyBindMaxPort": 0,
|
||||
"ConnectProxyBindMinPort": 0,
|
||||
|
|
|
@ -1691,7 +1691,7 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
require := require.New(t)
|
||||
assert := assert.New(t)
|
||||
|
||||
_, err := state.AddProxy(&p1, "fake-token", "")
|
||||
_, err := state.AddProxy(&p1, "fake-token")
|
||||
require.Error(err, "should fail as the target service isn't registered")
|
||||
|
||||
// Sanity check done, lets add a couple of target services to the state
|
||||
|
@ -1710,7 +1710,7 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
require.NoError(err)
|
||||
|
||||
// Should work now
|
||||
pstate, err := state.AddProxy(&p1, "fake-token", "")
|
||||
pstate, err := state.AddProxy(&p1, "fake-token")
|
||||
require.NoError(err)
|
||||
|
||||
svc := pstate.Proxy.ProxyService
|
||||
|
@ -1724,9 +1724,8 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
|
||||
{
|
||||
// Re-registering same proxy again should not pick a random port but re-use
|
||||
// the assigned one. It should also keep the same proxy token since we don't
|
||||
// want to force restart for config change.
|
||||
pstateDup, err := state.AddProxy(&p1, "fake-token", "")
|
||||
// the assigned one.
|
||||
pstateDup, err := state.AddProxy(&p1, "fake-token")
|
||||
require.NoError(err)
|
||||
svcDup := pstateDup.Proxy.ProxyService
|
||||
|
||||
|
@ -1737,8 +1736,6 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
assert.Equal("", svcDup.Address, "should have empty address by default")
|
||||
// Port must be same as before
|
||||
assert.Equal(svc.Port, svcDup.Port)
|
||||
// Same ProxyToken
|
||||
assert.Equal(pstate.ProxyToken, pstateDup.ProxyToken)
|
||||
}
|
||||
|
||||
// Let's register a notifier now
|
||||
|
@ -1751,7 +1748,7 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
// Second proxy should claim other port
|
||||
p2 := p1
|
||||
p2.TargetServiceID = "cache"
|
||||
pstate2, err := state.AddProxy(&p2, "fake-token", "")
|
||||
pstate2, err := state.AddProxy(&p2, "fake-token")
|
||||
require.NoError(err)
|
||||
svc2 := pstate2.Proxy.ProxyService
|
||||
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||
|
@ -1767,7 +1764,7 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
// Third proxy should fail as all ports are used
|
||||
p3 := p1
|
||||
p3.TargetServiceID = "db"
|
||||
_, err = state.AddProxy(&p3, "fake-token", "")
|
||||
_, err = state.AddProxy(&p3, "fake-token")
|
||||
require.Error(err)
|
||||
|
||||
// Should have a notification but we'll do nothing so that the next
|
||||
|
@ -1778,7 +1775,7 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
"bind_port": 1234,
|
||||
"bind_address": "0.0.0.0",
|
||||
}
|
||||
pstate3, err := state.AddProxy(&p3, "fake-token", "")
|
||||
pstate3, err := state.AddProxy(&p3, "fake-token")
|
||||
require.NoError(err)
|
||||
svc3 := pstate3.Proxy.ProxyService
|
||||
require.Equal("0.0.0.0", svc3.Address)
|
||||
|
@ -1796,7 +1793,7 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
require.NotNil(gotP3)
|
||||
var ws memdb.WatchSet
|
||||
ws.Add(gotP3.WatchCh)
|
||||
pstate3, err = state.AddProxy(&p3updated, "fake-token", "")
|
||||
pstate3, err = state.AddProxy(&p3updated, "fake-token")
|
||||
require.NoError(err)
|
||||
svc3 = pstate3.Proxy.ProxyService
|
||||
require.Equal("0.0.0.0", svc3.Address)
|
||||
|
@ -1820,7 +1817,7 @@ func TestStateProxyManagement(t *testing.T) {
|
|||
// Should be able to create a new proxy for that service with the port (it
|
||||
// should have been "freed").
|
||||
p4 := p2
|
||||
pstate4, err := state.AddProxy(&p4, "fake-token", "")
|
||||
pstate4, err := state.AddProxy(&p4, "fake-token")
|
||||
require.NoError(err)
|
||||
svc4 := pstate4.Proxy.ProxyService
|
||||
assert.Contains([]int{20000, 20001}, svc2.Port)
|
||||
|
@ -1868,65 +1865,3 @@ func drainCh(ch chan struct{}) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the logic for retaining tokens and ports through restore (i.e.
|
||||
// proxy-service already restored and token passed in externally)
|
||||
func TestStateProxyRestore(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
state := local.NewState(local.Config{
|
||||
// Wide random range to make it very unlikely to pass by chance
|
||||
ProxyBindMinPort: 10000,
|
||||
ProxyBindMaxPort: 20000,
|
||||
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
||||
|
||||
// Stub state syncing
|
||||
state.TriggerSyncChanges = func() {}
|
||||
|
||||
webSvc := structs.NodeService{
|
||||
Service: "web",
|
||||
}
|
||||
|
||||
p1 := structs.ConnectManagedProxy{
|
||||
ExecMode: structs.ProxyExecModeDaemon,
|
||||
Command: []string{"consul", "connect", "proxy"},
|
||||
TargetServiceID: "web",
|
||||
}
|
||||
|
||||
p2 := p1
|
||||
|
||||
require := require.New(t)
|
||||
assert := assert.New(t)
|
||||
|
||||
// Add a target service
|
||||
require.NoError(state.AddService(&webSvc, "fake-token-web"))
|
||||
|
||||
// Add the proxy for first time to get the proper service definition to
|
||||
// register
|
||||
pstate, err := state.AddProxy(&p1, "fake-token", "")
|
||||
require.NoError(err)
|
||||
|
||||
// Now start again with a brand new state
|
||||
state2 := local.NewState(local.Config{
|
||||
// Wide random range to make it very unlikely to pass by chance
|
||||
ProxyBindMinPort: 10000,
|
||||
ProxyBindMaxPort: 20000,
|
||||
}, log.New(os.Stderr, "", log.LstdFlags), &token.Store{})
|
||||
|
||||
// Stub state syncing
|
||||
state2.TriggerSyncChanges = func() {}
|
||||
|
||||
// Register the target service
|
||||
require.NoError(state2.AddService(&webSvc, "fake-token-web"))
|
||||
|
||||
// "Restore" the proxy service
|
||||
require.NoError(state.AddService(p1.ProxyService, "fake-token-web"))
|
||||
|
||||
// Now we can AddProxy with the "restored" token
|
||||
pstate2, err := state.AddProxy(&p2, "fake-token", pstate.ProxyToken)
|
||||
require.NoError(err)
|
||||
|
||||
// Check it still has the same port and token as before
|
||||
assert.Equal(pstate.ProxyToken, pstate2.ProxyToken)
|
||||
assert.Equal(p1.ProxyService.Port, p2.ProxyService.Port)
|
||||
}
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -32,12 +29,9 @@ const (
|
|||
//
|
||||
// Consul will ensure that if the daemon crashes, that it is restarted.
|
||||
type Daemon struct {
|
||||
// Path is the path to the executable to run
|
||||
Path string
|
||||
|
||||
// Args are the arguments to run with, the first element should be the same as
|
||||
// Path.
|
||||
Args []string
|
||||
// Command is the command to execute to start this daemon. This must
|
||||
// be a Cmd that isn't yet started.
|
||||
Command *exec.Cmd
|
||||
|
||||
// ProxyId is the ID of the proxy service. This is required for API
|
||||
// requests (along with the token) and is passed via env var.
|
||||
|
@ -58,36 +52,10 @@ type Daemon struct {
|
|||
// created but the error will be logged to the Logger.
|
||||
PidPath string
|
||||
|
||||
// StdoutPath, StderrPath are the paths to the files that stdout and stderr
|
||||
// should be written to.
|
||||
StdoutPath, StderrPath string
|
||||
|
||||
// DisableDetach is used by tests that don't actually care about detached
|
||||
// child behaviour (i.e. outside proxy package) to bypass detaching and
|
||||
// daemonizing Daemons. This makes tests much simpler as they don't need to
|
||||
// implement a test-binary mode to enable self-exec daemonizing etc. and there
|
||||
// are fewer risks of detached processes being spawned and then not killed in
|
||||
// face of missed teardown/panic/interrupt of test runs etc.
|
||||
DisableDetach bool
|
||||
|
||||
// gracefulWait can be set for tests and controls how long Stop() will wait
|
||||
// for process to terminate before killing. If not set defaults to 5 seconds.
|
||||
// If this is lowered for tests, it must remain higher than pollInterval
|
||||
// (preferably a factor of 2 or more) or the tests will potentially return
|
||||
// errors from Stop() where the process races to Kill() a process that was
|
||||
// already stopped by SIGINT but we didn't yet detect the change since poll
|
||||
// didn't occur.
|
||||
// For tests, they can set this to change the default duration to wait
|
||||
// for a graceful quit.
|
||||
gracefulWait time.Duration
|
||||
|
||||
// pollInterval can be set for tests and controls how frequently the child
|
||||
// process is sent SIG 0 to check it's still running. If not set defaults to 1
|
||||
// second.
|
||||
pollInterval time.Duration
|
||||
|
||||
// daemonizeCmd is set only in tests to control the path and args to the
|
||||
// daemonize command.
|
||||
daemonizeCmd []string
|
||||
|
||||
// process is the started process
|
||||
lock sync.Mutex
|
||||
stopped bool
|
||||
|
@ -119,18 +87,6 @@ func (p *Daemon) Start() error {
|
|||
p.stopCh = stopCh
|
||||
p.exitedCh = exitedCh
|
||||
|
||||
// Ensure log dirs exist
|
||||
if p.StdoutPath != "" {
|
||||
if err := os.MkdirAll(path.Dir(p.StdoutPath), 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if p.StderrPath != "" {
|
||||
if err := os.MkdirAll(path.Dir(p.StderrPath), 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Start the loop.
|
||||
go p.keepAlive(stopCh, exitedCh)
|
||||
|
||||
|
@ -152,7 +108,7 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
|
|||
// attempts keeps track of the number of restart attempts we've had and
|
||||
// is used to calculate the wait time using an exponential backoff.
|
||||
var attemptsDeadline time.Time
|
||||
var attempts uint32
|
||||
var attempts uint
|
||||
|
||||
for {
|
||||
if process == nil {
|
||||
|
@ -165,15 +121,7 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
|
|||
|
||||
// Calculate the exponential backoff and wait if we have to
|
||||
if attempts > DaemonRestartBackoffMin {
|
||||
delayedAttempts := attempts - DaemonRestartBackoffMin
|
||||
waitTime := time.Duration(0)
|
||||
if delayedAttempts > 31 {
|
||||
// don't shift off the end of the uint32 if the process is in a crash
|
||||
// loop forever
|
||||
waitTime = DaemonRestartMaxWait
|
||||
} else {
|
||||
waitTime = (1 << delayedAttempts) * time.Second
|
||||
}
|
||||
waitTime := (1 << (attempts - DaemonRestartBackoffMin)) * time.Second
|
||||
if waitTime > DaemonRestartMaxWait {
|
||||
waitTime = DaemonRestartMaxWait
|
||||
}
|
||||
|
@ -205,8 +153,8 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
|
|||
return
|
||||
}
|
||||
|
||||
// Process isn't started currently. We're restarting. Start it and save
|
||||
// the process if we have it.
|
||||
// Process isn't started currently. We're restarting. Start it
|
||||
// and save the process if we have it.
|
||||
var err error
|
||||
process, err = p.start()
|
||||
if err == nil {
|
||||
|
@ -218,40 +166,34 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
|
|||
p.Logger.Printf("[ERR] agent/proxy: error restarting daemon: %s", err)
|
||||
continue
|
||||
}
|
||||
// NOTE: it is a postcondition of this method that we don't return while
|
||||
// the process is still running. See NOTE below but it's essential that
|
||||
// from here to the <-waitCh below nothing can cause this method to exit
|
||||
// or loop if err is non-nil.
|
||||
|
||||
}
|
||||
|
||||
// Wait will never work since child is detached and released, so poll the
|
||||
// PID with sig 0 to check it's still alive.
|
||||
interval := p.pollInterval
|
||||
if interval < 1 {
|
||||
interval = 1 * time.Second
|
||||
// Wait for the process to exit. Note that if we restored this proxy
|
||||
// then Wait will always fail because we likely aren't the parent
|
||||
// process. Therefore, we do an extra sanity check after to use other
|
||||
// syscalls to verify the process is truly dead.
|
||||
ps, err := process.Wait()
|
||||
if _, err := findProcess(process.Pid); err == nil {
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
// We want a busy loop, but not too busy. 1 second between
|
||||
// detecting a process death seems reasonable.
|
||||
|
||||
case <-stopCh:
|
||||
// If we receive a stop request we want to exit immediately.
|
||||
return
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
waitCh, closer := externalWait(process.Pid, interval)
|
||||
defer closer()
|
||||
|
||||
// NOTE: we must not select on anything else here; Stop() requires the
|
||||
// postcondition for this method to be that the managed process is not
|
||||
// running when we return and the defer above closes p.exitedCh. If we
|
||||
// select on stopCh or another signal here we introduce races where we might
|
||||
// exit early and leave the actual process running (for example because
|
||||
// SIGINT was ignored but Stop saw us exit and assumed all was good). That
|
||||
// means that there is no way to stop the Daemon without killing the process
|
||||
// but that's OK because agent Shutdown can just persist the state and exit
|
||||
// without Stopping this and the right thing will happen. If we ever need to
|
||||
// stop managing a process without killing it at a time when the agent
|
||||
// process is not shutting down then we might have to re-think that.
|
||||
<-waitCh
|
||||
|
||||
// Note that we don't need to call Release explicitly. It's a no-op for Unix
|
||||
// but even on Windows it's called automatically by a Finalizer during
|
||||
// garbage collection to free the process handle.
|
||||
// (https://github.com/golang/go/blob/1174ad3a8f6f9d2318ac45fca3cd90f12915cf04/src/os/exec.go#L26)
|
||||
process = nil
|
||||
p.Logger.Printf("[INFO] agent/proxy: daemon exited")
|
||||
if err != nil {
|
||||
p.Logger.Printf("[INFO] agent/proxy: daemon exited with error: %s", err)
|
||||
} else if status, ok := exitStatus(ps); ok {
|
||||
p.Logger.Printf("[INFO] agent/proxy: daemon exited with exit code: %d", status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -259,78 +201,33 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) {
|
|||
// configured *exec.Command with the modifications documented on Daemon
|
||||
// such as setting the proxy token environmental variable.
|
||||
func (p *Daemon) start() (*os.Process, error) {
|
||||
cmd := *p.Command
|
||||
|
||||
// Add the proxy token to the environment. We first copy the env because
|
||||
// it is a slice and therefore the "copy" above will only copy the slice
|
||||
// reference. We allocate an exactly sized slice.
|
||||
baseEnv := os.Environ()
|
||||
env := make([]string, len(baseEnv), len(baseEnv)+2)
|
||||
copy(env, baseEnv)
|
||||
env = append(env,
|
||||
cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+1)
|
||||
copy(cmd.Env, p.Command.Env)
|
||||
cmd.Env = append(cmd.Env,
|
||||
fmt.Sprintf("%s=%s", EnvProxyId, p.ProxyId),
|
||||
fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))
|
||||
|
||||
// Args must always contain a 0 entry which is usually the executed binary.
|
||||
// To be safe and a bit more robust we default this, but only to prevent
|
||||
// a panic below.
|
||||
if len(p.Args) == 0 {
|
||||
p.Args = []string{p.Path}
|
||||
if len(cmd.Args) == 0 {
|
||||
cmd.Args = []string{cmd.Path}
|
||||
}
|
||||
|
||||
// If we are running in a test mode that disabled detaching daemon processes
|
||||
// for simplicity, just exec the thing directly. This should never be the case
|
||||
// in real life since this config is not publically exposed but makes testing
|
||||
// way cleaner outside of this package.
|
||||
if p.DisableDetach {
|
||||
cmd := exec.Command(p.Path, p.Args[1:]...)
|
||||
err := cmd.Start()
|
||||
return cmd.Process, err
|
||||
}
|
||||
|
||||
// Watch closely, we now swap out the exec.Cmd args for ones to run the same
|
||||
// command via the connect daemonize command which takes care of correctly
|
||||
// "double forking" to ensure the child is fully detached and adopted by the
|
||||
// init process while the agent keeps running.
|
||||
var daemonCmd exec.Cmd
|
||||
|
||||
dCmd, err := p.daemonizeCommand()
|
||||
if err != nil {
|
||||
// Start it
|
||||
p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", cmd.Path, cmd.Args[1:])
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
daemonCmd.Path = dCmd[0]
|
||||
// First arguments are for the stdout, stderr
|
||||
daemonCmd.Args = append(dCmd, p.StdoutPath)
|
||||
daemonCmd.Args = append(daemonCmd.Args, p.StderrPath)
|
||||
daemonCmd.Args = append(daemonCmd.Args, p.Args...)
|
||||
daemonCmd.Env = env
|
||||
|
||||
// setup stdout so we can read the PID
|
||||
var out bytes.Buffer
|
||||
daemonCmd.Stdout = &out
|
||||
daemonCmd.Stderr = &out
|
||||
|
||||
// Run it to completion - it should exit immediately (this calls wait to
|
||||
// ensure we don't leave the daemonize command as a zombie)
|
||||
p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", daemonCmd.Path,
|
||||
daemonCmd.Args[1:])
|
||||
if err := daemonCmd.Run(); err != nil {
|
||||
p.Logger.Printf("[DEBUG] agent/proxy: daemonize output: %s", out.String())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Read the PID from stdout
|
||||
outStr, err := out.ReadString('\n')
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pid, err := strconv.Atoi(strings.TrimSpace(outStr))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse PID from output of daemonize: %s",
|
||||
err)
|
||||
}
|
||||
|
||||
// Write the pid file. This might error and that's okay.
|
||||
if p.PidPath != "" {
|
||||
pid := strconv.Itoa(pid)
|
||||
pid := strconv.FormatInt(int64(cmd.Process.Pid), 10)
|
||||
if err := file.WriteAtomic(p.PidPath, []byte(pid)); err != nil {
|
||||
p.Logger.Printf(
|
||||
"[DEBUG] agent/proxy: error writing pid file %q: %s",
|
||||
|
@ -338,38 +235,7 @@ func (p *Daemon) start() (*os.Process, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Finally, adopt the process so we can send signals
|
||||
return findProcess(pid)
|
||||
}
|
||||
|
||||
// daemonizeCommand returns the daemonize command.
|
||||
func (p *Daemon) daemonizeCommand() ([]string, error) {
|
||||
// Test override
|
||||
if p.daemonizeCmd != nil {
|
||||
return p.daemonizeCmd, nil
|
||||
}
|
||||
// Get the path to the current executable. This is cached once by the library
|
||||
// so this is effectively just a variable read.
|
||||
execPath, err := os.Executable()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Sanity check to prevent runaway test invocations because test didn't setup
|
||||
// daemonizeCmd correctly. This is kinda jank but go doesn't have a way to
|
||||
// detect and alter behaviour in test binaries by design. In this case though
|
||||
// we really need to never allow tests to self-execute which can cause
|
||||
// recursive explosion of test runs. This check seems safe for current go
|
||||
// tooling based on https://github.com/golang/go/issues/12120. If you hit
|
||||
// this, you need to find a way to configure your test
|
||||
// agent/proxyManager/Daemon to use agent/proxy/TestHelperProcess to run
|
||||
// daemonize in a safe way. TestAgent should do this automatically by default.
|
||||
if strings.HasSuffix(execPath, ".test") ||
|
||||
strings.HasSuffix(execPath, ".test.exe") {
|
||||
panic("test did not setup daemonizeCmd override and will dangerously" +
|
||||
" self-execute the test binary.")
|
||||
}
|
||||
|
||||
return []string{execPath, "connect", "daemonize"}, nil
|
||||
return cmd.Process, nil
|
||||
}
|
||||
|
||||
// Stop stops the daemon.
|
||||
|
@ -417,7 +283,7 @@ func (p *Daemon) Stop() error {
|
|||
}()
|
||||
}
|
||||
|
||||
// First, try a graceful stop.
|
||||
// First, try a graceful stop
|
||||
err := process.Signal(os.Interrupt)
|
||||
if err == nil {
|
||||
select {
|
||||
|
@ -427,28 +293,16 @@ func (p *Daemon) Stop() error {
|
|||
|
||||
case <-time.After(gracefulWait):
|
||||
// Interrupt didn't work
|
||||
p.Logger.Printf("[DEBUG] agent/proxy: graceful wait of %s passed, "+
|
||||
"killing", gracefulWait)
|
||||
}
|
||||
} else if isProcessAlreadyFinishedErr(err) {
|
||||
// This can happen due to races between signals and polling.
|
||||
return nil
|
||||
} else {
|
||||
p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err)
|
||||
}
|
||||
|
||||
// Graceful didn't work (e.g. on windows where SIGINT isn't implemented),
|
||||
// forcibly kill
|
||||
err = process.Kill()
|
||||
if err != nil && isProcessAlreadyFinishedErr(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
// Graceful didn't work, forcibly kill
|
||||
return process.Kill()
|
||||
}
|
||||
|
||||
// Close implements Proxy by stopping the run loop but not killing the process.
|
||||
// One Close is called, Stop has no effect.
|
||||
func (p *Daemon) Close() error {
|
||||
// stopKeepAlive is like Stop but keeps the process running. This is
|
||||
// used only for tests.
|
||||
func (p *Daemon) stopKeepAlive() error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
|
@ -475,8 +329,10 @@ func (p *Daemon) Equal(raw Proxy) bool {
|
|||
|
||||
// We compare equality on a subset of the command configuration
|
||||
return p.ProxyToken == p2.ProxyToken &&
|
||||
p.Path == p2.Path &&
|
||||
reflect.DeepEqual(p.Args, p2.Args)
|
||||
p.Command.Path == p2.Command.Path &&
|
||||
p.Command.Dir == p2.Command.Dir &&
|
||||
reflect.DeepEqual(p.Command.Args, p2.Command.Args) &&
|
||||
reflect.DeepEqual(p.Command.Env, p2.Command.Env)
|
||||
}
|
||||
|
||||
// MarshalSnapshot implements Proxy
|
||||
|
@ -490,10 +346,12 @@ func (p *Daemon) MarshalSnapshot() map[string]interface{} {
|
|||
}
|
||||
|
||||
return map[string]interface{}{
|
||||
"Pid": p.process.Pid,
|
||||
"Path": p.Path,
|
||||
"Args": p.Args,
|
||||
"ProxyToken": p.ProxyToken,
|
||||
"Pid": p.process.Pid,
|
||||
"CommandPath": p.Command.Path,
|
||||
"CommandArgs": p.Command.Args,
|
||||
"CommandDir": p.Command.Dir,
|
||||
"CommandEnv": p.Command.Env,
|
||||
"ProxyToken": p.ProxyToken,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -509,8 +367,12 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error {
|
|||
|
||||
// Set the basic fields
|
||||
p.ProxyToken = s.ProxyToken
|
||||
p.Path = s.Path
|
||||
p.Args = s.Args
|
||||
p.Command = &exec.Cmd{
|
||||
Path: s.CommandPath,
|
||||
Args: s.CommandArgs,
|
||||
Dir: s.CommandDir,
|
||||
Env: s.CommandEnv,
|
||||
}
|
||||
|
||||
// FindProcess on many systems returns no error even if the process
|
||||
// is now dead. We perform an extra check that the process is alive.
|
||||
|
@ -536,12 +398,14 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error {
|
|||
// within the manager snapshot and is restored automatically.
|
||||
type daemonSnapshot struct {
|
||||
// Pid of the process. This is the only value actually required to
|
||||
// regain management control. The remainder values are for Equal.
|
||||
// regain mangement control. The remainder values are for Equal.
|
||||
Pid int
|
||||
|
||||
// Command information
|
||||
Path string
|
||||
Args []string
|
||||
CommandPath string
|
||||
CommandArgs []string
|
||||
CommandDir string
|
||||
CommandEnv []string
|
||||
|
||||
// NOTE(mitchellh): longer term there are discussions/plans to only
|
||||
// store the hash of the token but for now we need the full token in
|
||||
|
|
|
@ -3,8 +3,8 @@ package proxy
|
|||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -28,10 +28,12 @@ func TestDaemonStartStop(t *testing.T) {
|
|||
uuid, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
|
||||
d := helperProcessDaemon("start-stop", path)
|
||||
d.ProxyId = "tubes"
|
||||
d.ProxyToken = uuid
|
||||
d.Logger = testLogger
|
||||
d := &Daemon{
|
||||
Command: helperProcess("start-stop", path),
|
||||
ProxyId: "tubes",
|
||||
ProxyToken: uuid,
|
||||
Logger: testLogger,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
defer d.Stop()
|
||||
|
||||
|
@ -66,78 +68,6 @@ func TestDaemonStartStop(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestDaemonDetachesChild(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
td, closer := testTempDir(t)
|
||||
defer closer()
|
||||
|
||||
path := filepath.Join(td, "file")
|
||||
pidPath := filepath.Join(td, "child.pid")
|
||||
|
||||
// Start the parent process wrapping a start-stop test. The parent is acting
|
||||
// as our "agent". We need an extra indirection to be able to kill the "agent"
|
||||
// and still be running the test process.
|
||||
parentCmd := helperProcess("parent", pidPath, "start-stop", path)
|
||||
require.NoError(parentCmd.Start())
|
||||
|
||||
// Wait for the pid file to exist so we know parent is running
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(pidPath)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.Fatalf("error: %s", err)
|
||||
})
|
||||
|
||||
// And wait for the actual file to be sure the child is running (it should be
|
||||
// since parent doesn't write PID until child starts but the child might not
|
||||
// have completed the write to disk yet which causes flakiness below).
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.Fatalf("error: %s", err)
|
||||
})
|
||||
|
||||
// Always cleanup child process after
|
||||
defer func() {
|
||||
_, err := os.Stat(pidPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
bs, err := ioutil.ReadFile(pidPath)
|
||||
require.NoError(err)
|
||||
pid, err := strconv.Atoi(string(bs))
|
||||
require.NoError(err)
|
||||
proc, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
proc.Kill()
|
||||
}()
|
||||
|
||||
time.Sleep(20 * time.Second)
|
||||
|
||||
// Now kill the parent and wait for it
|
||||
require.NoError(parentCmd.Process.Kill())
|
||||
|
||||
_, err := parentCmd.Process.Wait()
|
||||
require.NoError(err)
|
||||
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
// The child should still be running so file should still be there AND child processid should still be there
|
||||
_, err = os.Stat(path)
|
||||
require.NoError(err, "child should still be running")
|
||||
|
||||
// Let defer clean up the child process
|
||||
}
|
||||
|
||||
func TestDaemonRestart(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -146,8 +76,10 @@ func TestDaemonRestart(t *testing.T) {
|
|||
defer closer()
|
||||
path := filepath.Join(td, "file")
|
||||
|
||||
d := helperProcessDaemon("restart", path)
|
||||
d.Logger = testLogger
|
||||
d := &Daemon{
|
||||
Command: helperProcess("restart", path),
|
||||
Logger: testLogger,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
defer d.Stop()
|
||||
|
||||
|
@ -179,11 +111,12 @@ func TestDaemonStop_kill(t *testing.T) {
|
|||
|
||||
path := filepath.Join(td, "file")
|
||||
|
||||
d := helperProcessDaemon("stop-kill", path)
|
||||
d.ProxyToken = "hello"
|
||||
d.Logger = testLogger
|
||||
d.gracefulWait = 200 * time.Millisecond
|
||||
d.pollInterval = 100 * time.Millisecond
|
||||
d := &Daemon{
|
||||
Command: helperProcess("stop-kill", path),
|
||||
ProxyToken: "hello",
|
||||
Logger: testLogger,
|
||||
gracefulWait: 200 * time.Millisecond,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
|
||||
// Wait for the file to exist
|
||||
|
@ -199,7 +132,7 @@ func TestDaemonStop_kill(t *testing.T) {
|
|||
// Stop the process
|
||||
require.NoError(d.Stop())
|
||||
|
||||
// Stat the file so that we can get the mtime
|
||||
// State the file so that we can get the mtime
|
||||
fi, err := os.Stat(path)
|
||||
require.NoError(err)
|
||||
mtime := fi.ModTime()
|
||||
|
@ -216,7 +149,6 @@ func TestDaemonStart_pidFile(t *testing.T) {
|
|||
|
||||
require := require.New(t)
|
||||
td, closer := testTempDir(t)
|
||||
|
||||
defer closer()
|
||||
|
||||
path := filepath.Join(td, "file")
|
||||
|
@ -224,10 +156,12 @@ func TestDaemonStart_pidFile(t *testing.T) {
|
|||
uuid, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
|
||||
d := helperProcessDaemon("start-once", path)
|
||||
d.ProxyToken = uuid
|
||||
d.Logger = testLogger
|
||||
d.PidPath = pidPath
|
||||
d := &Daemon{
|
||||
Command: helperProcess("start-once", path),
|
||||
ProxyToken: uuid,
|
||||
Logger: testLogger,
|
||||
PidPath: pidPath,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
defer d.Stop()
|
||||
|
||||
|
@ -264,9 +198,11 @@ func TestDaemonRestart_pidFile(t *testing.T) {
|
|||
path := filepath.Join(td, "file")
|
||||
pidPath := filepath.Join(td, "pid")
|
||||
|
||||
d := helperProcessDaemon("restart", path)
|
||||
d.Logger = testLogger
|
||||
d.PidPath = pidPath
|
||||
d := &Daemon{
|
||||
Command: helperProcess("restart", path),
|
||||
Logger: testLogger,
|
||||
PidPath: pidPath,
|
||||
}
|
||||
require.NoError(d.Start())
|
||||
defer d.Stop()
|
||||
|
||||
|
@ -308,32 +244,51 @@ func TestDaemonEqual(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
"Different type",
|
||||
&Daemon{},
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{},
|
||||
},
|
||||
&Noop{},
|
||||
false,
|
||||
},
|
||||
|
||||
{
|
||||
"Nil",
|
||||
&Daemon{},
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{},
|
||||
},
|
||||
nil,
|
||||
false,
|
||||
},
|
||||
|
||||
{
|
||||
"Equal",
|
||||
&Daemon{},
|
||||
&Daemon{},
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{},
|
||||
},
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{},
|
||||
},
|
||||
true,
|
||||
},
|
||||
|
||||
{
|
||||
"Different path",
|
||||
&Daemon{
|
||||
Path: "/foo",
|
||||
Command: &exec.Cmd{Path: "/foo"},
|
||||
},
|
||||
&Daemon{
|
||||
Path: "/bar",
|
||||
Command: &exec.Cmd{Path: "/bar"},
|
||||
},
|
||||
false,
|
||||
},
|
||||
|
||||
{
|
||||
"Different dir",
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{Dir: "/foo"},
|
||||
},
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{Dir: "/bar"},
|
||||
},
|
||||
false,
|
||||
},
|
||||
|
@ -341,10 +296,10 @@ func TestDaemonEqual(t *testing.T) {
|
|||
{
|
||||
"Different args",
|
||||
&Daemon{
|
||||
Args: []string{"foo"},
|
||||
Command: &exec.Cmd{Args: []string{"foo"}},
|
||||
},
|
||||
&Daemon{
|
||||
Args: []string{"bar"},
|
||||
Command: &exec.Cmd{Args: []string{"bar"}},
|
||||
},
|
||||
false,
|
||||
},
|
||||
|
@ -352,9 +307,11 @@ func TestDaemonEqual(t *testing.T) {
|
|||
{
|
||||
"Different token",
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{},
|
||||
ProxyToken: "one",
|
||||
},
|
||||
&Daemon{
|
||||
Command: &exec.Cmd{},
|
||||
ProxyToken: "two",
|
||||
},
|
||||
false,
|
||||
|
@ -378,7 +335,7 @@ func TestDaemonMarshalSnapshot(t *testing.T) {
|
|||
{
|
||||
"stopped daemon",
|
||||
&Daemon{
|
||||
Path: "/foo",
|
||||
Command: &exec.Cmd{Path: "/foo"},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
|
@ -386,14 +343,16 @@ func TestDaemonMarshalSnapshot(t *testing.T) {
|
|||
{
|
||||
"basic",
|
||||
&Daemon{
|
||||
Path: "/foo",
|
||||
Command: &exec.Cmd{Path: "/foo"},
|
||||
process: &os.Process{Pid: 42},
|
||||
},
|
||||
map[string]interface{}{
|
||||
"Pid": 42,
|
||||
"Path": "/foo",
|
||||
"Args": []string(nil),
|
||||
"ProxyToken": "",
|
||||
"Pid": 42,
|
||||
"CommandPath": "/foo",
|
||||
"CommandArgs": []string(nil),
|
||||
"CommandDir": "",
|
||||
"CommandEnv": []string(nil),
|
||||
"ProxyToken": "",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -417,9 +376,11 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
|
|||
uuid, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
|
||||
d := helperProcessDaemon("start-stop", path)
|
||||
d.ProxyToken = uuid
|
||||
d.Logger = testLogger
|
||||
d := &Daemon{
|
||||
Command: helperProcess("start-stop", path),
|
||||
ProxyToken: uuid,
|
||||
Logger: testLogger,
|
||||
}
|
||||
defer d.Stop()
|
||||
require.NoError(d.Start())
|
||||
|
||||
|
@ -437,7 +398,7 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) {
|
|||
snap := d.MarshalSnapshot()
|
||||
|
||||
// Stop the original daemon but keep it alive
|
||||
require.NoError(d.Close())
|
||||
require.NoError(d.stopKeepAlive())
|
||||
|
||||
// Restore the second daemon
|
||||
d2 := &Daemon{Logger: testLogger}
|
||||
|
@ -469,9 +430,11 @@ func TestDaemonUnmarshalSnapshot_notRunning(t *testing.T) {
|
|||
uuid, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
|
||||
d := helperProcessDaemon("start-stop", path)
|
||||
d.ProxyToken = uuid
|
||||
d.Logger = testLogger
|
||||
d := &Daemon{
|
||||
Command: helperProcess("start-stop", path),
|
||||
ProxyToken: uuid,
|
||||
Logger: testLogger,
|
||||
}
|
||||
defer d.Stop()
|
||||
require.NoError(d.Start())
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -84,16 +85,6 @@ type Manager struct {
|
|||
CoalescePeriod time.Duration
|
||||
QuiescentPeriod time.Duration
|
||||
|
||||
// DisableDetach is used by tests that don't actually care about detached
|
||||
// child behaviour (i.e. outside proxy package) to bypass detaching and
|
||||
// daemonizing Daemons. This makes tests much simpler as they don't need to
|
||||
// implement a test-binary mode to enable self-exec daemonizing etc. and there
|
||||
// are fewer risks of detached processes being spawned and then not killed in
|
||||
// face of missed teardown/panic/interrupt of test runs etc. It's public since
|
||||
// it needs to be configurable from the agent package when setting up the
|
||||
// proxyManager instance.
|
||||
DisableDetach bool
|
||||
|
||||
// lock is held while reading/writing any internal state of the manager.
|
||||
// cond is a condition variable on lock that is broadcasted for runState
|
||||
// changes.
|
||||
|
@ -112,10 +103,6 @@ type Manager struct {
|
|||
// proxies (unlikely scenario).
|
||||
lastSnapshot *snapshot
|
||||
|
||||
// daemonizeCmd is set only in tests to control the path and args to the
|
||||
// daemonize command.
|
||||
daemonizeCmd []string
|
||||
|
||||
proxies map[string]Proxy
|
||||
}
|
||||
|
||||
|
@ -161,35 +148,7 @@ func (m *Manager) Close() error {
|
|||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
return m.stop(func(p Proxy) error {
|
||||
return p.Close()
|
||||
})
|
||||
}
|
||||
|
||||
// Kill will Close the manager and Kill all proxies that were being managed.
|
||||
// Only ONE of Kill or Close must be called. If Close has been called already
|
||||
// then this will have no effect.
|
||||
func (m *Manager) Kill() error {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
return m.stop(func(p Proxy) error {
|
||||
return p.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// stop stops the run loop and cleans up all the proxies by calling
|
||||
// the given cleaner. If the cleaner returns an error the proxy won't be
|
||||
// removed from the map.
|
||||
//
|
||||
// The lock must be held while this is called.
|
||||
func (m *Manager) stop(cleaner func(Proxy) error) error {
|
||||
for {
|
||||
// Special case state that exits the for loop
|
||||
if m.runState == managerStateStopped {
|
||||
break
|
||||
}
|
||||
|
||||
switch m.runState {
|
||||
case managerStateIdle:
|
||||
// Idle so just set it to stopped and return. We notify
|
||||
|
@ -208,13 +167,29 @@ func (m *Manager) stop(cleaner func(Proxy) error) error {
|
|||
case managerStateStopping:
|
||||
// Still stopping, wait...
|
||||
m.cond.Wait()
|
||||
|
||||
case managerStateStopped:
|
||||
// Stopped, target state reached
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Kill will Close the manager and Kill all proxies that were being managed.
|
||||
//
|
||||
// This is safe to call with Close already called since Close is idempotent.
|
||||
func (m *Manager) Kill() error {
|
||||
// Close first so that we aren't getting changes in proxies
|
||||
if err := m.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
// Clean up all the proxies
|
||||
var err error
|
||||
for id, proxy := range m.proxies {
|
||||
if err := cleaner(proxy); err != nil {
|
||||
if err := proxy.Stop(); err != nil {
|
||||
err = multierror.Append(
|
||||
err, fmt.Errorf("failed to stop proxy %q: %s", id, err))
|
||||
continue
|
||||
|
@ -426,13 +401,18 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
|
|||
return nil, fmt.Errorf("daemon mode managed proxy requires command")
|
||||
}
|
||||
|
||||
// Build the command to execute.
|
||||
var cmd exec.Cmd
|
||||
cmd.Path = command[0]
|
||||
cmd.Args = command // idx 0 is path but preserved since it should be
|
||||
if err := m.configureLogDir(id, &cmd); err != nil {
|
||||
return nil, fmt.Errorf("error configuring proxy logs: %s", err)
|
||||
}
|
||||
|
||||
// Build the daemon structure
|
||||
proxy.Path = command[0]
|
||||
proxy.Args = command // idx 0 is path but preserved since it should be
|
||||
proxy.Command = &cmd
|
||||
proxy.ProxyId = id
|
||||
proxy.ProxyToken = mp.ProxyToken
|
||||
proxy.daemonizeCmd = m.daemonizeCmd
|
||||
proxy.DisableDetach = m.DisableDetach
|
||||
return proxy, nil
|
||||
|
||||
default:
|
||||
|
@ -447,10 +427,8 @@ func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy
|
|||
switch mode {
|
||||
case structs.ProxyExecModeDaemon:
|
||||
return &Daemon{
|
||||
Logger: m.Logger,
|
||||
StdoutPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stdout"),
|
||||
StderrPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stderr"),
|
||||
PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id),
|
||||
Logger: m.Logger,
|
||||
PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id),
|
||||
}, nil
|
||||
|
||||
default:
|
||||
|
@ -458,6 +436,41 @@ func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy
|
|||
}
|
||||
}
|
||||
|
||||
// configureLogDir sets up the file descriptors to stdout/stderr so that
|
||||
// they log to the proper file path for the given service ID.
|
||||
func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error {
|
||||
// Create the log directory
|
||||
logDir := ""
|
||||
if m.DataDir != "" {
|
||||
logDir = filepath.Join(m.DataDir, "logs")
|
||||
if err := os.MkdirAll(logDir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Configure the stdout, stderr paths
|
||||
stdoutPath := logPath(logDir, id, "stdout")
|
||||
stderrPath := logPath(logDir, id, "stderr")
|
||||
|
||||
// Open the files. We want to append to each. We expect these files
|
||||
// to be rotated by some external process.
|
||||
stdoutF, err := os.OpenFile(stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating stdout file: %s", err)
|
||||
}
|
||||
stderrF, err := os.OpenFile(stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
// Don't forget to close stdoutF which successfully opened
|
||||
stdoutF.Close()
|
||||
|
||||
return fmt.Errorf("error creating stderr file: %s", err)
|
||||
}
|
||||
|
||||
cmd.Stdout = stdoutF
|
||||
cmd.Stderr = stderrF
|
||||
return nil
|
||||
}
|
||||
|
||||
// logPath is a helper to return the path to the log file for the given
|
||||
// directory, service ID, and stream type (stdout or stderr).
|
||||
func logPath(dir, id, stream string) string {
|
||||
|
|
|
@ -324,7 +324,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
|
|||
|
||||
// Add a second proxy so that we can determine when we're up
|
||||
// and running.
|
||||
path2 := filepath.Join(td, "file2")
|
||||
path2 := filepath.Join(td, "file")
|
||||
testStateProxy(t, state, "db", helperProcess("start-stop", path2))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path2)
|
||||
|
@ -343,7 +343,7 @@ func TestManagerRun_snapshotRestore(t *testing.T) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
r.Fatalf("file still exists: %s", path)
|
||||
r.Fatalf("file still exists")
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -361,10 +361,6 @@ func testManager(t *testing.T) (*Manager, func()) {
|
|||
td, closer := testTempDir(t)
|
||||
m.DataDir = td
|
||||
|
||||
// Override daemonize command to use the built-in test binary. Note that Args
|
||||
// includes the binary path as first arg.
|
||||
m.daemonizeCmd = helperProcess("daemonize").Args
|
||||
|
||||
return m, func() { closer() }
|
||||
}
|
||||
|
||||
|
@ -372,9 +368,8 @@ func testManager(t *testing.T) (*Manager, func()) {
|
|||
// (expected to be from the helperProcess function call). It returns the
|
||||
// ID for deregistration.
|
||||
func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.Cmd) string {
|
||||
// Note that exec.Command already ensures the command name is the first
|
||||
// argument in the list so no need to append again
|
||||
command := cmd.Args
|
||||
command := []string{cmd.Path}
|
||||
command = append(command, cmd.Args...)
|
||||
|
||||
require.NoError(t, state.AddService(&structs.NodeService{
|
||||
Service: service,
|
||||
|
@ -384,7 +379,7 @@ func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec.
|
|||
ExecMode: structs.ProxyExecModeDaemon,
|
||||
Command: command,
|
||||
TargetServiceID: service,
|
||||
}, "token", "")
|
||||
}, "token")
|
||||
require.NoError(t, err)
|
||||
|
||||
return p.Proxy.ProxyService.ID
|
||||
|
|
|
@ -5,7 +5,6 @@ type Noop struct{}
|
|||
|
||||
func (p *Noop) Start() error { return nil }
|
||||
func (p *Noop) Stop() error { return nil }
|
||||
func (p *Noop) Close() error { return nil }
|
||||
func (p *Noop) Equal(Proxy) bool { return true }
|
||||
func (p *Noop) MarshalSnapshot() map[string]interface{} { return nil }
|
||||
func (p *Noop) UnmarshalSnapshot(map[string]interface{}) error { return nil }
|
||||
|
|
|
@ -1,42 +0,0 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// isProcessAlreadyFinishedErr does a janky comparison with an error string
|
||||
// defined in os/exec_unix.go and os/exec_windows.go which we encounter due to
|
||||
// races with polling the external process. These case tests to fail since Stop
|
||||
// returns an error sometimes so we should notice if this string stops matching
|
||||
// the error in a future go version.
|
||||
func isProcessAlreadyFinishedErr(err error) bool {
|
||||
return strings.Contains(err.Error(), "os: process already finished")
|
||||
}
|
||||
|
||||
// externalWait mimics process.Wait for an external process. The returned
|
||||
// channel is closed when the process exits. It works by polling the process
|
||||
// with signal 0 and verifying no error is returned. A closer func is also
|
||||
// returned that can be invoked to terminate the waiter early.
|
||||
func externalWait(pid int, pollInterval time.Duration) (<-chan struct{}, func()) {
|
||||
ch := make(chan struct{})
|
||||
stopCh := make(chan struct{})
|
||||
closer := func() {
|
||||
close(stopCh)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
if _, err := findProcess(pid); err != nil {
|
||||
close(ch)
|
||||
return
|
||||
}
|
||||
time.Sleep(pollInterval)
|
||||
}
|
||||
}()
|
||||
return ch, closer
|
||||
}
|
|
@ -1,70 +0,0 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/testutil/retry"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExternalWait(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
td, closer := testTempDir(t)
|
||||
defer closer()
|
||||
path := filepath.Join(td, "file")
|
||||
|
||||
cmd := helperProcess("restart", path)
|
||||
require.NoError(cmd.Start())
|
||||
exitCh := make(chan struct{})
|
||||
// Launch waiter to make sure this process isn't zombified when it exits part
|
||||
// way through the test.
|
||||
go func() {
|
||||
cmd.Process.Wait()
|
||||
close(exitCh)
|
||||
}()
|
||||
defer cmd.Process.Kill()
|
||||
|
||||
// Create waiter
|
||||
pollInterval := 1 * time.Millisecond
|
||||
waitCh, closer := externalWait(cmd.Process.Pid, pollInterval)
|
||||
defer closer()
|
||||
|
||||
// Wait for the file to exist so we don't rely on timing to not race with
|
||||
// process startup.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, err := os.Stat(path)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
r.Fatalf("error: %s", err)
|
||||
})
|
||||
|
||||
// waitCh should not be closed until process quits. We'll wait a bit to verify
|
||||
// we weren't just too quick to see a process exit
|
||||
select {
|
||||
case <-waitCh:
|
||||
t.Fatal("waitCh should not be closed yet")
|
||||
default:
|
||||
}
|
||||
|
||||
// Delete the file
|
||||
require.NoError(os.Remove(path))
|
||||
|
||||
// Wait for the child to actually exit cleanly
|
||||
<-exitCh
|
||||
|
||||
// Now we _should_ see waitCh close (need to wait at least a whole poll
|
||||
// interval)
|
||||
select {
|
||||
case <-waitCh:
|
||||
// OK
|
||||
case <-time.After(10 * pollInterval):
|
||||
t.Fatal("waitCh should be closed")
|
||||
}
|
||||
}
|
|
@ -7,8 +7,8 @@ import (
|
|||
)
|
||||
|
||||
func findProcess(pid int) (*os.Process, error) {
|
||||
// On Windows, os.FindProcess will error if the process is not alive, so we
|
||||
// don't have to do any further checking. The nature of it being non-nil means
|
||||
// it seems to be healthy.
|
||||
// On Windows, os.FindProcess will error if the process is not alive,
|
||||
// so we don't have to do any further checking. The nature of it being
|
||||
// non-nil means it seems to be healthy.
|
||||
return os.FindProcess(pid)
|
||||
}
|
||||
|
|
|
@ -40,18 +40,12 @@ type Proxy interface {
|
|||
Start() error
|
||||
|
||||
// Stop stops the proxy and disallows it from ever being started again.
|
||||
// This should also clean up any resources used by this Proxy.
|
||||
//
|
||||
// If the proxy is not started yet, this should not return an error, but
|
||||
// it should disallow Start from working again. If the proxy is already
|
||||
// stopped, this should not return an error.
|
||||
Stop() error
|
||||
|
||||
// Close should clean up any resources associated with this proxy but
|
||||
// keep it running in the background. Only one of Close or Stop can be
|
||||
// called.
|
||||
Close() error
|
||||
|
||||
// Equal returns true if the argument is equal to the proxy being called.
|
||||
// This is called by the manager to determine if a change in configuration
|
||||
// results in a proxy that needs to be restarted or not. If Equal returns
|
||||
|
|
|
@ -7,12 +7,8 @@ import (
|
|||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/command/connect/daemonize"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
// testLogger is a logger that can be used by tests that require a
|
||||
|
@ -53,25 +49,6 @@ func helperProcess(s ...string) *exec.Cmd {
|
|||
return cmd
|
||||
}
|
||||
|
||||
// helperProcessDaemon returns a *Daemon that can be used to execute the
|
||||
// TestHelperProcess function below. This can be used to test multi-process
|
||||
// interactions. The Daemon has it's Path, Args and stdio paths populated but
|
||||
// other fields might need to be set depending on test requirements.
|
||||
//
|
||||
// This relies on the TestMainInterceptDaemonize hack being active.
|
||||
func helperProcessDaemon(s ...string) *Daemon {
|
||||
cs := []string{os.Args[0], "-test.run=TestHelperProcess", "--", helperProcessSentinel}
|
||||
cs = append(cs, s...)
|
||||
|
||||
return &Daemon{
|
||||
Path: os.Args[0],
|
||||
Args: cs,
|
||||
StdoutPath: "_", // dev null them for now
|
||||
StderrPath: "_",
|
||||
daemonizeCmd: helperProcess("daemonize").Args,
|
||||
}
|
||||
}
|
||||
|
||||
// This is not a real test. This is just a helper process kicked off by tests
|
||||
// using the helperProcess helper function.
|
||||
func TestHelperProcess(t *testing.T) {
|
||||
|
@ -178,53 +155,6 @@ func TestHelperProcess(t *testing.T) {
|
|||
|
||||
<-make(chan struct{})
|
||||
|
||||
// Parent runs the given process in a Daemon and then sleeps until the test
|
||||
// code kills it. It exists to test that the Daemon-managed child process
|
||||
// survives it's parent exiting which we can't test directly without exiting
|
||||
// the test process so we need an extra level of indirection. The test code
|
||||
// using this must pass a file path as the first argument for the child
|
||||
// processes PID to be written and then must take care to clean up that PID
|
||||
// later or the child will be left running forever.
|
||||
case "parent":
|
||||
// We will write the PID for the child to the file in the first argument
|
||||
// then pass rest of args through to command.
|
||||
pidFile := args[0]
|
||||
d := helperProcess(args[1:]...)
|
||||
if err := d.Start(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// Write PID
|
||||
pidBs := []byte(strconv.Itoa(d.Process.Pid))
|
||||
if err := ioutil.WriteFile(pidFile, pidBs, 0644); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
|
||||
// TODO: Also kill the detached process (once it is detached)
|
||||
os.Exit(1)
|
||||
}
|
||||
// Wait "forever" (calling test chooses when we exit with signal/Wait to
|
||||
// minimise coordination).
|
||||
for {
|
||||
time.Sleep(time.Hour)
|
||||
}
|
||||
|
||||
case "daemonize":
|
||||
// Run daemonize!
|
||||
ui := &cli.BasicUi{Writer: os.Stdout, ErrorWriter: os.Stderr}
|
||||
cli := &cli.CLI{
|
||||
Args: append([]string{"daemonize"}, args...),
|
||||
Commands: map[string]cli.CommandFactory{
|
||||
"daemonize": func() (cli.Command, error) {
|
||||
return daemonize.New(ui), nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
exitCode, err := cli.Run()
|
||||
if err != nil {
|
||||
log.Printf("[ERR] running hijacked daemonize command: %s", err)
|
||||
}
|
||||
os.Exit(exitCode)
|
||||
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd)
|
||||
os.Exit(2)
|
||||
|
|
|
@ -376,14 +376,6 @@ func TestConfig(sources ...config.Source) *config.RuntimeConfig {
|
|||
fmt.Println("WARNING:", w)
|
||||
}
|
||||
|
||||
// Set internal flag to simplify connect daemon execution. We test the full
|
||||
// daemonization behaviour explicitly in `proxy` package, everywhere else it's
|
||||
// just super painful to setup self-executable daemonize commands etc. For now
|
||||
// this is not overridable because it's simpler not to expose this config
|
||||
// publically at all but we could revisit that later if there is a legitimate
|
||||
// reason to want to test full detached daemon behaviour with a TestAgent.
|
||||
cfg.ConnectDisableDetachedDaemons = true
|
||||
|
||||
return &cfg
|
||||
}
|
||||
|
||||
|
|
|
@ -1140,7 +1140,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
|
|||
Port: 8000,
|
||||
Connect: &AgentServiceConnect{
|
||||
Proxy: &AgentServiceConnectProxy{
|
||||
Command: []string{"consul", "connect", "proxy"},
|
||||
Command: []string{"consul connect proxy"},
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
|
@ -1157,7 +1157,7 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
|
|||
ProxyServiceID: "foo-proxy",
|
||||
TargetServiceID: "foo",
|
||||
TargetServiceName: "foo",
|
||||
ContentHash: "2a29f8237db69d0e",
|
||||
ContentHash: "93baee1d838888ae",
|
||||
ExecMode: "daemon",
|
||||
Command: []string{"consul", "connect", "proxy"},
|
||||
Config: map[string]interface{}{
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"github.com/hashicorp/consul/command/connect/ca"
|
||||
caget "github.com/hashicorp/consul/command/connect/ca/get"
|
||||
caset "github.com/hashicorp/consul/command/connect/ca/set"
|
||||
"github.com/hashicorp/consul/command/connect/daemonize"
|
||||
"github.com/hashicorp/consul/command/connect/proxy"
|
||||
"github.com/hashicorp/consul/command/event"
|
||||
"github.com/hashicorp/consul/command/exec"
|
||||
|
@ -75,7 +74,6 @@ func init() {
|
|||
Register("connect ca get-config", func(ui cli.Ui) (cli.Command, error) { return caget.New(ui), nil })
|
||||
Register("connect ca set-config", func(ui cli.Ui) (cli.Command, error) { return caset.New(ui), nil })
|
||||
Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil })
|
||||
RegisterHidden("connect daemonize", func(ui cli.Ui) (cli.Command, error) { return daemonize.New(ui), nil })
|
||||
Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil })
|
||||
Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil })
|
||||
Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil })
|
||||
|
|
|
@ -1,88 +0,0 @@
|
|||
package daemonize
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
_ "net/http/pprof"
|
||||
"os" // Expose pprof if configured
|
||||
"os/exec"
|
||||
"syscall"
|
||||
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func New(ui cli.Ui) *cmd {
|
||||
return &cmd{UI: ui}
|
||||
}
|
||||
|
||||
type cmd struct {
|
||||
UI cli.Ui
|
||||
|
||||
stdoutPath string
|
||||
stderrPath string
|
||||
cmdArgs []string
|
||||
}
|
||||
|
||||
func (c *cmd) Run(args []string) int {
|
||||
numArgs := len(args)
|
||||
if numArgs < 3 {
|
||||
c.UI.Error("Need at least 3 arguments; stdoutPath, stdinPath, " +
|
||||
"executablePath [arguments...]")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c.stdoutPath, c.stderrPath = args[0], args[1]
|
||||
c.cmdArgs = args[2:] // includes the executable as arg 0 as expected
|
||||
|
||||
// Open log files if specified
|
||||
var stdoutF, stderrF *os.File
|
||||
var err error
|
||||
if c.stdoutPath != "_" {
|
||||
stdoutF, err = os.OpenFile(c.stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("error creating stdout file: %s", err))
|
||||
return 1
|
||||
}
|
||||
}
|
||||
if c.stderrPath != "_" {
|
||||
stderrF, err = os.OpenFile(c.stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("error creating stderr file: %s", err))
|
||||
return 1
|
||||
}
|
||||
}
|
||||
|
||||
// Exec the command passed in a new session then exit to ensure it's adopted
|
||||
// by the init process. Use the passed file paths for std out/err.
|
||||
cmd := &exec.Cmd{
|
||||
Path: c.cmdArgs[0],
|
||||
Args: c.cmdArgs,
|
||||
// Inherit Dir and Env by default.
|
||||
SysProcAttr: &syscall.SysProcAttr{Setsid: true},
|
||||
}
|
||||
cmd.Stdin = nil
|
||||
cmd.Stdout = stdoutF
|
||||
cmd.Stderr = stderrF
|
||||
|
||||
// Exec the child
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
c.UI.Error("command failed with error: " + err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Print it's PID to stdout
|
||||
fmt.Fprintf(os.Stdout, "%d\n", cmd.Process.Pid)
|
||||
|
||||
// Release (no-op on unix) and exit to orphan the child and get it adopted by
|
||||
// init.
|
||||
cmd.Process.Release()
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *cmd) Synopsis() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (c *cmd) Help() string {
|
||||
return ""
|
||||
}
|
|
@ -24,36 +24,12 @@ func Register(name string, fn Factory) {
|
|||
registry[name] = fn
|
||||
}
|
||||
|
||||
// RegisterHidden adds a new CLI sub-command to the registry that won't show up
|
||||
// in help or autocomplete.
|
||||
func RegisterHidden(name string, fn Factory) {
|
||||
if hiddenRegistry == nil {
|
||||
hiddenRegistry = make(map[string]Factory)
|
||||
}
|
||||
|
||||
if hiddenRegistry[name] != nil {
|
||||
panic(fmt.Errorf("Command %q is already registered", name))
|
||||
}
|
||||
hiddenRegistry[name] = fn
|
||||
}
|
||||
|
||||
// Map returns a realized mapping of available CLI commands in a format that
|
||||
// the CLI class can consume. This should be called after all registration is
|
||||
// complete.
|
||||
func Map(ui cli.Ui) map[string]cli.CommandFactory {
|
||||
return makeCommands(ui, registry)
|
||||
}
|
||||
|
||||
// Map returns a realized mapping of available but hidden CLI commands in a
|
||||
// format that the CLI class can consume. This should be called after all
|
||||
// registration is complete.
|
||||
func MapHidden(ui cli.Ui) map[string]cli.CommandFactory {
|
||||
return makeCommands(ui, hiddenRegistry)
|
||||
}
|
||||
|
||||
func makeCommands(ui cli.Ui, reg map[string]Factory) map[string]cli.CommandFactory {
|
||||
m := make(map[string]cli.CommandFactory)
|
||||
for name, fn := range reg {
|
||||
for name, fn := range registry {
|
||||
thisFn := fn
|
||||
m[name] = func() (cli.Command, error) {
|
||||
return thisFn(ui)
|
||||
|
@ -66,10 +42,6 @@ func makeCommands(ui cli.Ui, reg map[string]Factory) map[string]cli.CommandFacto
|
|||
// command name. This should be populated at package init() time via Register().
|
||||
var registry map[string]Factory
|
||||
|
||||
// hiddenRegistry behaves identically to registry but is for commands that are
|
||||
// hidden - i.e. not publically documented in the help or autocomplete.
|
||||
var hiddenRegistry map[string]Factory
|
||||
|
||||
// MakeShutdownCh returns a channel that can be used for shutdown notifications
|
||||
// for commands. This channel will send a message for every interrupt or SIGTERM
|
||||
// received.
|
||||
|
|
Loading…
Reference in New Issue