From 2b377dc62425b9a0de51926d20873460c64a5823 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 22 May 2018 14:27:04 +0100 Subject: [PATCH] Run daemon processes as a detached child. This turns out to have a lot more subtelty than we accounted for. The test suite is especially prone to races now we can only poll the child and many extra levels of indirectoin are needed to correctly run daemon process without it becoming a Zombie. I ran this test suite in a loop with parallel enabled to verify for races (-race doesn't find any as they are logical inter-process ones not actual data races). I made it through ~50 runs before hitting an error due to timing which is much better than before. I want to go back and see if we can do better though. Just getting this up. --- agent/agent.go | 5 +- agent/proxy/daemon.go | 225 +++++++++++++++++-------- agent/proxy/daemon_test.go | 184 +++++++++++--------- agent/proxy/manager.go | 53 +----- agent/proxy/manager_test.go | 2 +- agent/proxy/process.go | 43 +++++ agent/proxy/process_test.go | 70 ++++++++ agent/proxy/process_windows.go | 6 +- agent/proxy/proxy_test.go | 59 +++++++ command/commands_oss.go | 2 + command/connect/daemonize/daemonize.go | 90 ++++++++++ command/registry.go | 30 +++- main.go | 7 + 13 files changed, 576 insertions(+), 200 deletions(-) create mode 100644 agent/proxy/process.go create mode 100644 agent/proxy/process_test.go create mode 100644 command/connect/daemonize/daemonize.go diff --git a/agent/agent.go b/agent/agent.go index ca62a03cde..40a50dcb18 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1319,11 +1319,8 @@ func (a *Agent) ShutdownAgent() error { } // Stop the proxy manager - // NOTE(mitchellh): we use Kill for now to kill the processes since - // the local state isn't snapshotting meaning the proxy tokens are - // regenerated each time forcing the processes to restart anyways. if a.proxyManager != nil { - if err := a.proxyManager.Kill(); err != nil { + if err := a.proxyManager.Close(); err != nil { a.logger.Printf("[WARN] agent: error shutting down proxy manager: %s", err) } } diff --git a/agent/proxy/daemon.go b/agent/proxy/daemon.go index 013fbdc286..cbc1747c3a 100644 --- a/agent/proxy/daemon.go +++ b/agent/proxy/daemon.go @@ -1,12 +1,14 @@ package proxy import ( + "bytes" "fmt" "log" "os" "os/exec" "reflect" "strconv" + "strings" "sync" "time" @@ -29,9 +31,12 @@ const ( // // Consul will ensure that if the daemon crashes, that it is restarted. type Daemon struct { - // Command is the command to execute to start this daemon. This must - // be a Cmd that isn't yet started. - Command *exec.Cmd + // Path is the path to the executable to run + Path string + + // Args are the arguments to run with, the first element should be the same as + // Path. + Args []string // ProxyId is the ID of the proxy service. This is required for API // requests (along with the token) and is passed via env var. @@ -52,10 +57,30 @@ type Daemon struct { // created but the error will be logged to the Logger. PidPath string - // For tests, they can set this to change the default duration to wait - // for a graceful quit. + // StdoutPath, StderrPath are the paths to the files that stdout and stderr + // should be written to. + StdoutPath, StderrPath string + + // gracefulWait can be set for tests and controls how long Stop() will wait + // for process to terminate before killing. If not set defaults to 5 seconds. + // If this is lowered for tests, it must remain higher than pollInterval + // (preferably a factor of 2 or more) or the tests will potentially return + // errors from Stop() where the process races to Kill() a process that was + // already stopped by SIGINT but we didn't yet detect the change since poll + // didn't occur. gracefulWait time.Duration + // pollInterval can be set for tests and controls how frequently the child + // process is sent SIG 0 to check it's still running. If not set defaults to 1 + // second. + pollInterval time.Duration + + // daemonizePath is set only in tests to control the path to the daemonize + // command. The test executable itself will not respond to the consul command + // arguments we need to make this work so we rely on the current source being + // built and installed here to run the tests. + daemonizePath string + // process is the started process lock sync.Mutex stopped bool @@ -108,7 +133,7 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { // attempts keeps track of the number of restart attempts we've had and // is used to calculate the wait time using an exponential backoff. var attemptsDeadline time.Time - var attempts uint + var attempts uint32 for { if process == nil { @@ -121,7 +146,15 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { // Calculate the exponential backoff and wait if we have to if attempts > DaemonRestartBackoffMin { - waitTime := (1 << (attempts - DaemonRestartBackoffMin)) * time.Second + delayedAttempts := attempts - DaemonRestartBackoffMin + waitTime := time.Duration(0) + if delayedAttempts > 31 { + // don't shift off the end of the uint32 if the process is in a crash + // loop forever + waitTime = DaemonRestartMaxWait + } else { + waitTime = (1 << delayedAttempts) * time.Second + } if waitTime > DaemonRestartMaxWait { waitTime = DaemonRestartMaxWait } @@ -153,8 +186,8 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { return } - // Process isn't started currently. We're restarting. Start it - // and save the process if we have it. + // Process isn't started currently. We're restarting. Start it and save + // the process if we have it. var err error process, err = p.start() if err == nil { @@ -166,34 +199,40 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { p.Logger.Printf("[ERR] agent/proxy: error restarting daemon: %s", err) continue } - + // NOTE: it is a postcondition of this method that we don't return while + // the process is still running. See NOTE below but it's essential that + // from here to the <-waitCh below nothing can cause this method to exit + // or loop if err is non-nil. } - // Wait for the process to exit. Note that if we restored this proxy - // then Wait will always fail because we likely aren't the parent - // process. Therefore, we do an extra sanity check after to use other - // syscalls to verify the process is truly dead. - ps, err := process.Wait() - if _, err := findProcess(process.Pid); err == nil { - select { - case <-time.After(1 * time.Second): - // We want a busy loop, but not too busy. 1 second between - // detecting a process death seems reasonable. - - case <-stopCh: - // If we receive a stop request we want to exit immediately. - return - } - - continue + // Wait will never work since child is detached and released, so poll the + // PID with sig 0 to check it's still alive. + interval := p.pollInterval + if interval < 1 { + interval = 1 * time.Second } + waitCh, closer := externalWait(process.Pid, interval) + defer closer() + // NOTE: we must not select on anything else here; Stop() requires the + // postcondition for this method to be that the managed process is not + // running when we return and the defer above closes p.exitedCh. If we + // select on stopCh or another signal here we introduce races where we might + // exit early and leave the actual process running (for example because + // SIGINT was ignored but Stop saw us exit and assumed all was good). That + // means that there is no way to stop the Daemon without killing the process + // but that's OK because agent Shutdown can just persist the state and exit + // without Stopping this and the right thing will happen. If we ever need to + // stop managing a process without killing it at a time when the agent + // process is not shutting down then we might have to re-think that. + <-waitCh + + // Note that we don't need to call Release explicitly. It's a no-op for Unix + // but even on Windows it's called automatically by a Finalizer during + // garbage collection to free the process handle. + // (https://github.com/golang/go/blob/1174ad3a8f6f9d2318ac45fca3cd90f12915cf04/src/os/exec.go#L26) process = nil - if err != nil { - p.Logger.Printf("[INFO] agent/proxy: daemon exited with error: %s", err) - } else if status, ok := exitStatus(ps); ok { - p.Logger.Printf("[INFO] agent/proxy: daemon exited with exit code: %d", status) - } + p.Logger.Printf("[INFO] agent/proxy: daemon exited") } } @@ -201,33 +240,66 @@ func (p *Daemon) keepAlive(stopCh <-chan struct{}, exitedCh chan<- struct{}) { // configured *exec.Command with the modifications documented on Daemon // such as setting the proxy token environmental variable. func (p *Daemon) start() (*os.Process, error) { - cmd := *p.Command - // Add the proxy token to the environment. We first copy the env because // it is a slice and therefore the "copy" above will only copy the slice // reference. We allocate an exactly sized slice. - cmd.Env = make([]string, len(p.Command.Env), len(p.Command.Env)+1) - copy(cmd.Env, p.Command.Env) - cmd.Env = append(cmd.Env, + baseEnv := os.Environ() + env := make([]string, len(baseEnv), len(baseEnv)+2) + copy(env, baseEnv) + env = append(env, fmt.Sprintf("%s=%s", EnvProxyId, p.ProxyId), fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken)) // Args must always contain a 0 entry which is usually the executed binary. // To be safe and a bit more robust we default this, but only to prevent // a panic below. - if len(cmd.Args) == 0 { - cmd.Args = []string{cmd.Path} + if len(p.Args) == 0 { + p.Args = []string{p.Path} } - // Start it - p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", cmd.Path, cmd.Args[1:]) - if err := cmd.Start(); err != nil { + // Watch closely, we now swap out the exec.Cmd args for ones to run the same + // command via the connect daemonize command which takes care of correctly + // "double forking" to ensure the child is fully detached and adopted by the + // init process while the agent keeps running. + var daemonCmd exec.Cmd + + dCmd, err := p.daemonizeCommand() + if err != nil { return nil, err } + daemonCmd.Path = dCmd[0] + // First arguments are for the stdout, stderr + daemonCmd.Args = append(dCmd, p.StdoutPath) + daemonCmd.Args = append(daemonCmd.Args, p.StdoutPath) + daemonCmd.Args = append(daemonCmd.Args, p.Args...) + daemonCmd.Env = env + + // setup stdout so we can read the PID + var out bytes.Buffer + daemonCmd.Stdout = &out + + // Run it to completion - it should exit immediately (this calls wait to + // ensure we don't leave the daemonize command as a zombie) + p.Logger.Printf("[DEBUG] agent/proxy: starting proxy: %q %#v", daemonCmd.Path, + daemonCmd.Args[1:]) + if err := daemonCmd.Run(); err != nil { + return nil, err + } + + // Read the PID from stdout + outStr, err := out.ReadString('\n') + if err != nil { + return nil, err + } + pid, err := strconv.Atoi(strings.TrimSpace(outStr)) + if err != nil { + return nil, fmt.Errorf("failed to parse PID from output of daemonize: %s", + err) + } // Write the pid file. This might error and that's okay. if p.PidPath != "" { - pid := strconv.FormatInt(int64(cmd.Process.Pid), 10) + pid := strconv.Itoa(pid) if err := file.WriteAtomic(p.PidPath, []byte(pid)); err != nil { p.Logger.Printf( "[DEBUG] agent/proxy: error writing pid file %q: %s", @@ -235,7 +307,22 @@ func (p *Daemon) start() (*os.Process, error) { } } - return cmd.Process, nil + // Finally, adopt the process so we can send signals + return findProcess(pid) +} + +// daemonizeCommand returns the daemonize command. +func (p *Daemon) daemonizeCommand() ([]string, error) { + // Get the path to the current executable. This is cached once by the + // library so this is effectively just a variable read. + execPath, err := os.Executable() + if err != nil { + return nil, err + } + if p.daemonizePath != "" { + execPath = p.daemonizePath + } + return []string{execPath, "connect", "daemonize"}, nil } // Stop stops the daemon. @@ -283,7 +370,7 @@ func (p *Daemon) Stop() error { }() } - // First, try a graceful stop + // First, try a graceful stop. err := process.Signal(os.Interrupt) if err == nil { select { @@ -293,11 +380,23 @@ func (p *Daemon) Stop() error { case <-time.After(gracefulWait): // Interrupt didn't work + p.Logger.Printf("[DEBUG] agent/proxy: gracefull wait of %s passed, "+ + "killing", gracefulWait) } + } else if isProcessAlreadyFinishedErr(err) { + // This can happen due to races between signals and polling. + return nil + } else { + p.Logger.Printf("[DEBUG] agent/proxy: sigint failed, killing: %s", err) } - // Graceful didn't work, forcibly kill - return process.Kill() + // Graceful didn't work (e.g. on windows where SIGINT isn't implemented), + // forcibly kill + err = process.Kill() + if err != nil && isProcessAlreadyFinishedErr(err) { + return nil + } + return err } // stopKeepAlive is like Stop but keeps the process running. This is @@ -329,10 +428,8 @@ func (p *Daemon) Equal(raw Proxy) bool { // We compare equality on a subset of the command configuration return p.ProxyToken == p2.ProxyToken && - p.Command.Path == p2.Command.Path && - p.Command.Dir == p2.Command.Dir && - reflect.DeepEqual(p.Command.Args, p2.Command.Args) && - reflect.DeepEqual(p.Command.Env, p2.Command.Env) + p.Path == p2.Path && + reflect.DeepEqual(p.Args, p2.Args) } // MarshalSnapshot implements Proxy @@ -346,12 +443,10 @@ func (p *Daemon) MarshalSnapshot() map[string]interface{} { } return map[string]interface{}{ - "Pid": p.process.Pid, - "CommandPath": p.Command.Path, - "CommandArgs": p.Command.Args, - "CommandDir": p.Command.Dir, - "CommandEnv": p.Command.Env, - "ProxyToken": p.ProxyToken, + "Pid": p.process.Pid, + "Path": p.Path, + "Args": p.Args, + "ProxyToken": p.ProxyToken, } } @@ -367,12 +462,8 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error { // Set the basic fields p.ProxyToken = s.ProxyToken - p.Command = &exec.Cmd{ - Path: s.CommandPath, - Args: s.CommandArgs, - Dir: s.CommandDir, - Env: s.CommandEnv, - } + p.Path = s.Path + p.Args = s.Args // FindProcess on many systems returns no error even if the process // is now dead. We perform an extra check that the process is alive. @@ -398,14 +489,12 @@ func (p *Daemon) UnmarshalSnapshot(m map[string]interface{}) error { // within the manager snapshot and is restored automatically. type daemonSnapshot struct { // Pid of the process. This is the only value actually required to - // regain mangement control. The remainder values are for Equal. + // regain management control. The remainder values are for Equal. Pid int // Command information - CommandPath string - CommandArgs []string - CommandDir string - CommandEnv []string + Path string + Args []string // NOTE(mitchellh): longer term there are discussions/plans to only // store the hash of the token but for now we need the full token in diff --git a/agent/proxy/daemon_test.go b/agent/proxy/daemon_test.go index f08716276d..db7ddf47e8 100644 --- a/agent/proxy/daemon_test.go +++ b/agent/proxy/daemon_test.go @@ -3,8 +3,8 @@ package proxy import ( "io/ioutil" "os" - "os/exec" "path/filepath" + "strconv" "testing" "time" @@ -28,12 +28,10 @@ func TestDaemonStartStop(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-stop", path), - ProxyId: "tubes", - ProxyToken: uuid, - Logger: testLogger, - } + d := helperProcessDaemon("start-stop", path) + d.ProxyId = "tubes" + d.ProxyToken = uuid + d.Logger = testLogger require.NoError(d.Start()) defer d.Stop() @@ -68,6 +66,73 @@ func TestDaemonStartStop(t *testing.T) { }) } +func TestDaemonDetachesChild(t *testing.T) { + t.Parallel() + + require := require.New(t) + td, closer := testTempDir(t) + defer closer() + + path := filepath.Join(td, "file") + pidPath := filepath.Join(td, "child.pid") + + // Start the parent process wrapping a start-stop test. The parent is acting + // as our "agent". We need an extra indirection to be able to kill the "agent" + // and still be running the test process. + parentCmd := helperProcess("parent", pidPath, "start-stop", path) + require.NoError(parentCmd.Start()) + + // Wait for the pid file to exist so we know parent is running + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(pidPath) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // And wait for the actual file to be sure the child is running (it should be + // since parent doesn't write PID until child starts but the child might not + // have completed the write to disk yet which causes flakiness below). + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // Always cleanup child process after + defer func() { + _, err := os.Stat(pidPath) + if err != nil { + return + } + bs, err := ioutil.ReadFile(pidPath) + require.NoError(err) + pid, err := strconv.Atoi(string(bs)) + require.NoError(err) + proc, err := os.FindProcess(pid) + if err != nil { + return + } + proc.Kill() + }() + + // Now kill the parent and wait for it + require.NoError(parentCmd.Process.Kill()) + _, err := parentCmd.Process.Wait() + require.NoError(err) + + // The child should still be running so file should still be there + _, err = os.Stat(path) + require.NoError(err, "child should still be running") + + // Let defer clean up the child process +} + func TestDaemonRestart(t *testing.T) { t.Parallel() @@ -76,10 +141,8 @@ func TestDaemonRestart(t *testing.T) { defer closer() path := filepath.Join(td, "file") - d := &Daemon{ - Command: helperProcess("restart", path), - Logger: testLogger, - } + d := helperProcessDaemon("restart", path) + d.Logger = testLogger require.NoError(d.Start()) defer d.Stop() @@ -111,12 +174,11 @@ func TestDaemonStop_kill(t *testing.T) { path := filepath.Join(td, "file") - d := &Daemon{ - Command: helperProcess("stop-kill", path), - ProxyToken: "hello", - Logger: testLogger, - gracefulWait: 200 * time.Millisecond, - } + d := helperProcessDaemon("stop-kill", path) + d.ProxyToken = "hello" + d.Logger = testLogger + d.gracefulWait = 200 * time.Millisecond + d.pollInterval = 100 * time.Millisecond require.NoError(d.Start()) // Wait for the file to exist @@ -132,7 +194,7 @@ func TestDaemonStop_kill(t *testing.T) { // Stop the process require.NoError(d.Stop()) - // State the file so that we can get the mtime + // Stat the file so that we can get the mtime fi, err := os.Stat(path) require.NoError(err) mtime := fi.ModTime() @@ -149,6 +211,7 @@ func TestDaemonStart_pidFile(t *testing.T) { require := require.New(t) td, closer := testTempDir(t) + defer closer() path := filepath.Join(td, "file") @@ -156,12 +219,10 @@ func TestDaemonStart_pidFile(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-once", path), - ProxyToken: uuid, - Logger: testLogger, - PidPath: pidPath, - } + d := helperProcessDaemon("start-once", path) + d.ProxyToken = uuid + d.Logger = testLogger + d.PidPath = pidPath require.NoError(d.Start()) defer d.Stop() @@ -198,11 +259,9 @@ func TestDaemonRestart_pidFile(t *testing.T) { path := filepath.Join(td, "file") pidPath := filepath.Join(td, "pid") - d := &Daemon{ - Command: helperProcess("restart", path), - Logger: testLogger, - PidPath: pidPath, - } + d := helperProcessDaemon("restart", path) + d.Logger = testLogger + d.PidPath = pidPath require.NoError(d.Start()) defer d.Stop() @@ -244,51 +303,32 @@ func TestDaemonEqual(t *testing.T) { }{ { "Different type", - &Daemon{ - Command: &exec.Cmd{}, - }, + &Daemon{}, &Noop{}, false, }, { "Nil", - &Daemon{ - Command: &exec.Cmd{}, - }, + &Daemon{}, nil, false, }, { "Equal", - &Daemon{ - Command: &exec.Cmd{}, - }, - &Daemon{ - Command: &exec.Cmd{}, - }, + &Daemon{}, + &Daemon{}, true, }, { "Different path", &Daemon{ - Command: &exec.Cmd{Path: "/foo"}, + Path: "/foo", }, &Daemon{ - Command: &exec.Cmd{Path: "/bar"}, - }, - false, - }, - - { - "Different dir", - &Daemon{ - Command: &exec.Cmd{Dir: "/foo"}, - }, - &Daemon{ - Command: &exec.Cmd{Dir: "/bar"}, + Path: "/bar", }, false, }, @@ -296,10 +336,10 @@ func TestDaemonEqual(t *testing.T) { { "Different args", &Daemon{ - Command: &exec.Cmd{Args: []string{"foo"}}, + Args: []string{"foo"}, }, &Daemon{ - Command: &exec.Cmd{Args: []string{"bar"}}, + Args: []string{"bar"}, }, false, }, @@ -307,11 +347,9 @@ func TestDaemonEqual(t *testing.T) { { "Different token", &Daemon{ - Command: &exec.Cmd{}, ProxyToken: "one", }, &Daemon{ - Command: &exec.Cmd{}, ProxyToken: "two", }, false, @@ -335,7 +373,7 @@ func TestDaemonMarshalSnapshot(t *testing.T) { { "stopped daemon", &Daemon{ - Command: &exec.Cmd{Path: "/foo"}, + Path: "/foo", }, nil, }, @@ -343,16 +381,14 @@ func TestDaemonMarshalSnapshot(t *testing.T) { { "basic", &Daemon{ - Command: &exec.Cmd{Path: "/foo"}, + Path: "/foo", process: &os.Process{Pid: 42}, }, map[string]interface{}{ - "Pid": 42, - "CommandPath": "/foo", - "CommandArgs": []string(nil), - "CommandDir": "", - "CommandEnv": []string(nil), - "ProxyToken": "", + "Pid": 42, + "Path": "/foo", + "Args": []string(nil), + "ProxyToken": "", }, }, } @@ -376,11 +412,9 @@ func TestDaemonUnmarshalSnapshot(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-stop", path), - ProxyToken: uuid, - Logger: testLogger, - } + d := helperProcessDaemon("start-stop", path) + d.ProxyToken = uuid + d.Logger = testLogger defer d.Stop() require.NoError(d.Start()) @@ -430,11 +464,9 @@ func TestDaemonUnmarshalSnapshot_notRunning(t *testing.T) { uuid, err := uuid.GenerateUUID() require.NoError(err) - d := &Daemon{ - Command: helperProcess("start-stop", path), - ProxyToken: uuid, - Logger: testLogger, - } + d := helperProcessDaemon("start-stop", path) + d.ProxyToken = uuid + d.Logger = testLogger defer d.Stop() require.NoError(d.Start()) diff --git a/agent/proxy/manager.go b/agent/proxy/manager.go index 09eb1f6014..46bde603b9 100644 --- a/agent/proxy/manager.go +++ b/agent/proxy/manager.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "os" - "os/exec" "path/filepath" "sync" "time" @@ -401,16 +400,9 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) { return nil, fmt.Errorf("daemon mode managed proxy requires command") } - // Build the command to execute. - var cmd exec.Cmd - cmd.Path = command[0] - cmd.Args = command // idx 0 is path but preserved since it should be - if err := m.configureLogDir(id, &cmd); err != nil { - return nil, fmt.Errorf("error configuring proxy logs: %s", err) - } - // Build the daemon structure - proxy.Command = &cmd + proxy.Path = command[0] + proxy.Args = command // idx 0 is path but preserved since it should be proxy.ProxyId = id proxy.ProxyToken = mp.ProxyToken return proxy, nil @@ -427,8 +419,10 @@ func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy switch mode { case structs.ProxyExecModeDaemon: return &Daemon{ - Logger: m.Logger, - PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id), + Logger: m.Logger, + StdoutPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stdout"), + StderrPath: logPath(filepath.Join(m.DataDir, "logs"), id, "stderr"), + PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id), }, nil default: @@ -436,41 +430,6 @@ func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy } } -// configureLogDir sets up the file descriptors to stdout/stderr so that -// they log to the proper file path for the given service ID. -func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error { - // Create the log directory - logDir := "" - if m.DataDir != "" { - logDir = filepath.Join(m.DataDir, "logs") - if err := os.MkdirAll(logDir, 0700); err != nil { - return err - } - } - - // Configure the stdout, stderr paths - stdoutPath := logPath(logDir, id, "stdout") - stderrPath := logPath(logDir, id, "stderr") - - // Open the files. We want to append to each. We expect these files - // to be rotated by some external process. - stdoutF, err := os.OpenFile(stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - return fmt.Errorf("error creating stdout file: %s", err) - } - stderrF, err := os.OpenFile(stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) - if err != nil { - // Don't forget to close stdoutF which successfully opened - stdoutF.Close() - - return fmt.Errorf("error creating stderr file: %s", err) - } - - cmd.Stdout = stdoutF - cmd.Stderr = stderrF - return nil -} - // logPath is a helper to return the path to the log file for the given // directory, service ID, and stream type (stdout or stderr). func logPath(dir, id, stream string) string { diff --git a/agent/proxy/manager_test.go b/agent/proxy/manager_test.go index 28922cbfa8..228391a5d5 100644 --- a/agent/proxy/manager_test.go +++ b/agent/proxy/manager_test.go @@ -379,7 +379,7 @@ func testStateProxy(t *testing.T, state *local.State, service string, cmd *exec. ExecMode: structs.ProxyExecModeDaemon, Command: command, TargetServiceID: service, - }, "token") + }, "token", "") require.NoError(t, err) return p.Proxy.ProxyService.ID diff --git a/agent/proxy/process.go b/agent/proxy/process.go new file mode 100644 index 0000000000..22d75a3ce3 --- /dev/null +++ b/agent/proxy/process.go @@ -0,0 +1,43 @@ +package proxy + +import ( + "log" + "strings" + "time" +) + +// isProcessAlreadyFinishedErr does a janky comparison with an error string +// defined in os/exec_unix.go and os/exec_windows.go which we encounter due to races. +// These case tests to fail since Stop returns an error sometimes so we should +// notice if this string stops matching the error in a future go version. +func isProcessAlreadyFinishedErr(err error) bool { + return strings.Contains(err.Error(), "os: process already finished") +} + +// externalWait mimics process.Wait for an external process. The returned +// channel is closed when the process exits. It works by polling the process +// with signal 0 and verifying no error is returned. A closer func is also +// returned that can be invoked to terminate the waiter early. +func externalWait(pid int, pollInterval time.Duration) (<-chan struct{}, func()) { + ch := make(chan struct{}) + stopCh := make(chan struct{}) + closer := func() { + close(stopCh) + } + go func() { + for { + select { + case <-stopCh: + return + default: + } + log.Printf("checking pid %d", pid) + if _, err := findProcess(pid); err != nil { + close(ch) + return + } + time.Sleep(pollInterval) + } + }() + return ch, closer +} diff --git a/agent/proxy/process_test.go b/agent/proxy/process_test.go new file mode 100644 index 0000000000..ca8cf394ed --- /dev/null +++ b/agent/proxy/process_test.go @@ -0,0 +1,70 @@ +package proxy + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/hashicorp/consul/testutil/retry" + "github.com/stretchr/testify/require" +) + +func TestExternalWait(t *testing.T) { + t.Parallel() + + require := require.New(t) + td, closer := testTempDir(t) + defer closer() + path := filepath.Join(td, "file") + + cmd := helperProcess("restart", path) + require.NoError(cmd.Start()) + exitCh := make(chan struct{}) + // Launch waiter to make sure this process isn't zombified when it exits part + // way through the test. + go func() { + cmd.Process.Wait() + close(exitCh) + }() + defer cmd.Process.Kill() + + // Create waiter + pollInterval := 1 * time.Millisecond + waitCh, closer := externalWait(cmd.Process.Pid, pollInterval) + defer closer() + + // Wait for the file to exist so we don't rely on timing to not race with + // process startup. + retry.Run(t, func(r *retry.R) { + _, err := os.Stat(path) + if err == nil { + return + } + + r.Fatalf("error: %s", err) + }) + + // waitCh should not be closed until process quits. We'll wait a bit to verify + // we weren't just too quick to see a process exit + select { + case <-waitCh: + t.Fatal("waitCh should not be closed yet") + default: + } + + // Delete the file + require.NoError(os.Remove(path)) + + // Wait for the child to actually exit cleanly + <-exitCh + + // Now we _should_ see waitCh close (need to wait at least a whole poll + // interval) + select { + case <-waitCh: + // OK + case <-time.After(10 * pollInterval): + t.Fatal("waitCh should be closed") + } +} diff --git a/agent/proxy/process_windows.go b/agent/proxy/process_windows.go index 0a00d81ee0..43226aa92d 100644 --- a/agent/proxy/process_windows.go +++ b/agent/proxy/process_windows.go @@ -7,8 +7,8 @@ import ( ) func findProcess(pid int) (*os.Process, error) { - // On Windows, os.FindProcess will error if the process is not alive, - // so we don't have to do any further checking. The nature of it being - // non-nil means it seems to be healthy. + // On Windows, os.FindProcess will error if the process is not alive, so we + // don't have to do any further checking. The nature of it being non-nil means + // it seems to be healthy. return os.FindProcess(pid) } diff --git a/agent/proxy/proxy_test.go b/agent/proxy/proxy_test.go index 9b123787ca..7caa9a1d5b 100644 --- a/agent/proxy/proxy_test.go +++ b/agent/proxy/proxy_test.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "os/signal" + "strconv" "testing" "time" ) @@ -49,6 +50,37 @@ func helperProcess(s ...string) *exec.Cmd { return cmd } +// helperProcessDaemon returns a *Daemon that can be used to execute the +// TestHelperProcess function below. This can be used to test multi-process +// interactions. The Daemon has it's Path, Args and stdio paths populated but +// other fields might need to be set depending on test requirements. +// +// NOTE: this relies on a sufficiently recent version on consul being installed +// in your path so that the daemonize command can be used. That's gross but hard +// to see how we can do better given that tests are separate binaries and we +// need consul's daemonize mode to work correctly. I considered hacks around +// building the local tree and getting the absolute path to the resulting binary +// but that seems gross in a different way. This is the same or weaker +// assumption our `api` test suit makes already... +func helperProcessDaemon(s ...string) *Daemon { + cs := []string{os.Args[0], "-test.run=TestHelperProcess", "--", helperProcessSentinel} + cs = append(cs, s...) + + path, err := exec.LookPath("consul") + if err != nil || path == "" { + panic("consul not found on $PATH - download and install " + + "consul or skip this test") + } + + return &Daemon{ + Path: os.Args[0], + Args: cs, + StdoutPath: "_", // dev null them for now + StderrPath: "_", + daemonizePath: path, + } +} + // This is not a real test. This is just a helper process kicked off by tests // using the helperProcess helper function. func TestHelperProcess(t *testing.T) { @@ -155,6 +187,33 @@ func TestHelperProcess(t *testing.T) { <-make(chan struct{}) + // Parent runs the given process in a Daemon and then exits. It exists to test + // that the Daemon-managed child process survives it's parent exiting which we + // can't test directly without exiting the test process so we need an extra + // level of indirection. The caller must pass a file path as the first + // argument for the child processes PID to be written and then must take care + // to clean up that PID later or the child will be left running forever. + case "parent": + // We will write the PID for the child to the file in the first argument + // then pass rest of args through to command. + pidFile := args[0] + d := helperProcess(args[1:]...) + if err := d.Start(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %s\n", err) + os.Exit(1) + } + // Write PID + pidBs := []byte(strconv.Itoa(d.Process.Pid)) + if err := ioutil.WriteFile(pidFile, pidBs, 0644); err != nil { + fmt.Fprintf(os.Stderr, "Error: %s\n", err) + os.Exit(1) + } + // Wait "forever" (calling test chooses when we exit with signal/Wait to + // minimise coordination) + for { + time.Sleep(time.Hour) + } + default: fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd) os.Exit(2) diff --git a/command/commands_oss.go b/command/commands_oss.go index 8e95282aab..5184f553a2 100644 --- a/command/commands_oss.go +++ b/command/commands_oss.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/command/connect/ca" caget "github.com/hashicorp/consul/command/connect/ca/get" caset "github.com/hashicorp/consul/command/connect/ca/set" + "github.com/hashicorp/consul/command/connect/daemonize" "github.com/hashicorp/consul/command/connect/proxy" "github.com/hashicorp/consul/command/event" "github.com/hashicorp/consul/command/exec" @@ -74,6 +75,7 @@ func init() { Register("connect ca get-config", func(ui cli.Ui) (cli.Command, error) { return caget.New(ui), nil }) Register("connect ca set-config", func(ui cli.Ui) (cli.Command, error) { return caset.New(ui), nil }) Register("connect proxy", func(ui cli.Ui) (cli.Command, error) { return proxy.New(ui, MakeShutdownCh()), nil }) + RegisterHidden("connect daemonize", func(ui cli.Ui) (cli.Command, error) { return daemonize.New(ui), nil }) Register("event", func(ui cli.Ui) (cli.Command, error) { return event.New(ui), nil }) Register("exec", func(ui cli.Ui) (cli.Command, error) { return exec.New(ui, MakeShutdownCh()), nil }) Register("force-leave", func(ui cli.Ui) (cli.Command, error) { return forceleave.New(ui), nil }) diff --git a/command/connect/daemonize/daemonize.go b/command/connect/daemonize/daemonize.go new file mode 100644 index 0000000000..6c4097303d --- /dev/null +++ b/command/connect/daemonize/daemonize.go @@ -0,0 +1,90 @@ +package daemonize + +import ( + "fmt" + _ "net/http/pprof" + "os" // Expose pprof if configured + "os/exec" + "syscall" + + "github.com/mitchellh/cli" +) + +func New(ui cli.Ui) *cmd { + return &cmd{UI: ui} +} + +type cmd struct { + UI cli.Ui + + stdoutPath string + stderrPath string + cmdArgs []string +} + +func (c *cmd) Run(args []string) int { + // Ignore initial `consul connect daemonize` + offset := 3 + numArgs := len(os.Args) - offset + if numArgs < 4 { + c.UI.Error("Need at least 3 arguments; stdoutPath, stdinPath, " + + "executablePath [arguments...]") + os.Exit(1) + } + + c.stdoutPath, c.stderrPath = os.Args[offset], os.Args[offset+1] + c.cmdArgs = os.Args[offset+2:] // includes the executable as arg 0 as expected + + // Open log files if specified + var stdoutF, stderrF *os.File + var err error + if c.stdoutPath != "_" { + stdoutF, err = os.OpenFile(c.stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + c.UI.Error(fmt.Sprintf("error creating stdout file: %s", err)) + return 1 + } + } + if c.stderrPath != "_" { + stderrF, err = os.OpenFile(c.stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + c.UI.Error(fmt.Sprintf("error creating stderr file: %s", err)) + return 1 + } + } + + // Exec the command passed in a new session then exit to ensure it's adopted + // by the init process. Use the passed file paths for std out/err. + cmd := &exec.Cmd{ + Path: c.cmdArgs[0], + Args: c.cmdArgs, + // Inherit Dir and Env by default. + SysProcAttr: &syscall.SysProcAttr{Setsid: true}, + } + cmd.Stdin = nil + cmd.Stdout = stdoutF + cmd.Stderr = stderrF + + // Exec the child + err = cmd.Start() + if err != nil { + c.UI.Error("command failed with error: " + err.Error()) + os.Exit(1) + } + + // Print it's PID to stdout + fmt.Fprintf(os.Stdout, "%d\n", cmd.Process.Pid) + + // Release (no-op on unix) and exit to orphan the child and get it adopted by + // init. + cmd.Process.Release() + return 0 +} + +func (c *cmd) Synopsis() string { + return "" +} + +func (c *cmd) Help() string { + return "" +} diff --git a/command/registry.go b/command/registry.go index 2b092ae722..e3d05fc834 100644 --- a/command/registry.go +++ b/command/registry.go @@ -24,12 +24,36 @@ func Register(name string, fn Factory) { registry[name] = fn } +// RegisterHidden adds a new CLI sub-command to the registry that won't show up +// in help or autocomplete. +func RegisterHidden(name string, fn Factory) { + if hiddenRegistry == nil { + hiddenRegistry = make(map[string]Factory) + } + + if hiddenRegistry[name] != nil { + panic(fmt.Errorf("Command %q is already registered", name)) + } + hiddenRegistry[name] = fn +} + // Map returns a realized mapping of available CLI commands in a format that // the CLI class can consume. This should be called after all registration is // complete. func Map(ui cli.Ui) map[string]cli.CommandFactory { + return makeCommands(ui, registry) +} + +// Map returns a realized mapping of available but hidden CLI commands in a +// format that the CLI class can consume. This should be called after all +// registration is complete. +func MapHidden(ui cli.Ui) map[string]cli.CommandFactory { + return makeCommands(ui, hiddenRegistry) +} + +func makeCommands(ui cli.Ui, reg map[string]Factory) map[string]cli.CommandFactory { m := make(map[string]cli.CommandFactory) - for name, fn := range registry { + for name, fn := range reg { thisFn := fn m[name] = func() (cli.Command, error) { return thisFn(ui) @@ -42,6 +66,10 @@ func Map(ui cli.Ui) map[string]cli.CommandFactory { // command name. This should be populated at package init() time via Register(). var registry map[string]Factory +// hiddenRegistry behaves identically to registry but is for commands that are +// hidden - i.e. not publically documented in the help or autocomplete. +var hiddenRegistry map[string]Factory + // MakeShutdownCh returns a channel that can be used for shutdown notifications // for commands. This channel will send a message for every interrupt or SIGTERM // received. diff --git a/main.go b/main.go index 855967af47..c12106c7e5 100644 --- a/main.go +++ b/main.go @@ -41,6 +41,13 @@ func realMain() int { names = append(names, c) } + // Add hidden command + hidden := command.MapHidden(ui) + for name, cmd := range hidden { + // Don't add names to help since they are hidden! + cmds[name] = cmd + } + cli := &cli.CLI{ Args: args, Commands: cmds,