agent/proxy: send logs to the correct location for daemon proxies

This commit is contained in:
Mitchell Hashimoto 2018-05-02 20:11:58 -07:00
parent ba00fa3548
commit 6cdacd1fd9
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
5 changed files with 124 additions and 11 deletions

View File

@ -362,6 +362,7 @@ func (a *Agent) Start() error {
a.proxyManager = proxy.NewManager() a.proxyManager = proxy.NewManager()
a.proxyManager.State = a.State a.proxyManager.State = a.State
a.proxyManager.Logger = a.logger a.proxyManager.Logger = a.logger
a.proxyManager.LogDir = filepath.Join(a.config.DataDir, "proxy", "logs")
go a.proxyManager.Run() go a.proxyManager.Run()
// Start watching for critical services to deregister, based on their // Start watching for critical services to deregister, based on their

View File

@ -178,10 +178,6 @@ func (p *Daemon) start() (*os.Process, error) {
copy(cmd.Env, p.Command.Env) copy(cmd.Env, p.Command.Env)
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken)) cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", EnvProxyToken, p.ProxyToken))
// TODO(mitchellh): temporary until we introduce the file based logging
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// Args must always contain a 0 entry which is usually the executed binary. // Args must always contain a 0 entry which is usually the executed binary.
// To be safe and a bit more robust we default this, but only to prevent // To be safe and a bit more robust we default this, but only to prevent
// a panic below. // a panic below.

View File

@ -5,6 +5,7 @@ import (
"log" "log"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"sync" "sync"
"time" "time"
@ -53,6 +54,12 @@ type Manager struct {
// implementation type. // implementation type.
Logger *log.Logger Logger *log.Logger
// LogDir is the path to the directory where logs will be written
// for daemon mode proxies. This directory will be created if it does
// not exist. If this is empty then logs will be dumped into the
// working directory.
LogDir string
// CoalescePeriod and QuiescencePeriod control the timers for coalescing // CoalescePeriod and QuiescencePeriod control the timers for coalescing
// updates from the local state. See the defaults at the top of this // updates from the local state. See the defaults at the top of this
// file for more documentation. These will be set to those defaults // file for more documentation. These will be set to those defaults
@ -328,6 +335,13 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
return nil, fmt.Errorf("internal error: nil *local.ManagedProxy or Proxy field") return nil, fmt.Errorf("internal error: nil *local.ManagedProxy or Proxy field")
} }
// Attempt to create the log directory now that we have a proxy
if m.LogDir != "" {
if err := os.MkdirAll(m.LogDir, 0700); err != nil {
m.Logger.Printf("[ERROR] agent/proxy: failed to create log directory: %s", err)
}
}
p := mp.Proxy p := mp.Proxy
switch p.ExecMode { switch p.ExecMode {
case structs.ProxyExecModeDaemon: case structs.ProxyExecModeDaemon:
@ -343,6 +357,9 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
var cmd exec.Cmd var cmd exec.Cmd
cmd.Path = command[0] cmd.Path = command[0]
cmd.Args = command // idx 0 is path but preserved since it should be cmd.Args = command // idx 0 is path but preserved since it should be
if err := m.configureLogDir(p.ProxyService.ID, &cmd); err != nil {
return nil, fmt.Errorf("error configuring proxy logs: %s", err)
}
// Build the daemon structure // Build the daemon structure
return &Daemon{ return &Daemon{
@ -355,3 +372,42 @@ func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode) return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
} }
} }
// configureLogDir sets up the file descriptors to stdout/stderr so that
// they log to the proper file path for the given service ID.
func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error {
// Create the log directory
if m.LogDir != "" {
if err := os.MkdirAll(m.LogDir, 0700); err != nil {
return err
}
}
// Configure the stdout, stderr paths
stdoutPath := logPath(m.LogDir, id, "stdout")
stderrPath := logPath(m.LogDir, id, "stderr")
// Open the files. We want to append to each. We expect these files
// to be rotated by some external process.
stdoutF, err := os.OpenFile(stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("error creating stdout file: %s", err)
}
stderrF, err := os.OpenFile(stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
// Don't forget to close stdoutF which successfully opened
stdoutF.Close()
return fmt.Errorf("error creating stderr file: %s", err)
}
cmd.Stdout = stdoutF
cmd.Stderr = stderrF
return nil
}
// logPath is a helper to return the path to the log file for the given
// directory, service ID, and stream type (stdout or stderr).
func logPath(dir, id, stream string) string {
return filepath.Join(dir, fmt.Sprintf("%s-%s.log", id, stream))
}

View File

@ -1,6 +1,7 @@
package proxy package proxy
import ( import (
"io/ioutil"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -17,7 +18,8 @@ func TestManagerClose_noRun(t *testing.T) {
t.Parallel() t.Parallel()
// Really we're testing that it doesn't deadlock here. // Really we're testing that it doesn't deadlock here.
m := testManager(t) m, closer := testManager(t)
defer closer()
require.NoError(t, m.Close()) require.NoError(t, m.Close())
// Close again for sanity // Close again for sanity
@ -30,7 +32,8 @@ func TestManagerRun_initialSync(t *testing.T) {
t.Parallel() t.Parallel()
state := local.TestState(t) state := local.TestState(t)
m := testManager(t) m, closer := testManager(t)
defer closer()
m.State = state m.State = state
defer m.Kill() defer m.Kill()
@ -57,7 +60,8 @@ func TestManagerRun_syncNew(t *testing.T) {
t.Parallel() t.Parallel()
state := local.TestState(t) state := local.TestState(t)
m := testManager(t) m, closer := testManager(t)
defer closer()
m.State = state m.State = state
defer m.Kill() defer m.Kill()
@ -99,7 +103,8 @@ func TestManagerRun_syncDelete(t *testing.T) {
t.Parallel() t.Parallel()
state := local.TestState(t) state := local.TestState(t)
m := testManager(t) m, closer := testManager(t)
defer closer()
m.State = state m.State = state
defer m.Kill() defer m.Kill()
@ -138,7 +143,8 @@ func TestManagerRun_syncUpdate(t *testing.T) {
t.Parallel() t.Parallel()
state := local.TestState(t) state := local.TestState(t)
m := testManager(t) m, closer := testManager(t)
defer closer()
m.State = state m.State = state
defer m.Kill() defer m.Kill()
@ -181,14 +187,63 @@ func TestManagerRun_syncUpdate(t *testing.T) {
}) })
} }
func testManager(t *testing.T) *Manager { func TestManagerRun_daemonLogs(t *testing.T) {
t.Parallel()
require := require.New(t)
state := local.TestState(t)
m, closer := testManager(t)
defer closer()
m.State = state
defer m.Kill()
// Configure a log dir so that we can read the logs
td, closer := testTempDir(t)
defer closer()
m.LogDir = filepath.Join(td, "logs")
// Create the service and calculate the log paths
id := testStateProxy(t, state, "web", helperProcess("output"))
stdoutPath := logPath(m.LogDir, id, "stdout")
stderrPath := logPath(m.LogDir, id, "stderr")
// Start the manager
go m.Run()
// We should see the path appear shortly
retry.Run(t, func(r *retry.R) {
if _, err := os.Stat(stdoutPath); err != nil {
r.Fatalf("error waiting for stdout path: %s", err)
}
if _, err := os.Stat(stderrPath); err != nil {
r.Fatalf("error waiting for stderr path: %s", err)
}
})
expectedOut := "hello stdout\n"
actual, err := ioutil.ReadFile(stdoutPath)
require.NoError(err)
require.Equal([]byte(expectedOut), actual)
expectedErr := "hello stderr\n"
actual, err = ioutil.ReadFile(stderrPath)
require.NoError(err)
require.Equal([]byte(expectedErr), actual)
}
func testManager(t *testing.T) (*Manager, func()) {
m := NewManager() m := NewManager()
// Set these periods low to speed up tests // Set these periods low to speed up tests
m.CoalescePeriod = 1 * time.Millisecond m.CoalescePeriod = 1 * time.Millisecond
m.QuiescentPeriod = 1 * time.Millisecond m.QuiescentPeriod = 1 * time.Millisecond
return m // Setup a temporary directory for logs
td, closer := testTempDir(t)
m.LogDir = td
return m, func() { closer() }
} }
// testStateProxy registers a proxy with the given local state and the command // testStateProxy registers a proxy with the given local state and the command

View File

@ -138,6 +138,11 @@ func TestHelperProcess(t *testing.T) {
// Run forever // Run forever
<-make(chan struct{}) <-make(chan struct{})
case "output":
fmt.Fprintf(os.Stdout, "hello stdout\n")
fmt.Fprintf(os.Stderr, "hello stderr\n")
<-make(chan struct{})
default: default:
fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd) fmt.Fprintf(os.Stderr, "Unknown command: %q\n", cmd)
os.Exit(2) os.Exit(2)