consul/agent/remote_exec.go

323 lines
7.8 KiB
Go
Raw Normal View History

package agent
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path"
"strconv"
"sync"
"syscall"
"time"
pkg refactor command/agent/* -> agent/* command/consul/* -> agent/consul/* command/agent/command{,_test}.go -> command/agent{,_test}.go command/base/command.go -> command/base.go command/base/* -> command/* commands.go -> command/commands.go The script which did the refactor is: ( cd $GOPATH/src/github.com/hashicorp/consul git mv command/agent/command.go command/agent.go git mv command/agent/command_test.go command/agent_test.go git mv command/agent/flag_slice_value{,_test}.go command/ git mv command/agent . git mv command/base/command.go command/base.go git mv command/base/config_util{,_test}.go command/ git mv commands.go command/ git mv consul agent rmdir command/base/ gsed -i -e 's|package agent|package command|' command/agent{,_test}.go gsed -i -e 's|package agent|package command|' command/flag_slice_value{,_test}.go gsed -i -e 's|package base|package command|' command/base.go command/config_util{,_test}.go gsed -i -e 's|package main|package command|' command/commands.go gsed -i -e 's|base.Command|BaseCommand|' command/commands.go gsed -i -e 's|agent.Command|AgentCommand|' command/commands.go gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/commands.go gsed -i -e 's|base\.||' command/commands.go gsed -i -e 's|command\.||' command/commands.go gsed -i -e 's|command|c|' main.go gsed -i -e 's|range Commands|range command.Commands|' main.go gsed -i -e 's|Commands: Commands|Commands: command.Commands|' main.go gsed -i -e 's|base\.BoolValue|BoolValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.DurationValue|DurationValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.StringValue|StringValue|' command/operator_autopilot_set.go gsed -i -e 's|base\.UintValue|UintValue|' command/operator_autopilot_set.go gsed -i -e 's|\bCommand\b|BaseCommand|' command/base.go gsed -i -e 's|BaseCommand Options|Command Options|' command/base.go gsed -i -e 's|base.Command|BaseCommand|' command/*.go gsed -i -e 's|c\.Command|c.BaseCommand|g' command/*.go gsed -i -e 's|\tCommand:|\tBaseCommand:|' command/*_test.go gsed -i -e 's|base\.||' command/*_test.go gsed -i -e 's|\bCommand\b|AgentCommand|' command/agent{,_test}.go gsed -i -e 's|cmd.AgentCommand|cmd.BaseCommand|' command/agent.go gsed -i -e 's|cli.AgentCommand = new(Command)|cli.Command = new(AgentCommand)|' command/agent_test.go gsed -i -e 's|exec.AgentCommand|exec.Command|' command/agent_test.go gsed -i -e 's|exec.BaseCommand|exec.Command|' command/agent_test.go gsed -i -e 's|NewTestAgent|agent.NewTestAgent|' command/agent_test.go gsed -i -e 's|= TestConfig|= agent.TestConfig|' command/agent_test.go gsed -i -e 's|: RetryJoin|: agent.RetryJoin|' command/agent_test.go gsed -i -e 's|\.\./\.\./|../|' command/config_util_test.go gsed -i -e 's|\bverifyUniqueListeners|VerifyUniqueListeners|' agent/config{,_test}.go command/agent.go gsed -i -e 's|\bserfLANKeyring\b|SerfLANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bserfWANKeyring\b|SerfWANKeyring|g' agent/{agent,keyring,testagent}.go command/agent.go gsed -i -e 's|\bNewAgent\b|agent.New|g' command/agent{,_test}.go gsed -i -e 's|\bNewAgent|New|' agent/{acl_test,agent,testagent}.go gsed -i -e 's|\bAgent\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bBool\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDefaultConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bDevConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bMergeConfig\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bReadConfigPaths\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bParseMetaPair\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfLANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|\bSerfWANKeyring\b|agent.&|g' command/agent{,_test}.go gsed -i -e 's|circonus\.agent|circonus|g' command/agent{,_test}.go gsed -i -e 's|logger\.agent|logger|g' command/agent{,_test}.go gsed -i -e 's|metrics\.agent|metrics|g' command/agent{,_test}.go gsed -i -e 's|// agent.Agent|// agent|' command/agent{,_test}.go gsed -i -e 's|a\.agent\.Config|a.Config|' command/agent{,_test}.go gsed -i -e 's|agent\.AppendSliceValue|AppendSliceValue|' command/{configtest,validate}.go gsed -i -e 's|consul/consul|agent/consul|' GNUmakefile gsed -i -e 's|\.\./test|../../test|' agent/consul/server_test.go # fix imports f=$(grep -rl 'github.com/hashicorp/consul/command/agent' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/command/agent|github.com/hashicorp/consul/agent|' $f goimports -w $f f=$(grep -rl 'github.com/hashicorp/consul/consul' * | grep '\.go') gsed -i -e 's|github.com/hashicorp/consul/consul|github.com/hashicorp/consul/agent/consul|' $f goimports -w $f goimports -w command/*.go main.go )
2017-06-09 22:28:28 +00:00
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/api"
)
const (
// remoteExecFileName is the name of the file we append to
// the path, e.g. _rexec/session_id/job
remoteExecFileName = "job"
// rExecAck is the suffix added to an ack path
remoteExecAckSuffix = "ack"
// remoteExecAck is the suffix added to an exit code
remoteExecExitSuffix = "exit"
// remoteExecOutputDivider is used to namespace the output
remoteExecOutputDivider = "out"
// remoteExecOutputSize is the size we chunk output too
remoteExecOutputSize = 4 * 1024
// remoteExecOutputDeadline is how long we wait before uploading
// less than the chunk size
remoteExecOutputDeadline = 500 * time.Millisecond
)
// remoteExecEvent is used as the payload of the user event to transmit
// what we need to know about the event
type remoteExecEvent struct {
Prefix string
Session string
}
// remoteExecSpec is used as the specification of the remote exec.
// It is stored in the KV store
type remoteExecSpec struct {
Command string
Script []byte
Wait time.Duration
}
type rexecWriter struct {
2014-09-01 17:42:35 +00:00
BufCh chan []byte
BufSize int
BufIdle time.Duration
CancelCh chan struct{}
buf []byte
bufLen int
bufLock sync.Mutex
flush *time.Timer
}
func (r *rexecWriter) Write(b []byte) (int, error) {
r.bufLock.Lock()
defer r.bufLock.Unlock()
if r.flush != nil {
r.flush.Stop()
r.flush = nil
}
inpLen := len(b)
2014-09-01 17:42:35 +00:00
if r.buf == nil {
r.buf = make([]byte, r.BufSize)
}
COPY:
remain := len(r.buf) - r.bufLen
2014-09-01 17:42:35 +00:00
if remain > len(b) {
copy(r.buf[r.bufLen:], b)
r.bufLen += len(b)
} else {
copy(r.buf[r.bufLen:], b[:remain])
b = b[remain:]
r.bufLen += remain
r.bufLock.Unlock()
2014-09-01 17:42:35 +00:00
r.Flush()
r.bufLock.Lock()
goto COPY
}
2014-09-01 17:42:35 +00:00
r.flush = time.AfterFunc(r.BufIdle, r.Flush)
return inpLen, nil
}
2014-09-01 17:42:35 +00:00
func (r *rexecWriter) Flush() {
r.bufLock.Lock()
defer r.bufLock.Unlock()
2014-09-01 17:42:35 +00:00
if r.flush != nil {
r.flush.Stop()
r.flush = nil
}
if r.bufLen == 0 {
return
}
select {
2014-09-01 17:42:35 +00:00
case r.BufCh <- r.buf[:r.bufLen]:
r.buf = make([]byte, r.BufSize)
r.bufLen = 0
2014-09-01 17:42:35 +00:00
case <-r.CancelCh:
r.bufLen = 0
}
}
// handleRemoteExec is invoked when a new remote exec request is received
func (a *Agent) handleRemoteExec(msg *UserEvent) {
a.logger.Printf("[DEBUG] agent: received remote exec event (ID: %s)", msg.ID)
2015-09-15 12:22:08 +00:00
// Decode the event payload
var event remoteExecEvent
if err := json.Unmarshal(msg.Payload, &event); err != nil {
a.logger.Printf("[ERR] agent: failed to decode remote exec event: %v", err)
return
}
// Read the job specification
var spec remoteExecSpec
if !a.remoteExecGetSpec(&event, &spec) {
return
}
// Write the acknowledgement
if !a.remoteExecWriteAck(&event) {
return
}
// Ensure we write out an exit code
exitCode := 0
defer a.remoteExecWriteExitCode(&event, &exitCode)
// Check if this is a script, we may need to spill to disk
var script string
if len(spec.Script) != 0 {
tmpFile, err := ioutil.TempFile("", "rexec")
if err != nil {
a.logger.Printf("[DEBUG] agent: failed to make tmp file: %v", err)
exitCode = 255
return
}
defer os.Remove(tmpFile.Name())
os.Chmod(tmpFile.Name(), 0750)
tmpFile.Write(spec.Script)
tmpFile.Close()
script = tmpFile.Name()
} else {
script = spec.Command
}
// Create the exec.Cmd
2014-09-01 18:21:57 +00:00
a.logger.Printf("[INFO] agent: remote exec '%s'", script)
cmd, err := ExecScript(script)
if err != nil {
a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err)
exitCode = 255
return
}
// Setup the output streaming
writer := &rexecWriter{
2014-09-01 17:42:35 +00:00
BufCh: make(chan []byte, 16),
BufSize: remoteExecOutputSize,
BufIdle: remoteExecOutputDeadline,
CancelCh: make(chan struct{}),
}
cmd.Stdout = writer
cmd.Stderr = writer
// Start execution
err = cmd.Start()
if err != nil {
a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err)
exitCode = 255
return
}
// Wait for the process to exit
exitCh := make(chan int, 1)
go func() {
err := cmd.Wait()
2014-09-01 17:42:35 +00:00
writer.Flush()
close(writer.BufCh)
if err == nil {
exitCh <- 0
return
}
// Try to determine the exit code
if exitErr, ok := err.(*exec.ExitError); ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
exitCh <- status.ExitStatus()
return
}
}
exitCh <- 1
}()
// Wait until we are complete, uploading as we go
WAIT:
for num := 0; ; num++ {
select {
2014-09-01 17:42:35 +00:00
case out := <-writer.BufCh:
if out == nil {
break WAIT
}
if !a.remoteExecWriteOutput(&event, num, out) {
2014-09-01 17:42:35 +00:00
close(writer.CancelCh)
exitCode = 255
return
}
case <-time.After(spec.Wait):
// Acts like a heartbeat, since there is no output
2014-09-01 20:13:15 +00:00
if !a.remoteExecWriteOutput(&event, num, nil) {
close(writer.CancelCh)
exitCode = 255
return
}
}
}
// Get the exit code
exitCode = <-exitCh
}
// remoteExecGetSpec is used to get the exec specification.
// Returns if execution should continue
func (a *Agent) remoteExecGetSpec(event *remoteExecEvent, spec *remoteExecSpec) bool {
get := structs.KeyRequest{
Datacenter: a.config.Datacenter,
Key: path.Join(event.Prefix, event.Session, remoteExecFileName),
QueryOptions: structs.QueryOptions{
AllowStale: true, // Stale read for scale! Retry on failure.
},
}
get.Token = a.config.GetTokenForAgent()
var out structs.IndexedDirEntries
QUERY:
if err := a.RPC("KVS.Get", &get, &out); err != nil {
a.logger.Printf("[ERR] agent: failed to get remote exec job: %v", err)
return false
}
if len(out.Entries) == 0 {
// If the initial read was stale and had no data, retry as a consistent read
if get.QueryOptions.AllowStale {
a.logger.Printf("[DEBUG] agent: trying consistent fetch of remote exec job spec")
get.QueryOptions.AllowStale = false
goto QUERY
} else {
a.logger.Printf("[DEBUG] agent: remote exec aborted, job spec missing")
return false
}
}
if err := json.Unmarshal(out.Entries[0].Value, &spec); err != nil {
a.logger.Printf("[ERR] agent: failed to decode remote exec spec: %v", err)
return false
}
return true
}
// remoteExecWriteAck is used to write an ack. Returns if execution should
// continue.
func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool {
2014-09-01 17:51:15 +00:00
if err := a.remoteExecWriteKey(event, remoteExecAckSuffix, nil); err != nil {
a.logger.Printf("[ERR] agent: failed to ack remote exec job: %v", err)
return false
}
return true
}
// remoteExecWriteOutput is used to write output
func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool {
2014-09-01 17:51:15 +00:00
suffix := path.Join(remoteExecOutputDivider, fmt.Sprintf("%05x", num))
if err := a.remoteExecWriteKey(event, suffix, output); err != nil {
a.logger.Printf("[ERR] agent: failed to write output for remote exec job: %v", err)
return false
}
return true
}
// remoteExecWriteExitCode is used to write an exit code
func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode *int) bool {
val := []byte(strconv.FormatInt(int64(*exitCode), 10))
2014-09-01 17:51:15 +00:00
if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil {
a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err)
2014-09-01 18:21:57 +00:00
return false
2014-09-01 17:51:15 +00:00
}
2014-09-01 18:21:57 +00:00
return true
2014-09-01 17:51:15 +00:00
}
// remoteExecWriteKey is used to write an output key for a remote exec job
func (a *Agent) remoteExecWriteKey(event *remoteExecEvent, suffix string, val []byte) error {
key := path.Join(event.Prefix, event.Session, a.config.NodeName, suffix)
write := structs.KVSRequest{
Datacenter: a.config.Datacenter,
Op: api.KVLock,
DirEnt: structs.DirEntry{
2014-09-01 17:51:15 +00:00
Key: key,
Value: val,
Session: event.Session,
},
}
write.Token = a.config.GetTokenForAgent()
var success bool
if err := a.RPC("KVS.Apply", &write, &success); err != nil {
2014-09-01 17:51:15 +00:00
return err
}
if !success {
2014-09-01 17:51:15 +00:00
return fmt.Errorf("write failed")
}
2014-09-01 17:51:15 +00:00
return nil
}