mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
Merge pull request #312 from hashicorp/f-rexec
Adding support for 'exec' command
This commit is contained in:
commit
c534bc9602
@ -235,6 +235,10 @@ type Config struct {
|
||||
// agent layer using the standard APIs.
|
||||
Watches []map[string]interface{} `mapstructure:"watches"`
|
||||
|
||||
// DisableRemoteExec is used to turn off the remote execution
|
||||
// feature. This is for security to prevent unknown scripts from running.
|
||||
DisableRemoteExec bool `mapstructure:"disable_remote_exec"`
|
||||
|
||||
// AEInterval controls the anti-entropy interval. This is how often
|
||||
// the agent attempts to reconcile it's local state with the server'
|
||||
// representation of our state. Defaults to every 60s.
|
||||
@ -676,6 +680,9 @@ func MergeConfig(a, b *Config) *Config {
|
||||
if len(b.WatchPlans) != 0 {
|
||||
result.WatchPlans = append(result.WatchPlans, b.WatchPlans...)
|
||||
}
|
||||
if b.DisableRemoteExec {
|
||||
result.DisableRemoteExec = true
|
||||
}
|
||||
|
||||
// Copy the start join addresses
|
||||
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
|
||||
|
@ -405,6 +405,17 @@ func TestDecodeConfig(t *testing.T) {
|
||||
if !reflect.DeepEqual(out, exp) {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// remote exec
|
||||
input = `{"disable_remote_exec": true}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if !config.DisableRemoteExec {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeConfig_Service(t *testing.T) {
|
||||
@ -566,6 +577,7 @@ func TestMergeConfig(t *testing.T) {
|
||||
"handler": "foobar",
|
||||
},
|
||||
},
|
||||
DisableRemoteExec: true,
|
||||
}
|
||||
|
||||
c := MergeConfig(a, b)
|
||||
|
319
command/agent/remote_exec.go
Normal file
319
command/agent/remote_exec.go
Normal file
@ -0,0 +1,319 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
// remoteExecFileName is the name of the file we append to
|
||||
// the path, e.g. _rexec/session_id/job
|
||||
remoteExecFileName = "job"
|
||||
|
||||
// rExecAck is the suffix added to an ack path
|
||||
remoteExecAckSuffix = "ack"
|
||||
|
||||
// remoteExecAck is the suffix added to an exit code
|
||||
remoteExecExitSuffix = "exit"
|
||||
|
||||
// remoteExecOutputDivider is used to namespace the output
|
||||
remoteExecOutputDivider = "out"
|
||||
|
||||
// remoteExecOutputSize is the size we chunk output too
|
||||
remoteExecOutputSize = 4 * 1024
|
||||
|
||||
// remoteExecOutputDeadline is how long we wait before uploading
|
||||
// less than the chunk size
|
||||
remoteExecOutputDeadline = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
// remoteExecEvent is used as the payload of the user event to transmit
|
||||
// what we need to know about the event
|
||||
type remoteExecEvent struct {
|
||||
Prefix string
|
||||
Session string
|
||||
}
|
||||
|
||||
// remoteExecSpec is used as the specification of the remote exec.
|
||||
// It is stored in the KV store
|
||||
type remoteExecSpec struct {
|
||||
Command string
|
||||
Script []byte
|
||||
Wait time.Duration
|
||||
}
|
||||
|
||||
type rexecWriter struct {
|
||||
BufCh chan []byte
|
||||
BufSize int
|
||||
BufIdle time.Duration
|
||||
CancelCh chan struct{}
|
||||
|
||||
buf []byte
|
||||
bufLen int
|
||||
bufLock sync.Mutex
|
||||
flush *time.Timer
|
||||
}
|
||||
|
||||
func (r *rexecWriter) Write(b []byte) (int, error) {
|
||||
r.bufLock.Lock()
|
||||
defer r.bufLock.Unlock()
|
||||
if r.flush != nil {
|
||||
r.flush.Stop()
|
||||
r.flush = nil
|
||||
}
|
||||
inpLen := len(b)
|
||||
if r.buf == nil {
|
||||
r.buf = make([]byte, r.BufSize)
|
||||
}
|
||||
|
||||
COPY:
|
||||
remain := len(r.buf) - r.bufLen
|
||||
if remain > len(b) {
|
||||
copy(r.buf[r.bufLen:], b)
|
||||
r.bufLen += len(b)
|
||||
} else {
|
||||
copy(r.buf[r.bufLen:], b[:remain])
|
||||
b = b[remain:]
|
||||
r.bufLen += remain
|
||||
r.bufLock.Unlock()
|
||||
r.Flush()
|
||||
r.bufLock.Lock()
|
||||
goto COPY
|
||||
}
|
||||
|
||||
r.flush = time.AfterFunc(r.BufIdle, r.Flush)
|
||||
return inpLen, nil
|
||||
}
|
||||
|
||||
func (r *rexecWriter) Flush() {
|
||||
r.bufLock.Lock()
|
||||
defer r.bufLock.Unlock()
|
||||
if r.flush != nil {
|
||||
r.flush.Stop()
|
||||
r.flush = nil
|
||||
}
|
||||
if r.bufLen == 0 {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case r.BufCh <- r.buf[:r.bufLen]:
|
||||
r.buf = make([]byte, r.BufSize)
|
||||
r.bufLen = 0
|
||||
case <-r.CancelCh:
|
||||
r.bufLen = 0
|
||||
}
|
||||
}
|
||||
|
||||
// handleRemoteExec is invoked when a new remote exec request is received
|
||||
func (a *Agent) handleRemoteExec(msg *UserEvent) {
|
||||
a.logger.Printf("[DEBUG] agent: received remote exec event (ID: %s)", msg.ID)
|
||||
// Decode the event paylaod
|
||||
var event remoteExecEvent
|
||||
if err := json.Unmarshal(msg.Payload, &event); err != nil {
|
||||
a.logger.Printf("[ERR] agent: failed to decode remote exec event: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Read the job specification
|
||||
var spec remoteExecSpec
|
||||
if !a.remoteExecGetSpec(&event, &spec) {
|
||||
return
|
||||
}
|
||||
|
||||
// Write the acknowledgement
|
||||
if !a.remoteExecWriteAck(&event) {
|
||||
return
|
||||
}
|
||||
|
||||
// Ensure we write out an exit code
|
||||
exitCode := 0
|
||||
defer a.remoteExecWriteExitCode(&event, exitCode)
|
||||
|
||||
// Check if this is a script, we may need to spill to disk
|
||||
var script string
|
||||
if len(spec.Script) != 0 {
|
||||
tmpFile, err := ioutil.TempFile("", "rexec")
|
||||
if err != nil {
|
||||
a.logger.Printf("[DEBUG] agent: failed to make tmp file: %v", err)
|
||||
exitCode = 255
|
||||
return
|
||||
}
|
||||
defer os.Remove(tmpFile.Name())
|
||||
os.Chmod(tmpFile.Name(), 0750)
|
||||
tmpFile.Write(spec.Script)
|
||||
tmpFile.Close()
|
||||
script = tmpFile.Name()
|
||||
} else {
|
||||
script = spec.Command
|
||||
}
|
||||
|
||||
// Create the exec.Cmd
|
||||
a.logger.Printf("[INFO] agent: remote exec '%s'", script)
|
||||
cmd, err := ExecScript(script)
|
||||
if err != nil {
|
||||
a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err)
|
||||
exitCode = 255
|
||||
return
|
||||
}
|
||||
|
||||
// Setup the output streaming
|
||||
writer := &rexecWriter{
|
||||
BufCh: make(chan []byte, 16),
|
||||
BufSize: remoteExecOutputSize,
|
||||
BufIdle: remoteExecOutputDeadline,
|
||||
CancelCh: make(chan struct{}),
|
||||
}
|
||||
cmd.Stdout = writer
|
||||
cmd.Stderr = writer
|
||||
|
||||
// Start execution
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err)
|
||||
exitCode = 255
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for the process to exit
|
||||
exitCh := make(chan int, 1)
|
||||
go func() {
|
||||
err := cmd.Wait()
|
||||
writer.Flush()
|
||||
close(writer.BufCh)
|
||||
if err != nil {
|
||||
exitCh <- 0
|
||||
return
|
||||
}
|
||||
|
||||
// Try to determine the exit code
|
||||
if exitErr, ok := err.(*exec.ExitError); ok {
|
||||
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
|
||||
exitCh <- status.ExitStatus()
|
||||
return
|
||||
}
|
||||
}
|
||||
exitCh <- 1
|
||||
}()
|
||||
|
||||
// Wait until we are complete, uploading as we go
|
||||
WAIT:
|
||||
for num := 0; ; num++ {
|
||||
select {
|
||||
case out := <-writer.BufCh:
|
||||
if out == nil {
|
||||
break WAIT
|
||||
}
|
||||
if !a.remoteExecWriteOutput(&event, num, out) {
|
||||
close(writer.CancelCh)
|
||||
exitCode = 255
|
||||
return
|
||||
}
|
||||
case <-time.After(spec.Wait):
|
||||
// Acts like a heartbeat, since there is no output
|
||||
if !a.remoteExecWriteOutput(&event, num, nil) {
|
||||
close(writer.CancelCh)
|
||||
exitCode = 255
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the exit code
|
||||
exitCode = <-exitCh
|
||||
}
|
||||
|
||||
// remoteExecGetSpec is used to get the exec specification.
|
||||
// Returns if execution should continue
|
||||
func (a *Agent) remoteExecGetSpec(event *remoteExecEvent, spec *remoteExecSpec) bool {
|
||||
get := structs.KeyRequest{
|
||||
Datacenter: a.config.Datacenter,
|
||||
Key: path.Join(event.Prefix, event.Session, remoteExecFileName),
|
||||
QueryOptions: structs.QueryOptions{
|
||||
AllowStale: true, // Stale read for scale! Retry on failure.
|
||||
},
|
||||
}
|
||||
var out structs.IndexedDirEntries
|
||||
QUERY:
|
||||
if err := a.RPC("KVS.Get", &get, &out); err != nil {
|
||||
a.logger.Printf("[ERR] agent: failed to get remote exec job: %v", err)
|
||||
return false
|
||||
}
|
||||
if len(out.Entries) == 0 {
|
||||
// If the initial read was stale and had no data, retry as a consistent read
|
||||
if get.QueryOptions.AllowStale {
|
||||
a.logger.Printf("[DEBUG] agent: trying consistent fetch of remote exec job spec")
|
||||
get.QueryOptions.AllowStale = false
|
||||
goto QUERY
|
||||
} else {
|
||||
a.logger.Printf("[DEBUG] agent: remote exec aborted, job spec missing")
|
||||
return false
|
||||
}
|
||||
}
|
||||
if err := json.Unmarshal(out.Entries[0].Value, &spec); err != nil {
|
||||
a.logger.Printf("[ERR] agent: failed to decode remote exec spec: %v", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// remoteExecWriteAck is used to write an ack. Returns if execution should
|
||||
// continue.
|
||||
func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool {
|
||||
if err := a.remoteExecWriteKey(event, remoteExecAckSuffix, nil); err != nil {
|
||||
a.logger.Printf("[ERR] agent: failed to ack remote exec job: %v", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// remoteExecWriteOutput is used to write output
|
||||
func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool {
|
||||
suffix := path.Join(remoteExecOutputDivider, fmt.Sprintf("%05x", num))
|
||||
if err := a.remoteExecWriteKey(event, suffix, output); err != nil {
|
||||
a.logger.Printf("[ERR] agent: failed to write output for remote exec job: %v", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// remoteExecWriteExitCode is used to write an exit code
|
||||
func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) bool {
|
||||
val := []byte(strconv.FormatInt(int64(exitCode), 10))
|
||||
if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil {
|
||||
a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// remoteExecWriteKey is used to write an output key for a remote exec job
|
||||
func (a *Agent) remoteExecWriteKey(event *remoteExecEvent, suffix string, val []byte) error {
|
||||
key := path.Join(event.Prefix, event.Session, a.config.NodeName, suffix)
|
||||
write := structs.KVSRequest{
|
||||
Datacenter: a.config.Datacenter,
|
||||
Op: structs.KVSLock,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: key,
|
||||
Value: val,
|
||||
Session: event.Session,
|
||||
},
|
||||
}
|
||||
var success bool
|
||||
if err := a.RPC("KVS.Apply", &write, &success); err != nil {
|
||||
return err
|
||||
}
|
||||
if !success {
|
||||
return fmt.Errorf("write failed")
|
||||
}
|
||||
return nil
|
||||
}
|
288
command/agent/remote_exec_test.go
Normal file
288
command/agent/remote_exec_test.go
Normal file
@ -0,0 +1,288 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
)
|
||||
|
||||
func TestRexecWriter(t *testing.T) {
|
||||
writer := &rexecWriter{
|
||||
BufCh: make(chan []byte, 16),
|
||||
BufSize: 16,
|
||||
BufIdle: 10 * time.Millisecond,
|
||||
CancelCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Write short, wait for idle
|
||||
start := time.Now()
|
||||
n, err := writer.Write([]byte("test"))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n != 4 {
|
||||
t.Fatalf("bad: %v", n)
|
||||
}
|
||||
|
||||
select {
|
||||
case b := <-writer.BufCh:
|
||||
if len(b) != 4 {
|
||||
t.Fatalf("Bad: %v", b)
|
||||
}
|
||||
if time.Now().Sub(start) < writer.BufIdle {
|
||||
t.Fatalf("too early")
|
||||
}
|
||||
case <-time.After(2 * writer.BufIdle):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
// Write in succession to prevent the timeout
|
||||
writer.Write([]byte("test"))
|
||||
time.Sleep(writer.BufIdle / 2)
|
||||
writer.Write([]byte("test"))
|
||||
time.Sleep(writer.BufIdle / 2)
|
||||
start = time.Now()
|
||||
writer.Write([]byte("test"))
|
||||
|
||||
select {
|
||||
case b := <-writer.BufCh:
|
||||
if len(b) != 12 {
|
||||
t.Fatalf("Bad: %v", b)
|
||||
}
|
||||
if time.Now().Sub(start) < writer.BufIdle {
|
||||
t.Fatalf("too early")
|
||||
}
|
||||
case <-time.After(2 * writer.BufIdle):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
// Write large values, multiple flushes required
|
||||
writer.Write([]byte("01234567890123456789012345678901"))
|
||||
|
||||
select {
|
||||
case b := <-writer.BufCh:
|
||||
if string(b) != "0123456789012345" {
|
||||
t.Fatalf("bad: %s", b)
|
||||
}
|
||||
default:
|
||||
t.Fatalf("should have buf")
|
||||
}
|
||||
select {
|
||||
case b := <-writer.BufCh:
|
||||
if string(b) != "6789012345678901" {
|
||||
t.Fatalf("bad: %s", b)
|
||||
}
|
||||
default:
|
||||
t.Fatalf("should have buf")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteExecGetSpec(t *testing.T) {
|
||||
dir, agent := makeAgent(t, nextConfig())
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
event := &remoteExecEvent{
|
||||
Prefix: "_rexec",
|
||||
Session: makeRexecSession(t, agent),
|
||||
}
|
||||
defer destroySession(t, agent, event.Session)
|
||||
|
||||
spec := &remoteExecSpec{
|
||||
Command: "uptime",
|
||||
Script: []byte("#!/bin/bash"),
|
||||
Wait: time.Second,
|
||||
}
|
||||
buf, err := json.Marshal(spec)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
key := "_rexec/" + event.Session + "/job"
|
||||
setKV(t, agent, key, buf)
|
||||
|
||||
var out remoteExecSpec
|
||||
if !agent.remoteExecGetSpec(event, &out) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if !reflect.DeepEqual(spec, &out) {
|
||||
t.Fatalf("bad spec")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteExecWrites(t *testing.T) {
|
||||
dir, agent := makeAgent(t, nextConfig())
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
event := &remoteExecEvent{
|
||||
Prefix: "_rexec",
|
||||
Session: makeRexecSession(t, agent),
|
||||
}
|
||||
defer destroySession(t, agent, event.Session)
|
||||
|
||||
if !agent.remoteExecWriteAck(event) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
output := []byte("testing")
|
||||
if !agent.remoteExecWriteOutput(event, 0, output) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
if !agent.remoteExecWriteOutput(event, 10, output) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
if !agent.remoteExecWriteExitCode(event, 1) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
|
||||
key := "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack"
|
||||
d := getKV(t, agent, key)
|
||||
if d == nil || d.Session != event.Session {
|
||||
t.Fatalf("bad ack: %#v", d)
|
||||
}
|
||||
|
||||
key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000"
|
||||
d = getKV(t, agent, key)
|
||||
if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) {
|
||||
t.Fatalf("bad output: %#v", d)
|
||||
}
|
||||
|
||||
key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/0000a"
|
||||
d = getKV(t, agent, key)
|
||||
if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) {
|
||||
t.Fatalf("bad output: %#v", d)
|
||||
}
|
||||
|
||||
key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit"
|
||||
d = getKV(t, agent, key)
|
||||
if d == nil || d.Session != event.Session || string(d.Value) != "1" {
|
||||
t.Fatalf("bad output: %#v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleRemoteExec(t *testing.T) {
|
||||
dir, agent := makeAgent(t, nextConfig())
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
event := &remoteExecEvent{
|
||||
Prefix: "_rexec",
|
||||
Session: makeRexecSession(t, agent),
|
||||
}
|
||||
defer destroySession(t, agent, event.Session)
|
||||
|
||||
spec := &remoteExecSpec{
|
||||
Command: "uptime",
|
||||
Wait: time.Second,
|
||||
}
|
||||
buf, err := json.Marshal(spec)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
key := "_rexec/" + event.Session + "/job"
|
||||
setKV(t, agent, key, buf)
|
||||
|
||||
buf, err = json.Marshal(event)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
msg := &UserEvent{
|
||||
ID: generateUUID(),
|
||||
Payload: buf,
|
||||
}
|
||||
|
||||
// Handle the event...
|
||||
agent.handleRemoteExec(msg)
|
||||
|
||||
// Verify we have an ack
|
||||
key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack"
|
||||
d := getKV(t, agent, key)
|
||||
if d == nil || d.Session != event.Session {
|
||||
t.Fatalf("bad ack: %#v", d)
|
||||
}
|
||||
|
||||
// Verify we have output
|
||||
key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000"
|
||||
d = getKV(t, agent, key)
|
||||
if d == nil || d.Session != event.Session ||
|
||||
!bytes.Contains(d.Value, []byte("load")) {
|
||||
t.Fatalf("bad output: %#v", d)
|
||||
}
|
||||
|
||||
// Verify we have an exit code
|
||||
key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit"
|
||||
d = getKV(t, agent, key)
|
||||
if d == nil || d.Session != event.Session || string(d.Value) != "0" {
|
||||
t.Fatalf("bad output: %#v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func makeRexecSession(t *testing.T, agent *Agent) string {
|
||||
args := structs.SessionRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Op: structs.SessionCreate,
|
||||
Session: structs.Session{
|
||||
Node: agent.config.NodeName,
|
||||
LockDelay: 15 * time.Second,
|
||||
},
|
||||
}
|
||||
var out string
|
||||
if err := agent.RPC("Session.Apply", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func destroySession(t *testing.T, agent *Agent, session string) {
|
||||
args := structs.SessionRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Op: structs.SessionDestroy,
|
||||
Session: structs.Session{
|
||||
ID: session,
|
||||
},
|
||||
}
|
||||
var out string
|
||||
if err := agent.RPC("Session.Apply", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func setKV(t *testing.T, agent *Agent, key string, val []byte) {
|
||||
write := structs.KVSRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Op: structs.KVSSet,
|
||||
DirEnt: structs.DirEntry{
|
||||
Key: key,
|
||||
Value: val,
|
||||
},
|
||||
}
|
||||
var success bool
|
||||
if err := agent.RPC("KVS.Apply", &write, &success); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func getKV(t *testing.T, agent *Agent, key string) *structs.DirEntry {
|
||||
req := structs.KeyRequest{
|
||||
Datacenter: agent.config.Datacenter,
|
||||
Key: key,
|
||||
}
|
||||
var out structs.IndexedDirEntries
|
||||
if err := agent.RPC("KVS.Get", &req, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if len(out.Entries) > 0 {
|
||||
return out.Entries[0]
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,17 +1,18 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
const (
|
||||
// userEventMaxVersion is the maximum protocol version we understand
|
||||
userEventMaxVersion = 1
|
||||
|
||||
// remoteExecName is the event name for a remote exec command
|
||||
remoteExecName = "_rexec"
|
||||
)
|
||||
|
||||
// UserEventParam is used to parameterize a user event
|
||||
@ -79,7 +80,7 @@ func (a *Agent) UserEvent(dc string, params *UserEvent) error {
|
||||
// Format message
|
||||
params.ID = generateUUID()
|
||||
params.Version = userEventMaxVersion
|
||||
payload, err := encodeUserEvent(¶ms)
|
||||
payload, err := encodeMsgPack(¶ms)
|
||||
if err != nil {
|
||||
return fmt.Errorf("UserEvent encoding failed: %v", err)
|
||||
}
|
||||
@ -114,7 +115,7 @@ func (a *Agent) handleEvents() {
|
||||
case e := <-a.eventCh:
|
||||
// Decode the event
|
||||
msg := new(UserEvent)
|
||||
if err := decodeUserEvent(e.Payload, msg); err != nil {
|
||||
if err := decodeMsgPack(e.Payload, msg); err != nil {
|
||||
a.logger.Printf("[ERR] agent: Failed to decode event: %v", err)
|
||||
continue
|
||||
}
|
||||
@ -208,7 +209,19 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool {
|
||||
|
||||
// ingestUserEvent is used to process an event that passes filtering
|
||||
func (a *Agent) ingestUserEvent(msg *UserEvent) {
|
||||
a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID)
|
||||
// Special handling for internal events
|
||||
switch msg.Name {
|
||||
case remoteExecName:
|
||||
if a.config.DisableRemoteExec {
|
||||
a.logger.Printf("[INFO] agent: ignoring remote exec event (%s), disabled.", msg.ID)
|
||||
} else {
|
||||
go a.handleRemoteExec(msg)
|
||||
}
|
||||
return
|
||||
default:
|
||||
a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID)
|
||||
}
|
||||
|
||||
a.eventLock.Lock()
|
||||
defer func() {
|
||||
a.eventLock.Unlock()
|
||||
@ -253,15 +266,3 @@ func (a *Agent) LastUserEvent() *UserEvent {
|
||||
idx := (((a.eventIndex - 1) % n) + n) % n
|
||||
return a.eventBuf[idx]
|
||||
}
|
||||
|
||||
// Decode is used to decode a MsgPack encoded object
|
||||
func decodeUserEvent(buf []byte, out interface{}) error {
|
||||
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
|
||||
}
|
||||
|
||||
// encodeUserEvent is used to encode user event
|
||||
func encodeUserEvent(msg interface{}) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg)
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
crand "crypto/rand"
|
||||
"fmt"
|
||||
"math"
|
||||
@ -9,6 +10,8 @@ import (
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -76,3 +79,15 @@ func generateUUID() string {
|
||||
buf[8:10],
|
||||
buf[10:16])
|
||||
}
|
||||
|
||||
// decodeMsgPack is used to decode a MsgPack encoded object
|
||||
func decodeMsgPack(buf []byte, out interface{}) error {
|
||||
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
|
||||
}
|
||||
|
||||
// encodeMsgPack is used to encode an object with msgpack
|
||||
func encodeMsgPack(msg interface{}) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg)
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
574
command/exec.go
Normal file
574
command/exec.go
Normal file
@ -0,0 +1,574 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"github.com/armon/consul-api"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
const (
|
||||
// rExecPrefix is the prefix in the KV store used to
|
||||
// store the remote exec data
|
||||
rExecPrefix = "_rexec"
|
||||
|
||||
// rExecFileName is the name of the file we append to
|
||||
// the path, e.g. _rexec/session_id/job
|
||||
rExecFileName = "job"
|
||||
|
||||
// rExecAck is the suffix added to an ack path
|
||||
rExecAckSuffix = "/ack"
|
||||
|
||||
// rExecAck is the suffix added to an exit code
|
||||
rExecExitSuffix = "/exit"
|
||||
|
||||
// rExecOutputDivider is used to namespace the output
|
||||
rExecOutputDivider = "/out/"
|
||||
|
||||
// rExecReplicationWait is how long we wait for replication
|
||||
rExecReplicationWait = 200 * time.Millisecond
|
||||
|
||||
// rExecQuietWait is how long we wait for no responses
|
||||
// before assuming the job is done.
|
||||
rExecQuietWait = 2 * time.Second
|
||||
)
|
||||
|
||||
// rExecConf is used to pass around configuration
|
||||
type rExecConf struct {
|
||||
datacenter string
|
||||
prefix string
|
||||
|
||||
node string
|
||||
service string
|
||||
tag string
|
||||
|
||||
wait time.Duration
|
||||
replWait time.Duration
|
||||
|
||||
cmd string
|
||||
script []byte
|
||||
|
||||
verbose bool
|
||||
}
|
||||
|
||||
// rExecEvent is the event we broadcast using a user-event
|
||||
type rExecEvent struct {
|
||||
Prefix string
|
||||
Session string
|
||||
}
|
||||
|
||||
// rExecSpec is the file we upload to specify the parameters
|
||||
// of the remote execution.
|
||||
type rExecSpec struct {
|
||||
// Command is a single command to run directly in the shell
|
||||
Command string `json:",omitempty"`
|
||||
|
||||
// Script should be spilled to a file and executed
|
||||
Script []byte `json:",omitempty"`
|
||||
|
||||
// Wait is how long we are waiting on a quiet period to terminate
|
||||
Wait time.Duration
|
||||
}
|
||||
|
||||
// rExecAck is used to transmit an acknowledgement
|
||||
type rExecAck struct {
|
||||
Node string
|
||||
}
|
||||
|
||||
// rExecHeart is used to transmit a heartbeat
|
||||
type rExecHeart struct {
|
||||
Node string
|
||||
}
|
||||
|
||||
// rExecOutput is used to transmit a chunk of output
|
||||
type rExecOutput struct {
|
||||
Node string
|
||||
Output []byte
|
||||
}
|
||||
|
||||
// rExecExit is used to transmit an exit code
|
||||
type rExecExit struct {
|
||||
Node string
|
||||
Code int
|
||||
}
|
||||
|
||||
// ExecCommand is a Command implementation that is used to
|
||||
// do remote execution of commands
|
||||
type ExecCommand struct {
|
||||
ShutdownCh <-chan struct{}
|
||||
Ui cli.Ui
|
||||
conf rExecConf
|
||||
client *consulapi.Client
|
||||
sessionID string
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Run(args []string) int {
|
||||
cmdFlags := flag.NewFlagSet("exec", flag.ContinueOnError)
|
||||
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
cmdFlags.StringVar(&c.conf.datacenter, "datacenter", "", "")
|
||||
cmdFlags.StringVar(&c.conf.node, "node", "", "")
|
||||
cmdFlags.StringVar(&c.conf.service, "service", "", "")
|
||||
cmdFlags.StringVar(&c.conf.tag, "tag", "", "")
|
||||
cmdFlags.StringVar(&c.conf.prefix, "prefix", rExecPrefix, "")
|
||||
cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, "")
|
||||
cmdFlags.DurationVar(&c.conf.wait, "wait", rExecQuietWait, "")
|
||||
cmdFlags.BoolVar(&c.conf.verbose, "verbose", false, "")
|
||||
httpAddr := HTTPAddrFlag(cmdFlags)
|
||||
if err := cmdFlags.Parse(args); err != nil {
|
||||
return 1
|
||||
}
|
||||
|
||||
// Join the commands to execute
|
||||
c.conf.cmd = strings.Join(cmdFlags.Args(), " ")
|
||||
|
||||
// If there is no command, read stdin for a script input
|
||||
if c.conf.cmd == "-" {
|
||||
c.conf.cmd = ""
|
||||
var buf bytes.Buffer
|
||||
_, err := io.Copy(&buf, os.Stdin)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read stdin: %v", err))
|
||||
c.Ui.Error("")
|
||||
c.Ui.Error(c.Help())
|
||||
return 1
|
||||
}
|
||||
c.conf.script = buf.Bytes()
|
||||
}
|
||||
|
||||
// Ensure we have a command or script
|
||||
if c.conf.cmd == "" && len(c.conf.script) == 0 {
|
||||
c.Ui.Error("Must specify a command to execute")
|
||||
c.Ui.Error("")
|
||||
c.Ui.Error(c.Help())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Validate the configuration
|
||||
if err := c.conf.validate(); err != nil {
|
||||
c.Ui.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Create and test the HTTP client
|
||||
client, err := HTTPClientDC(*httpAddr, c.conf.datacenter)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
_, err = client.Agent().NodeName()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
c.client = client
|
||||
|
||||
// Create the job spec
|
||||
spec, err := c.makeRExecSpec()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to create job spec: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Create a session for this
|
||||
c.sessionID, err = c.createSession()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to create session: %s", err))
|
||||
return 1
|
||||
}
|
||||
defer c.destroySession()
|
||||
if c.conf.verbose {
|
||||
c.Ui.Info(fmt.Sprintf("Created remote execution session: %s", c.sessionID))
|
||||
}
|
||||
|
||||
// Upload the payload
|
||||
if err := c.uploadPayload(spec); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to create job file: %s", err))
|
||||
return 1
|
||||
}
|
||||
defer c.destroyData()
|
||||
if c.conf.verbose {
|
||||
c.Ui.Info(fmt.Sprintf("Uploaded remote execution spec"))
|
||||
}
|
||||
|
||||
// Wait for replication. This is done so that when the event is
|
||||
// received, the job file can be read using a stale read. If the
|
||||
// stale read fails, we expect a consistent read to be done, so
|
||||
// largely this is a heuristic.
|
||||
select {
|
||||
case <-time.After(c.conf.replWait):
|
||||
case <-c.ShutdownCh:
|
||||
return 1
|
||||
}
|
||||
|
||||
// Fire the event
|
||||
id, err := c.fireEvent()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to fire event: %s", err))
|
||||
return 1
|
||||
}
|
||||
if c.conf.verbose {
|
||||
c.Ui.Info(fmt.Sprintf("Fired remote execution event: %s", id))
|
||||
}
|
||||
|
||||
// Wait for the job to finish now
|
||||
return c.waitForJob()
|
||||
}
|
||||
|
||||
// waitForJob is used to poll for results and wait until the job is terminated
|
||||
func (c *ExecCommand) waitForJob() int {
|
||||
// Although the session destroy is already deferred, we do it again here,
|
||||
// because invalidation of the session before destroyData() ensures there is
|
||||
// no race condition allowing an agent to upload data (the acquire will fail).
|
||||
defer c.destroySession()
|
||||
start := time.Now()
|
||||
ackCh := make(chan rExecAck, 128)
|
||||
heartCh := make(chan rExecHeart, 128)
|
||||
outputCh := make(chan rExecOutput, 128)
|
||||
exitCh := make(chan rExecExit, 128)
|
||||
doneCh := make(chan struct{})
|
||||
errCh := make(chan struct{}, 1)
|
||||
defer close(doneCh)
|
||||
go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)
|
||||
target := &TargettedUi{Ui: c.Ui}
|
||||
|
||||
var ackCount, exitCount, badExit int
|
||||
OUTER:
|
||||
for {
|
||||
// Determine wait time. We provide a larger window if we know about
|
||||
// nodes which are still working.
|
||||
waitIntv := c.conf.wait
|
||||
if ackCount > exitCount {
|
||||
waitIntv *= 2
|
||||
}
|
||||
|
||||
select {
|
||||
case e := <-ackCh:
|
||||
ackCount++
|
||||
if c.conf.verbose {
|
||||
target.Target = e.Node
|
||||
target.Info("acknowledged")
|
||||
}
|
||||
|
||||
case h := <-heartCh:
|
||||
if c.conf.verbose {
|
||||
target.Target = h.Node
|
||||
target.Info("heartbeat received")
|
||||
}
|
||||
|
||||
case e := <-outputCh:
|
||||
target.Target = e.Node
|
||||
target.Output(string(e.Output))
|
||||
|
||||
case e := <-exitCh:
|
||||
exitCount++
|
||||
target.Target = e.Node
|
||||
target.Info(fmt.Sprintf("finished with exit code %d", e.Code))
|
||||
if e.Code != 0 {
|
||||
badExit++
|
||||
}
|
||||
|
||||
case <-time.After(waitIntv):
|
||||
c.Ui.Info(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount))
|
||||
if c.conf.verbose {
|
||||
c.Ui.Info(fmt.Sprintf("Completed in %0.2f seconds",
|
||||
float64(time.Now().Sub(start))/float64(time.Second)))
|
||||
}
|
||||
break OUTER
|
||||
|
||||
case <-errCh:
|
||||
return 1
|
||||
|
||||
case <-c.ShutdownCh:
|
||||
return 1
|
||||
}
|
||||
}
|
||||
|
||||
if badExit > 0 {
|
||||
return 2
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// streamResults is used to perform blocking queries against the KV endpoint and stream in
|
||||
// notice of various events into waitForJob
|
||||
func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
|
||||
outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) {
|
||||
kv := c.client.KV()
|
||||
opts := consulapi.QueryOptions{WaitTime: c.conf.wait}
|
||||
dir := path.Join(c.conf.prefix, c.sessionID) + "/"
|
||||
seen := make(map[string]struct{})
|
||||
|
||||
for {
|
||||
// Check if we've been signaled to exit
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Block on waiting for new keys
|
||||
keys, qm, err := kv.Keys(dir, "", &opts)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read results: %s", err))
|
||||
goto ERR_EXIT
|
||||
}
|
||||
|
||||
// Fast-path the no-change case
|
||||
if qm.LastIndex == opts.WaitIndex {
|
||||
continue
|
||||
}
|
||||
opts.WaitIndex = qm.LastIndex
|
||||
|
||||
// Handle each key
|
||||
for _, key := range keys {
|
||||
// Ignore if we've seen it
|
||||
if _, ok := seen[key]; ok {
|
||||
continue
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
|
||||
// Trim the directory
|
||||
full := key
|
||||
key = strings.TrimPrefix(key, dir)
|
||||
|
||||
// Handle the key type
|
||||
switch {
|
||||
case key == rExecFileName:
|
||||
continue
|
||||
case strings.HasSuffix(key, rExecAckSuffix):
|
||||
ackCh <- rExecAck{Node: strings.TrimSuffix(key, rExecAckSuffix)}
|
||||
|
||||
case strings.HasSuffix(key, rExecExitSuffix):
|
||||
pair, _, err := kv.Get(full, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
|
||||
continue
|
||||
}
|
||||
code, err := strconv.ParseInt(string(pair.Value), 10, 32)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to parse exit code '%s': %v", pair.Value, err))
|
||||
continue
|
||||
}
|
||||
exitCh <- rExecExit{
|
||||
Node: strings.TrimSuffix(key, rExecExitSuffix),
|
||||
Code: int(code),
|
||||
}
|
||||
|
||||
case strings.LastIndex(key, rExecOutputDivider) != -1:
|
||||
pair, _, err := kv.Get(full, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
|
||||
continue
|
||||
}
|
||||
idx := strings.LastIndex(key, rExecOutputDivider)
|
||||
node := key[:idx]
|
||||
if len(pair.Value) == 0 {
|
||||
heartCh <- rExecHeart{Node: node}
|
||||
} else {
|
||||
outputCh <- rExecOutput{Node: node, Output: pair.Value}
|
||||
}
|
||||
|
||||
default:
|
||||
c.Ui.Error(fmt.Sprintf("Unknown key '%s', ignoring.", key))
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
ERR_EXIT:
|
||||
select {
|
||||
case errCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// validate checks that the configuration is sane
|
||||
func (conf *rExecConf) validate() error {
|
||||
// Validate the filters
|
||||
if conf.node != "" {
|
||||
if _, err := regexp.Compile(conf.node); err != nil {
|
||||
return fmt.Errorf("Failed to compile node filter regexp: %v", err)
|
||||
}
|
||||
}
|
||||
if conf.service != "" {
|
||||
if _, err := regexp.Compile(conf.service); err != nil {
|
||||
return fmt.Errorf("Failed to compile service filter regexp: %v", err)
|
||||
}
|
||||
}
|
||||
if conf.tag != "" {
|
||||
if _, err := regexp.Compile(conf.tag); err != nil {
|
||||
return fmt.Errorf("Failed to compile tag filter regexp: %v", err)
|
||||
}
|
||||
}
|
||||
if conf.tag != "" && conf.service == "" {
|
||||
return fmt.Errorf("Cannot provide tag filter without service filter.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createSession is used to create a new session for this command
|
||||
func (c *ExecCommand) createSession() (string, error) {
|
||||
session := c.client.Session()
|
||||
se := consulapi.SessionEntry{
|
||||
Name: "Remote Exec",
|
||||
}
|
||||
id, _, err := session.Create(&se, nil)
|
||||
return id, err
|
||||
}
|
||||
|
||||
// destroySession is used to destroy the associated session
|
||||
func (c *ExecCommand) destroySession() error {
|
||||
session := c.client.Session()
|
||||
_, err := session.Destroy(c.sessionID, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// makeRExecSpec creates a serialized job specification
|
||||
// that can be uploaded which will be parsed by agents to
|
||||
// determine what to do.
|
||||
func (c *ExecCommand) makeRExecSpec() ([]byte, error) {
|
||||
spec := &rExecSpec{
|
||||
Command: c.conf.cmd,
|
||||
Script: c.conf.script,
|
||||
Wait: c.conf.wait,
|
||||
}
|
||||
return json.Marshal(spec)
|
||||
}
|
||||
|
||||
// uploadPayload is used to upload the request payload
|
||||
func (c *ExecCommand) uploadPayload(payload []byte) error {
|
||||
kv := c.client.KV()
|
||||
pair := consulapi.KVPair{
|
||||
Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName),
|
||||
Value: payload,
|
||||
Session: c.sessionID,
|
||||
}
|
||||
ok, _, err := kv.Acquire(&pair, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to acquire key %s", pair.Key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// destroyData is used to nuke all the data associated with
|
||||
// this remote exec. We just do a recursive delete of our
|
||||
// data directory.
|
||||
func (c *ExecCommand) destroyData() error {
|
||||
kv := c.client.KV()
|
||||
dir := path.Join(c.conf.prefix, c.sessionID)
|
||||
_, err := kv.DeleteTree(dir, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// fireEvent is used to fire the event that will notify nodes
|
||||
// about the remote execution. Returns the event ID or error
|
||||
func (c *ExecCommand) fireEvent() (string, error) {
|
||||
// Create the user event payload
|
||||
msg := &rExecEvent{
|
||||
Prefix: c.conf.prefix,
|
||||
Session: c.sessionID,
|
||||
}
|
||||
buf, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Format the user event
|
||||
event := c.client.Event()
|
||||
params := &consulapi.UserEvent{
|
||||
Name: "_rexec",
|
||||
Payload: buf,
|
||||
NodeFilter: c.conf.node,
|
||||
ServiceFilter: c.conf.service,
|
||||
TagFilter: c.conf.tag,
|
||||
}
|
||||
|
||||
// Fire the event
|
||||
id, _, err := event.Fire(params, nil)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Synopsis() string {
|
||||
return "Executes a command on Consul nodes"
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: consul exec [options] [-|command...]
|
||||
|
||||
Evaluates a command on remote Consul nodes. The nodes responding can
|
||||
be filtered using regular expressions on node name, service, and tag
|
||||
definitions. If a command is '-', stdin will be read until EOF
|
||||
and used as a script input.
|
||||
|
||||
Options:
|
||||
|
||||
-http-addr=127.0.0.1:8500 HTTP address of the Consul agent.
|
||||
-datacenter="" Datacenter to dispatch in. Defaults to that of agent.
|
||||
-prefix="_rexec" Prefix in the KV store to use for request data
|
||||
-node="" Regular expression to filter on node names
|
||||
-service="" Regular expression to filter on service instances
|
||||
-tag="" Regular expression to filter on service tags. Must be used
|
||||
with -service.
|
||||
-wait=2s Period to wait with no responses before terminating execution.
|
||||
-wait-repl=200ms Period to wait for replication before firing event. This is an
|
||||
optimization to allow stale reads to be performed.
|
||||
-verbose Enables verbose output
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
||||
|
||||
// TargettedUi is a UI that wraps another UI implementation and modifies
|
||||
// the output to indicate a specific target. Specifically, all Say output
|
||||
// is prefixed with the target name. Message output is not prefixed but
|
||||
// is offset by the length of the target so that output is lined up properly
|
||||
// with Say output. Machine-readable output has the proper target set.
|
||||
type TargettedUi struct {
|
||||
Target string
|
||||
Ui cli.Ui
|
||||
}
|
||||
|
||||
func (u *TargettedUi) Ask(query string) (string, error) {
|
||||
return u.Ui.Ask(u.prefixLines(true, query))
|
||||
}
|
||||
|
||||
func (u *TargettedUi) Info(message string) {
|
||||
u.Ui.Info(u.prefixLines(true, message))
|
||||
}
|
||||
|
||||
func (u *TargettedUi) Output(message string) {
|
||||
u.Ui.Output(u.prefixLines(false, message))
|
||||
}
|
||||
|
||||
func (u *TargettedUi) Error(message string) {
|
||||
u.Ui.Error(u.prefixLines(true, message))
|
||||
}
|
||||
|
||||
func (u *TargettedUi) prefixLines(arrow bool, message string) string {
|
||||
arrowText := "==>"
|
||||
if !arrow {
|
||||
arrowText = strings.Repeat(" ", len(arrowText))
|
||||
}
|
||||
|
||||
var result bytes.Buffer
|
||||
|
||||
for _, line := range strings.Split(message, "\n") {
|
||||
result.WriteString(fmt.Sprintf("%s %s: %s\n", arrowText, u.Target, line))
|
||||
}
|
||||
|
||||
return strings.TrimRightFunc(result.String(), unicode.IsSpace)
|
||||
}
|
319
command/exec_test.go
Normal file
319
command/exec_test.go
Normal file
@ -0,0 +1,319 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/armon/consul-api"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func TestExecCommand_implements(t *testing.T) {
|
||||
var _ cli.Command = &ExecCommand{}
|
||||
}
|
||||
|
||||
func TestExecCommandRun(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := &ExecCommand{Ui: ui}
|
||||
args := []string{"-http-addr=" + a1.httpAddr, "-wait=400ms", "uptime"}
|
||||
|
||||
code := c.Run(args)
|
||||
if code != 0 {
|
||||
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||
}
|
||||
|
||||
if !strings.Contains(ui.OutputWriter.String(), "load") {
|
||||
t.Fatalf("bad: %#v", ui.OutputWriter.String())
|
||||
}
|
||||
}
|
||||
|
||||
func waitForLeader(t *testing.T, httpAddr string) {
|
||||
client, err := HTTPClient(httpAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
_, qm, err := client.Catalog().Nodes(nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return qm.KnownLeader, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("failed to find leader: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestExecCommand_Validate(t *testing.T) {
|
||||
conf := &rExecConf{}
|
||||
err := conf.validate()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
conf.node = "("
|
||||
err = conf.validate()
|
||||
if err == nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
conf.node = ""
|
||||
conf.service = "("
|
||||
err = conf.validate()
|
||||
if err == nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
conf.service = "()"
|
||||
conf.tag = "("
|
||||
err = conf.validate()
|
||||
if err == nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
conf.service = ""
|
||||
conf.tag = "foo"
|
||||
err = conf.validate()
|
||||
if err == nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecCommand_Sessions(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
|
||||
client, err := HTTPClient(a1.httpAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := &ExecCommand{
|
||||
Ui: ui,
|
||||
client: client,
|
||||
}
|
||||
|
||||
id, err := c.createSession()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
se, _, err := client.Session().Info(id, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if se == nil || se.Name != "Remote Exec" {
|
||||
t.Fatalf("bad: %v", se)
|
||||
}
|
||||
|
||||
c.sessionID = id
|
||||
err = c.destroySession()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
se, _, err = client.Session().Info(id, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if se != nil {
|
||||
t.Fatalf("bad: %v", se)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecCommand_UploadDestroy(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
|
||||
client, err := HTTPClient(a1.httpAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := &ExecCommand{
|
||||
Ui: ui,
|
||||
client: client,
|
||||
}
|
||||
|
||||
id, err := c.createSession()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
c.sessionID = id
|
||||
|
||||
c.conf.prefix = "_rexec"
|
||||
c.conf.cmd = "uptime"
|
||||
c.conf.wait = time.Second
|
||||
|
||||
buf, err := c.makeRExecSpec()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = c.uploadPayload(buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
pair, _, err := client.KV().Get("_rexec/"+id+"/job", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if pair == nil || len(pair.Value) == 0 {
|
||||
t.Fatalf("missing job spec")
|
||||
}
|
||||
|
||||
err = c.destroyData()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
pair, _, err = client.KV().Get("_rexec/"+id+"/job", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if pair != nil {
|
||||
t.Fatalf("should be destroyed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExecCommand_StreamResults(t *testing.T) {
|
||||
a1 := testAgent(t)
|
||||
defer a1.Shutdown()
|
||||
waitForLeader(t, a1.httpAddr)
|
||||
|
||||
client, err := HTTPClient(a1.httpAddr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := &ExecCommand{
|
||||
Ui: ui,
|
||||
client: client,
|
||||
}
|
||||
c.conf.prefix = "_rexec"
|
||||
|
||||
id, err := c.createSession()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
c.sessionID = id
|
||||
|
||||
ackCh := make(chan rExecAck, 128)
|
||||
heartCh := make(chan rExecHeart, 128)
|
||||
outputCh := make(chan rExecOutput, 128)
|
||||
exitCh := make(chan rExecExit, 128)
|
||||
doneCh := make(chan struct{})
|
||||
errCh := make(chan struct{}, 1)
|
||||
defer close(doneCh)
|
||||
go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)
|
||||
|
||||
prefix := "_rexec/" + id + "/"
|
||||
ok, _, err := client.KV().Acquire(&consulapi.KVPair{
|
||||
Key: prefix + "foo/ack",
|
||||
Session: id,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("should be ok bro")
|
||||
}
|
||||
|
||||
select {
|
||||
case a := <-ackCh:
|
||||
if a.Node != "foo" {
|
||||
t.Fatalf("bad: %#v", a)
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
|
||||
Key: prefix + "foo/exit",
|
||||
Value: []byte("127"),
|
||||
Session: id,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("should be ok bro")
|
||||
}
|
||||
|
||||
select {
|
||||
case e := <-exitCh:
|
||||
if e.Node != "foo" || e.Code != 127 {
|
||||
t.Fatalf("bad: %#v", e)
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
// Random key, should ignore
|
||||
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
|
||||
Key: prefix + "foo/random",
|
||||
Session: id,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("should be ok bro")
|
||||
}
|
||||
|
||||
// Output heartbeat
|
||||
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
|
||||
Key: prefix + "foo/out/00000",
|
||||
Session: id,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("should be ok bro")
|
||||
}
|
||||
|
||||
select {
|
||||
case h := <-heartCh:
|
||||
if h.Node != "foo" {
|
||||
t.Fatalf("bad: %#v", h)
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
// Output value
|
||||
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
|
||||
Key: prefix + "foo/out/00001",
|
||||
Value: []byte("test"),
|
||||
Session: id,
|
||||
}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatalf("should be ok bro")
|
||||
}
|
||||
|
||||
select {
|
||||
case o := <-outputCh:
|
||||
if o.Node != "foo" || string(o.Output) != "test" {
|
||||
t.Fatalf("bad: %#v", o)
|
||||
}
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
@ -27,7 +27,13 @@ func HTTPAddrFlag(f *flag.FlagSet) *string {
|
||||
|
||||
// HTTPClient returns a new Consul HTTP client with the given address.
|
||||
func HTTPClient(addr string) (*consulapi.Client, error) {
|
||||
return HTTPClientDC(addr, "")
|
||||
}
|
||||
|
||||
// HTTPClientDC returns a new Consul HTTP client with the given address and datacenter
|
||||
func HTTPClientDC(addr, dc string) (*consulapi.Client, error) {
|
||||
conf := consulapi.DefaultConfig()
|
||||
conf.Address = addr
|
||||
conf.Datacenter = dc
|
||||
return consulapi.NewClient(conf)
|
||||
}
|
||||
|
@ -31,6 +31,13 @@ func init() {
|
||||
}, nil
|
||||
},
|
||||
|
||||
"exec": func() (cli.Command, error) {
|
||||
return &command.ExecCommand{
|
||||
ShutdownCh: makeShutdownCh(),
|
||||
Ui: ui,
|
||||
}, nil
|
||||
},
|
||||
|
||||
"force-leave": func() (cli.Command, error) {
|
||||
return &command.ForceLeaveCommand{
|
||||
Ui: ui,
|
||||
|
@ -1209,7 +1209,8 @@ can be specified using the "?dc=" query parameter.
|
||||
|
||||
The fire endpoint expects a PUT request, with an optional body.
|
||||
The body contents are opaque to Consul, and become the "payload"
|
||||
of the event.
|
||||
of the event. Any names starting with the "_" prefix should be considered
|
||||
reserved, and for Consul's internal use.
|
||||
|
||||
The `?node=`, `?service=`, and `?tag=` query parameters may optionally
|
||||
be provided. They respectively provide a regular expression to filter
|
||||
|
@ -227,6 +227,9 @@ definitions support being updated during a reload.
|
||||
|
||||
* `data_dir` - Equivalent to the `-data-dir` command-line flag.
|
||||
|
||||
* `disable_remote_exec` - Disables support for remote execution. When set to true,
|
||||
the agent will ignore any incoming remote exec requests.
|
||||
|
||||
* `dns_config` - This object allows a number of sub-keys to be set which can tune
|
||||
how DNS queries are perfomed. See this guide on [DNS caching](/docs/guides/dns-cache.html).
|
||||
The following sub-keys are available:
|
||||
|
62
website/source/docs/commands/exec.html.markdown
Normal file
62
website/source/docs/commands/exec.html.markdown
Normal file
@ -0,0 +1,62 @@
|
||||
---
|
||||
layout: "docs"
|
||||
page_title: "Commands: Exec"
|
||||
sidebar_current: "docs-commands-exec"
|
||||
---
|
||||
|
||||
# Consul Exec
|
||||
|
||||
Command: `consul exec`
|
||||
|
||||
The exec command provides a mechahanism for remote execution. For example,
|
||||
this can be used to run the `uptime` command across all machines providing
|
||||
the `web` service.
|
||||
|
||||
Remote execution works by specifying a job which is stored in the KV store.
|
||||
Agent's are informed about the new job using the [event system](/docs/commands/event.html),
|
||||
which propogates messages via the [gossip protocol](/docs/internals/gossip.html).
|
||||
As a result, delivery is best-effort, and there is **no guarantee** of execution.
|
||||
|
||||
While events are purely gossip driven, remote execution relies on the KV store
|
||||
as a message broker. As a result, the `exec` command will not be able to
|
||||
properly function during a Consul outage.
|
||||
|
||||
## Usage
|
||||
|
||||
Usage: `consul exec [options] [-|command...]`
|
||||
|
||||
The only required option is a command to execute. This is either given
|
||||
as trailing arguments, or by specifying '-', stdin will be read to
|
||||
completion as a script to evaluate.
|
||||
|
||||
The list of available flags are:
|
||||
|
||||
* `-http-addr` - Address to the HTTP server of the agent you want to contact
|
||||
to send this command. If this isn't specified, the command will contact
|
||||
"127.0.0.1:8500" which is the default HTTP address of a Consul agent.
|
||||
|
||||
* `-datacenter` - Datacenter to query. Defaults to that of agent. In version
|
||||
0.4, that is the only supported value.
|
||||
|
||||
* `-prefix` - Key prefix in the KV store to use for storing request data.
|
||||
Defaults to "_rexec".
|
||||
|
||||
* `-node` - Regular expression to filter nodes which should evaluate the event.
|
||||
|
||||
* `-service` - Regular expression to filter to only nodes with matching services.
|
||||
|
||||
* `-tag` - Regular expression to filter to only nodes with a service that has
|
||||
a matching tag. This must be used with `-service`. As an example, you may
|
||||
do "-server mysql -tag slave".
|
||||
|
||||
* `-wait` - Specifies the period of time in which no agent's respond before considering
|
||||
the job finished. This is basically the quiescent time required to assume completion.
|
||||
This period is not a hard deadline, and the command will wait longer depending on
|
||||
various heuristics.
|
||||
|
||||
* `-wait-repl` - Period to wait after writing the job specification for replication.
|
||||
This is a heuristic value and enables agents to do a stale read of the job. Defaults
|
||||
to 200msec.
|
||||
|
||||
* `-verbose` - Enables verbose output.
|
||||
|
@ -25,6 +25,8 @@ usage: consul [--version] [--help] <command> [<args>]
|
||||
|
||||
Available commands are:
|
||||
agent Runs a Consul agent
|
||||
event Fire a new event
|
||||
exec Executes a command on Consul nodes
|
||||
force-leave Forces a member of the cluster to enter the "left" state
|
||||
info Provides debugging information for operators
|
||||
join Tell Consul agent to join cluster
|
||||
|
@ -49,6 +49,8 @@ usage: consul [--version] [--help] <command> [<args>]
|
||||
|
||||
Available commands are:
|
||||
agent Runs a Consul agent
|
||||
event Fire a new event
|
||||
exec Executes a command on Consul nodes
|
||||
force-leave Forces a member of the cluster to enter the "left" state
|
||||
info Provides debugging information for operators
|
||||
join Tell Consul agent to join cluster
|
||||
@ -56,6 +58,7 @@ Available commands are:
|
||||
leave Gracefully leaves the Consul cluster and shuts down
|
||||
members Lists the members of a Consul cluster
|
||||
monitor Stream logs from a Consul agent
|
||||
reload Triggers the agent to reload configuration files
|
||||
version Prints the Consul version
|
||||
watch Watch for changes in Consul
|
||||
```
|
||||
|
@ -61,6 +61,10 @@
|
||||
|
||||
<li<%= sidebar_current("docs-commands-event") %>>
|
||||
<a href="/docs/commands/event.html">event</a>
|
||||
</li>
|
||||
|
||||
<li<%= sidebar_current("docs-commands-exec") %>>
|
||||
<a href="/docs/commands/exec.html">exec</a>
|
||||
</li>
|
||||
|
||||
<li<%= sidebar_current("docs-commands-forceleave") %>>
|
||||
|
Loading…
x
Reference in New Issue
Block a user