mirror of
https://github.com/status-im/consul.git
synced 2025-01-13 07:14:37 +00:00
command/exec: First pass at exec command
This commit is contained in:
parent
86a1a3a11e
commit
096e6fc886
488
command/exec.go
Normal file
488
command/exec.go
Normal file
@ -0,0 +1,488 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/armon/consul-api"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
const (
|
||||
// rExecFileName is the name of the file we append to
|
||||
// the path, e.g. _rexec/session_id/job
|
||||
rExecFileName = "job"
|
||||
|
||||
// rExecAck is the suffix added to an ack path
|
||||
rExecAckSuffix = "/ack"
|
||||
|
||||
// rExecAck is the suffix added to an exit code
|
||||
rExecExitSuffix = "/exit"
|
||||
|
||||
// rExecOutputDivider is used to namespace the output
|
||||
rExecOutputDivider = "/out/"
|
||||
)
|
||||
|
||||
// rExecConf is used to pass around configuration
|
||||
type rExecConf struct {
|
||||
datacenter string
|
||||
prefix string
|
||||
|
||||
node string
|
||||
service string
|
||||
tag string
|
||||
|
||||
wait time.Duration
|
||||
replWait time.Duration
|
||||
|
||||
cmd string
|
||||
script []byte
|
||||
|
||||
verbose bool
|
||||
}
|
||||
|
||||
// rExecEvent is the event we broadcast using a user-event
|
||||
type rExecEvent struct {
|
||||
Prefix string
|
||||
Session string
|
||||
}
|
||||
|
||||
// rExecSpec is the file we upload to specify the parameters
|
||||
// of the remote execution.
|
||||
type rExecSpec struct {
|
||||
// Command is a single command to run directly in the shell
|
||||
Command string `json:",omitempty"`
|
||||
|
||||
// Script should be spilled to a file and executed
|
||||
Script []byte `json:",omitempty"`
|
||||
|
||||
// Wait is how long we are waiting on a quiet period to terminate
|
||||
Wait time.Duration
|
||||
}
|
||||
|
||||
// rExecAck is used to transmit an acknowledgement
|
||||
type rExecAck struct {
|
||||
Node string
|
||||
}
|
||||
|
||||
// rExecHeart is used to transmit a heartbeat
|
||||
type rExecHeart struct {
|
||||
Node string
|
||||
}
|
||||
|
||||
// rExecOutput is used to transmit a chunk of output
|
||||
type rExecOutput struct {
|
||||
Node string
|
||||
Output []byte
|
||||
}
|
||||
|
||||
// rExecExit is used to transmit an exit code
|
||||
type rExecExit struct {
|
||||
Node string
|
||||
Code int
|
||||
}
|
||||
|
||||
// ExecCommand is a Command implementation that is used to
|
||||
// do remote execution of commands
|
||||
type ExecCommand struct {
|
||||
ShutdownCh chan struct{}
|
||||
Ui cli.Ui
|
||||
conf rExecConf
|
||||
client *consulapi.Client
|
||||
sessionID string
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Run(args []string) int {
|
||||
cmdFlags := flag.NewFlagSet("exec", flag.ContinueOnError)
|
||||
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
cmdFlags.StringVar(&c.conf.datacenter, "datacenter", "", "")
|
||||
cmdFlags.StringVar(&c.conf.node, "node", "", "")
|
||||
cmdFlags.StringVar(&c.conf.service, "service", "", "")
|
||||
cmdFlags.StringVar(&c.conf.tag, "tag", "", "")
|
||||
cmdFlags.StringVar(&c.conf.prefix, "prefix", "_rexec", "")
|
||||
cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", 100*time.Millisecond, "")
|
||||
cmdFlags.DurationVar(&c.conf.wait, "wait", time.Second, "")
|
||||
cmdFlags.BoolVar(&c.conf.verbose, "v", false, "")
|
||||
httpAddr := HTTPAddrFlag(cmdFlags)
|
||||
if err := cmdFlags.Parse(args); err != nil {
|
||||
return 1
|
||||
}
|
||||
|
||||
// Join the commands to execute
|
||||
c.conf.cmd = strings.Join(cmdFlags.Args(), " ")
|
||||
|
||||
// If there is no command, read stdin for a script input
|
||||
if c.conf.cmd == "" {
|
||||
var buf bytes.Buffer
|
||||
_, err := io.Copy(&buf, os.Stdin)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read stdin: %v", err))
|
||||
c.Ui.Error("")
|
||||
c.Ui.Error(c.Help())
|
||||
return 1
|
||||
}
|
||||
c.conf.script = buf.Bytes()
|
||||
}
|
||||
|
||||
// Ensure we have a command or script
|
||||
if c.conf.cmd == "" && len(c.conf.script) == 0 {
|
||||
c.Ui.Error("Must specify a command to execute")
|
||||
c.Ui.Error("")
|
||||
c.Ui.Error(c.Help())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Validate the configuration
|
||||
if err := c.conf.validate(); err != nil {
|
||||
c.Ui.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Create and test the HTTP client
|
||||
client, err := HTTPClientDC(*httpAddr, c.conf.datacenter)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
_, err = client.Agent().NodeName()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
|
||||
return 1
|
||||
}
|
||||
c.client = client
|
||||
|
||||
// Create the job spec
|
||||
spec, err := c.makeRExecSpec()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to create job spec: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Create a session for this
|
||||
c.sessionID, err = c.createSession()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to create session: %s", err))
|
||||
return 1
|
||||
}
|
||||
defer c.destroySession()
|
||||
|
||||
// Upload the payload
|
||||
if err := c.uploadPayload(spec); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to create job file: %s", err))
|
||||
return 1
|
||||
}
|
||||
defer c.destroyData()
|
||||
|
||||
// Wait for replication. This is done so that when the event is
|
||||
// received, the job file can be read using a stale read. If the
|
||||
// stale read fails, we expect a consistent read to be done, so
|
||||
// largely this is a heuristic.
|
||||
select {
|
||||
case <-time.After(c.conf.replWait):
|
||||
case <-c.ShutdownCh:
|
||||
return 1
|
||||
}
|
||||
|
||||
// Fire the event
|
||||
id, err := c.fireEvent()
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to fire event: %s", err))
|
||||
return 1
|
||||
}
|
||||
if c.conf.verbose {
|
||||
c.Ui.Output(fmt.Sprintf("Fired remote execution event. ID: %s", id))
|
||||
}
|
||||
|
||||
// Wait for the job to finish now
|
||||
return c.waitForJob()
|
||||
}
|
||||
|
||||
// waitForJob is used to poll for results and wait until the job is terminated
|
||||
func (c *ExecCommand) waitForJob() int {
|
||||
// Although the session destroy is already deferred, we do it again here,
|
||||
// because invalidation of the session before destroyData() ensures there is
|
||||
// no race condition allowing an agent to upload data (the acquire will fail).
|
||||
defer c.destroySession()
|
||||
start := time.Now()
|
||||
ackCh := make(chan rExecAck, 128)
|
||||
heartCh := make(chan rExecHeart, 128)
|
||||
outputCh := make(chan rExecOutput, 128)
|
||||
exitCh := make(chan rExecExit, 128)
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh)
|
||||
var ackCount, exitCount int
|
||||
OUTER:
|
||||
for {
|
||||
// Determine wait time. We provide a larger window if we know about
|
||||
// nodes which are still working.
|
||||
waitIntv := c.conf.wait
|
||||
if ackCount > exitCount {
|
||||
waitIntv *= 4
|
||||
}
|
||||
|
||||
select {
|
||||
case e := <-ackCh:
|
||||
ackCount++
|
||||
c.Ui.Output(fmt.Sprintf("Node %s: acknowledged event", e.Node))
|
||||
|
||||
case e := <-outputCh:
|
||||
c.Ui.Output(fmt.Sprintf("Node %s: %s", e.Node, e.Output))
|
||||
|
||||
case e := <-exitCh:
|
||||
exitCount++
|
||||
c.Ui.Output(fmt.Sprintf("Node %s: exited with code %d", e.Node, e.Code))
|
||||
|
||||
case <-time.After(waitIntv):
|
||||
c.Ui.Output(fmt.Sprintf("%d nodes completed, %d nodes acknowledged", exitCount, ackCount))
|
||||
c.Ui.Output(fmt.Sprintf("Exec complete in %0.2f seconds",
|
||||
float64(time.Now().Sub(start))/float64(time.Second)))
|
||||
break OUTER
|
||||
|
||||
case <-c.ShutdownCh:
|
||||
return 1
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// streamResults is used to perform blocking queries against the KV endpoint and stream in
|
||||
// notice of various events into waitForJob
|
||||
func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
|
||||
outputCh chan rExecOutput, exitCh chan rExecExit) {
|
||||
kv := c.client.KV()
|
||||
opts := consulapi.QueryOptions{WaitTime: c.conf.wait}
|
||||
dir := path.Join(c.conf.prefix, c.sessionID) + "/"
|
||||
seen := make(map[string]struct{})
|
||||
|
||||
for {
|
||||
// Check if we've been signaled to exit
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Block on waiting for new keys
|
||||
keys, qm, err := kv.Keys(dir, "", &opts)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read results: %s", err))
|
||||
goto ERR_EXIT
|
||||
}
|
||||
|
||||
// Fast-path the no-change case
|
||||
if qm.LastIndex == opts.WaitIndex {
|
||||
continue
|
||||
}
|
||||
opts.WaitIndex = qm.LastIndex
|
||||
|
||||
// Handle each key
|
||||
for _, key := range keys {
|
||||
// Ignore if we've seen it
|
||||
if _, ok := seen[key]; ok {
|
||||
continue
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
|
||||
// Trim the directory
|
||||
full := key
|
||||
key = strings.TrimPrefix(key, dir)
|
||||
|
||||
// Handle the key type
|
||||
switch {
|
||||
case key == rExecFileName:
|
||||
continue
|
||||
case strings.HasSuffix(key, rExecAckSuffix):
|
||||
ackCh <- rExecAck{Node: strings.TrimSuffix(key, rExecAckSuffix)}
|
||||
|
||||
case strings.HasSuffix(key, rExecExitSuffix):
|
||||
pair, _, err := kv.Get(full, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
|
||||
continue
|
||||
}
|
||||
code, err := strconv.ParseInt(string(pair.Value), 10, 32)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to parse exit code '%s': %v", pair.Value, err))
|
||||
continue
|
||||
}
|
||||
exitCh <- rExecExit{
|
||||
Node: strings.TrimSuffix(key, rExecExitSuffix),
|
||||
Code: int(code),
|
||||
}
|
||||
|
||||
case strings.LastIndex(key, rExecOutputDivider) != -1:
|
||||
pair, _, err := kv.Get(full, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err))
|
||||
continue
|
||||
}
|
||||
idx := strings.LastIndex(key, rExecOutputDivider)
|
||||
node := key[:idx]
|
||||
if len(pair.Value) == 0 {
|
||||
heartCh <- rExecHeart{Node: node}
|
||||
} else {
|
||||
outputCh <- rExecOutput{Node: node, Output: pair.Value}
|
||||
}
|
||||
|
||||
default:
|
||||
c.Ui.Error(fmt.Sprintf("Unknown key '%s', ignoring.", key))
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
ERR_EXIT:
|
||||
select {
|
||||
case c.ShutdownCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// validate checks that the configuration is sane
|
||||
func (conf *rExecConf) validate() error {
|
||||
// Validate the filters
|
||||
if conf.node != "" {
|
||||
if _, err := regexp.Compile(conf.node); err != nil {
|
||||
return fmt.Errorf("Failed to compile node filter regexp: %v", err)
|
||||
}
|
||||
}
|
||||
if conf.service != "" {
|
||||
if _, err := regexp.Compile(conf.service); err != nil {
|
||||
return fmt.Errorf("Failed to compile service filter regexp: %v", err)
|
||||
}
|
||||
}
|
||||
if conf.tag != "" {
|
||||
if _, err := regexp.Compile(conf.tag); err != nil {
|
||||
return fmt.Errorf("Failed to compile tag filter regexp: %v", err)
|
||||
}
|
||||
}
|
||||
if conf.tag != "" && conf.service == "" {
|
||||
return fmt.Errorf("Cannot provide tag filter without service filter.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createSession is used to create a new session for this command
|
||||
func (c *ExecCommand) createSession() (string, error) {
|
||||
session := c.client.Session()
|
||||
se := consulapi.SessionEntry{
|
||||
Name: "Remote Exec",
|
||||
}
|
||||
id, _, err := session.Create(&se, nil)
|
||||
return id, err
|
||||
}
|
||||
|
||||
// destroySession is used to destroy the associated session
|
||||
func (c *ExecCommand) destroySession() error {
|
||||
session := c.client.Session()
|
||||
_, err := session.Destroy(c.sessionID, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// makeRExecSpec creates a serialized job specification
|
||||
// that can be uploaded which will be parsed by agents to
|
||||
// determine what to do.
|
||||
func (c *ExecCommand) makeRExecSpec() ([]byte, error) {
|
||||
spec := &rExecSpec{
|
||||
Command: c.conf.cmd,
|
||||
Script: c.conf.script,
|
||||
Wait: c.conf.wait,
|
||||
}
|
||||
return json.Marshal(spec)
|
||||
}
|
||||
|
||||
// uploadPayload is used to upload the request payload
|
||||
func (c *ExecCommand) uploadPayload(payload []byte) error {
|
||||
kv := c.client.KV()
|
||||
pair := consulapi.KVPair{
|
||||
Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName),
|
||||
Value: payload,
|
||||
Session: c.sessionID,
|
||||
}
|
||||
ok, _, err := kv.Acquire(&pair, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to acquire key %s", pair.Key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// destroyData is used to nuke all the data associated with
|
||||
// this remote exec. We just do a recursive delete of our
|
||||
// data directory.
|
||||
func (c *ExecCommand) destroyData() error {
|
||||
kv := c.client.KV()
|
||||
dir := path.Join(c.conf.prefix, c.sessionID)
|
||||
_, err := kv.DeleteTree(dir, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// fireEvent is used to fire the event that will notify nodes
|
||||
// about the remote execution. Returns the event ID or error
|
||||
func (c *ExecCommand) fireEvent() (string, error) {
|
||||
// Create the user event payload
|
||||
msg := &rExecEvent{
|
||||
Prefix: c.conf.prefix,
|
||||
Session: c.sessionID,
|
||||
}
|
||||
buf, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Format the user event
|
||||
event := c.client.Event()
|
||||
params := &consulapi.UserEvent{
|
||||
Name: "_rexec",
|
||||
Payload: buf,
|
||||
NodeFilter: c.conf.node,
|
||||
ServiceFilter: c.conf.service,
|
||||
TagFilter: c.conf.tag,
|
||||
}
|
||||
|
||||
// Fire the event
|
||||
id, _, err := event.Fire(params, nil)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Synopsis() string {
|
||||
return "Executes a command on Consul nodes"
|
||||
}
|
||||
|
||||
func (c *ExecCommand) Help() string {
|
||||
helpText := `
|
||||
Usage: consul exec [options] [command...]
|
||||
|
||||
Evaluates a command on remote Consul nodes. The nodes responding can
|
||||
be filtered using regular expressions on node name, service, and tag
|
||||
definitions. If a command is not provided, stdin will be read until EOF
|
||||
and used as a script input.
|
||||
|
||||
Options:
|
||||
|
||||
-http-addr=127.0.0.1:8500 HTTP address of the Consul agent.
|
||||
-datacenter="" Datacenter to dispatch in. Defaults to that of agent.
|
||||
-prefix="_rexec" Prefix in the KV store to use for request data
|
||||
-node="" Regular expression to filter on node names
|
||||
-service="" Regular expression to filter on service instances
|
||||
-tag="" Regular expression to filter on service tags. Must be used
|
||||
with -service.
|
||||
-wait=1s Period to wait with no responses before terminating execution.
|
||||
-wait-repl=100ms Period to wait for replication before firing event. This is an
|
||||
optimization to allow stale reads to be performed.
|
||||
-v Enables verbose output
|
||||
`
|
||||
return strings.TrimSpace(helpText)
|
||||
}
|
@ -31,6 +31,13 @@ func init() {
|
||||
}, nil
|
||||
},
|
||||
|
||||
"exec": func() (cli.Command, error) {
|
||||
return &command.ExecCommand{
|
||||
ShutdownCh: makeShutdownCh(),
|
||||
Ui: ui,
|
||||
}, nil
|
||||
},
|
||||
|
||||
"force-leave": func() (cli.Command, error) {
|
||||
return &command.ForceLeaveCommand{
|
||||
Ui: ui,
|
||||
|
Loading…
x
Reference in New Issue
Block a user