commands: move exec command to separate pkg

This commit is contained in:
Frank Schroeder 2017-10-11 14:51:22 +02:00 committed by Frank Schröder
parent 85bfd8f339
commit 0ada23f92f
3 changed files with 117 additions and 122 deletions

View File

@ -9,6 +9,7 @@ import (
"syscall"
"github.com/hashicorp/consul/command/event"
execmd "github.com/hashicorp/consul/command/exec"
"github.com/hashicorp/consul/command/join"
"github.com/hashicorp/consul/command/validate"
"github.com/hashicorp/consul/version"
@ -77,13 +78,7 @@ func init() {
},
"exec": func() (cli.Command, error) {
return &ExecCommand{
ShutdownCh: makeShutdownCh(),
BaseCommand: BaseCommand{
Flags: FlagSetHTTP,
UI: ui,
},
}, nil
return execmd.New(ui, makeShutdownCh()), nil
},
"force-leave": func() (cli.Command, error) {

View File

@ -1,8 +1,9 @@
package command
package exec
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"os"
@ -13,7 +14,8 @@ import (
"time"
"unicode"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/mitchellh/cli"
)
@ -117,47 +119,55 @@ type rExecExit struct {
Code int
}
// ExecCommand is a Command implementation that is used to
// do remote execution of commands
type ExecCommand struct {
BaseCommand
func New(ui cli.Ui, shutdownCh <-chan struct{}) *cmd {
c := &cmd{UI: ui, shutdownCh: shutdownCh}
c.initFlags()
return c
}
ShutdownCh <-chan struct{}
type cmd struct {
UI cli.Ui
flags *flag.FlagSet
http *flags.HTTPFlags
shutdownCh <-chan struct{}
conf rExecConf
client *consulapi.Client
apiclient *api.Client
sessionID string
stopCh chan struct{}
}
func (c *ExecCommand) initFlags() {
c.InitFlagSet()
c.FlagSet.StringVar(&c.conf.node, "node", "",
func (c *cmd) initFlags() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
c.flags.StringVar(&c.conf.node, "node", "",
"Regular expression to filter on node names.")
c.FlagSet.StringVar(&c.conf.service, "service", "",
c.flags.StringVar(&c.conf.service, "service", "",
"Regular expression to filter on service instances.")
c.FlagSet.StringVar(&c.conf.tag, "tag", "",
c.flags.StringVar(&c.conf.tag, "tag", "",
"Regular expression to filter on service tags. Must be used with -service.")
c.FlagSet.StringVar(&c.conf.prefix, "prefix", rExecPrefix,
c.flags.StringVar(&c.conf.prefix, "prefix", rExecPrefix,
"Prefix in the KV store to use for request data.")
c.FlagSet.BoolVar(&c.conf.shell, "shell", true,
c.flags.BoolVar(&c.conf.shell, "shell", true,
"Use a shell to run the command.")
c.FlagSet.DurationVar(&c.conf.wait, "wait", rExecQuietWait,
c.flags.DurationVar(&c.conf.wait, "wait", rExecQuietWait,
"Period to wait with no responses before terminating execution.")
c.FlagSet.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait,
"Period to wait for replication before firing event. This is an "+
"optimization to allow stale reads to be performed.")
c.FlagSet.BoolVar(&c.conf.verbose, "verbose", false,
c.flags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait,
"Period to wait for replication before firing event. This is an optimization to allow stale reads to be performed.")
c.flags.BoolVar(&c.conf.verbose, "verbose", false,
"Enables verbose output.")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
flags.Merge(c.flags, c.http.ServerFlags())
}
func (c *ExecCommand) Run(args []string) int {
c.initFlags()
if err := c.FlagSet.Parse(args); err != nil {
func (c *cmd) Run(args []string) int {
if err := c.flags.Parse(args); err != nil {
return 1
}
// Join the commands to execute
c.conf.cmd = strings.Join(c.FlagSet.Args(), " ")
c.conf.cmd = strings.Join(c.flags.Args(), " ")
// If there is no command, read stdin for a script input
if c.conf.cmd == "-" {
@ -178,7 +188,7 @@ func (c *ExecCommand) Run(args []string) int {
c.conf.script = buf.Bytes()
} else if !c.conf.shell {
c.conf.cmd = ""
c.conf.args = c.FlagSet.Args()
c.conf.args = c.flags.Args()
}
// Ensure we have a command or script
@ -196,7 +206,7 @@ func (c *ExecCommand) Run(args []string) int {
}
// Create and test the HTTP client
client, err := c.HTTPClient()
client, err := c.http.APIClient()
if err != nil {
c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1
@ -206,10 +216,10 @@ func (c *ExecCommand) Run(args []string) int {
c.UI.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
return 1
}
c.client = client
c.apiclient = client
// Check if this is a foreign datacenter
if c.HTTPDatacenter() != "" && c.HTTPDatacenter() != info["Config"]["Datacenter"] {
if c.http.Datacenter() != "" && c.http.Datacenter() != info["Config"]["Datacenter"] {
if c.conf.verbose {
c.UI.Info("Remote exec in foreign datacenter, using Session TTL")
}
@ -252,7 +262,7 @@ func (c *ExecCommand) Run(args []string) int {
// largely this is a heuristic.
select {
case <-time.After(c.conf.replWait):
case <-c.ShutdownCh:
case <-c.shutdownCh:
return 1
}
@ -270,8 +280,22 @@ func (c *ExecCommand) Run(args []string) int {
return c.waitForJob()
}
func (c *cmd) Synopsis() string {
return "Executes a command on Consul nodes"
}
func (c *cmd) Help() string {
s := `Usage: consul exec [options] [-|command...]
Evaluates a command on remote Consul nodes. The nodes responding can
be filtered using regular expressions on node name, service, and tag
definitions. If a command is '-', stdin will be read until EOF
and used as a script input. `
return flags.Usage(s, c.flags, c.http.ClientFlags(), c.http.ServerFlags())
}
// waitForJob is used to poll for results and wait until the job is terminated
func (c *ExecCommand) waitForJob() int {
func (c *cmd) waitForJob() int {
// Although the session destroy is already deferred, we do it again here,
// because invalidation of the session before destroyData() ensures there is
// no race condition allowing an agent to upload data (the acquire will fail).
@ -337,7 +361,7 @@ OUTER:
case <-errCh:
return 1
case <-c.ShutdownCh:
case <-c.shutdownCh:
return 1
}
}
@ -350,10 +374,10 @@ OUTER:
// streamResults is used to perform blocking queries against the KV endpoint and stream in
// notice of various events into waitForJob
func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
func (c *cmd) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart,
outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) {
kv := c.client.KV()
opts := consulapi.QueryOptions{WaitTime: c.conf.wait}
kv := c.apiclient.KV()
opts := api.QueryOptions{WaitTime: c.conf.wait}
dir := path.Join(c.conf.prefix, c.sessionID) + "/"
seen := make(map[string]struct{})
@ -465,7 +489,7 @@ func (conf *rExecConf) validate() error {
}
// createSession is used to create a new session for this command
func (c *ExecCommand) createSession() (string, error) {
func (c *cmd) createSession() (string, error) {
var id string
var err error
if c.conf.foreignDC {
@ -482,11 +506,11 @@ func (c *ExecCommand) createSession() (string, error) {
// createSessionLocal is used to create a new session in a local datacenter
// This is simpler since we can use the local agent to create the session.
func (c *ExecCommand) createSessionLocal() (string, error) {
session := c.client.Session()
se := consulapi.SessionEntry{
func (c *cmd) createSessionLocal() (string, error) {
session := c.apiclient.Session()
se := api.SessionEntry{
Name: "Remote Exec",
Behavior: consulapi.SessionBehaviorDelete,
Behavior: api.SessionBehaviorDelete,
TTL: rExecTTL,
}
id, _, err := session.Create(&se, nil)
@ -496,9 +520,9 @@ func (c *ExecCommand) createSessionLocal() (string, error) {
// createSessionLocal is used to create a new session in a foreign datacenter
// This is more complex since the local agent cannot be used to create
// a session, and we must associate with a node in the remote datacenter.
func (c *ExecCommand) createSessionForeign() (string, error) {
func (c *cmd) createSessionForeign() (string, error) {
// Look for a remote node to bind to
health := c.client.Health()
health := c.apiclient.Health()
services, _, err := health.Service("consul", "", true, nil)
if err != nil {
return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err)
@ -508,16 +532,15 @@ func (c *ExecCommand) createSessionForeign() (string, error) {
}
node := services[0].Node.Node
if c.conf.verbose {
c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s",
node, c.HTTPDatacenter()))
c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s", node, c.http.Datacenter()))
}
session := c.client.Session()
se := consulapi.SessionEntry{
session := c.apiclient.Session()
se := api.SessionEntry{
Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC),
Node: node,
Checks: []string{},
Behavior: consulapi.SessionBehaviorDelete,
Behavior: api.SessionBehaviorDelete,
TTL: rExecTTL,
}
id, _, err := session.CreateNoChecks(&se, nil)
@ -527,8 +550,8 @@ func (c *ExecCommand) createSessionForeign() (string, error) {
// renewSession is a long running routine that periodically renews
// the session TTL. This is used for foreign sessions where we depend
// on TTLs.
func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) {
session := c.client.Session()
func (c *cmd) renewSession(id string, stopCh chan struct{}) {
session := c.apiclient.Session()
for {
select {
case <-time.After(rExecRenewInterval):
@ -544,7 +567,7 @@ func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) {
}
// destroySession is used to destroy the associated session
func (c *ExecCommand) destroySession() error {
func (c *cmd) destroySession() error {
// Stop the session renew if any
if c.stopCh != nil {
close(c.stopCh)
@ -552,7 +575,7 @@ func (c *ExecCommand) destroySession() error {
}
// Destroy the session explicitly
session := c.client.Session()
session := c.apiclient.Session()
_, err := session.Destroy(c.sessionID, nil)
return err
}
@ -560,7 +583,7 @@ func (c *ExecCommand) destroySession() error {
// makeRExecSpec creates a serialized job specification
// that can be uploaded which will be parsed by agents to
// determine what to do.
func (c *ExecCommand) makeRExecSpec() ([]byte, error) {
func (c *cmd) makeRExecSpec() ([]byte, error) {
spec := &rExecSpec{
Command: c.conf.cmd,
Args: c.conf.args,
@ -571,9 +594,9 @@ func (c *ExecCommand) makeRExecSpec() ([]byte, error) {
}
// uploadPayload is used to upload the request payload
func (c *ExecCommand) uploadPayload(payload []byte) error {
kv := c.client.KV()
pair := consulapi.KVPair{
func (c *cmd) uploadPayload(payload []byte) error {
kv := c.apiclient.KV()
pair := api.KVPair{
Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName),
Value: payload,
Session: c.sessionID,
@ -591,8 +614,8 @@ func (c *ExecCommand) uploadPayload(payload []byte) error {
// destroyData is used to nuke all the data associated with
// this remote exec. We just do a recursive delete of our
// data directory.
func (c *ExecCommand) destroyData() error {
kv := c.client.KV()
func (c *cmd) destroyData() error {
kv := c.apiclient.KV()
dir := path.Join(c.conf.prefix, c.sessionID)
_, err := kv.DeleteTree(dir, nil)
return err
@ -600,7 +623,7 @@ func (c *ExecCommand) destroyData() error {
// fireEvent is used to fire the event that will notify nodes
// about the remote execution. Returns the event ID or error
func (c *ExecCommand) fireEvent() (string, error) {
func (c *cmd) fireEvent() (string, error) {
// Create the user event payload
msg := &rExecEvent{
Prefix: c.conf.prefix,
@ -612,8 +635,8 @@ func (c *ExecCommand) fireEvent() (string, error) {
}
// Format the user event
event := c.client.Event()
params := &consulapi.UserEvent{
event := c.apiclient.Event()
params := &api.UserEvent{
Name: "_rexec",
Payload: buf,
NodeFilter: c.conf.node,
@ -626,23 +649,6 @@ func (c *ExecCommand) fireEvent() (string, error) {
return id, err
}
func (c *ExecCommand) Synopsis() string {
return "Executes a command on Consul nodes"
}
func (c *ExecCommand) Help() string {
c.initFlags()
return c.HelpCommand(`
Usage: consul exec [options] [-|command...]
Evaluates a command on remote Consul nodes. The nodes responding can
be filtered using regular expressions on node name, service, and tag
definitions. If a command is '-', stdin will be read until EOF
and used as a script input.
`)
}
// TargetedUI is a UI that wraps another UI implementation and modifies
// the output to indicate a specific target. Specifically, all Say output
// is prefixed with the target name. Message output is not prefixed but

View File

@ -1,4 +1,4 @@
package command
package exec
import (
"strings"
@ -11,21 +11,12 @@ import (
"github.com/mitchellh/cli"
)
func testExecCommand(t *testing.T) (*cli.MockUi, *ExecCommand) {
ui := cli.NewMockUi()
return ui, &ExecCommand{
BaseCommand: BaseCommand{
UI: ui,
Flags: FlagSetHTTP,
},
func TestExecCommand_noTabs(t *testing.T) {
if strings.ContainsRune(New(nil, nil).Help(), '\t') {
t.Fatal("usage has tabs")
}
}
func TestExecCommand_implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &ExecCommand{}
}
func TestExecCommandRun(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t.Name(), `
@ -33,7 +24,8 @@ func TestExecCommandRun(t *testing.T) {
`)
defer a.Shutdown()
ui, c := testExecCommand(t)
ui := cli.NewMockUi()
c := New(ui, nil)
args := []string{"-http-addr=" + a.HTTPAddr(), "-wait=1s", "uptime"}
code := c.Run(args)
@ -53,7 +45,8 @@ func TestExecCommandRun_NoShell(t *testing.T) {
`)
defer a.Shutdown()
ui, c := testExecCommand(t)
ui := cli.NewMockUi()
c := New(ui, nil)
args := []string{"-http-addr=" + a.HTTPAddr(), "-shell=false", "-wait=1s", "uptime"}
code := c.Run(args)
@ -94,7 +87,8 @@ func TestExecCommandRun_CrossDC(t *testing.T) {
}
})
ui, c := testExecCommand(t)
ui := cli.NewMockUi()
c := New(ui, nil)
args := []string{"-http-addr=" + a1.HTTPAddr(), "-wait=500ms", "-datacenter=dc2", "uptime"}
code := c.Run(args)
@ -150,16 +144,16 @@ func TestExecCommand_Sessions(t *testing.T) {
`)
defer a.Shutdown()
client := a.Client()
_, c := testExecCommand(t)
c.client = client
ui := cli.NewMockUi()
c := New(ui, nil)
c.apiclient = a.Client()
id, err := c.createSession()
if err != nil {
t.Fatalf("err: %v", err)
}
se, _, err := client.Session().Info(id, nil)
se, _, err := a.Client().Session().Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -173,7 +167,7 @@ func TestExecCommand_Sessions(t *testing.T) {
t.Fatalf("err: %v", err)
}
se, _, err = client.Session().Info(id, nil)
se, _, err = a.Client().Session().Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -189,9 +183,9 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) {
`)
defer a.Shutdown()
client := a.Client()
_, c := testExecCommand(t)
c.client = client
ui := cli.NewMockUi()
c := New(ui, nil)
c.apiclient = a.Client()
c.conf.foreignDC = true
c.conf.localDC = "dc1"
@ -209,7 +203,7 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) {
}
})
se, _, err := client.Session().Info(id, nil)
se, _, err := a.Client().Session().Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -223,7 +217,7 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) {
t.Fatalf("err: %v", err)
}
se, _, err = client.Session().Info(id, nil)
se, _, err = a.Client().Session().Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -239,9 +233,9 @@ func TestExecCommand_UploadDestroy(t *testing.T) {
`)
defer a.Shutdown()
client := a.Client()
_, c := testExecCommand(t)
c.client = client
ui := cli.NewMockUi()
c := New(ui, nil)
c.apiclient = a.Client()
id, err := c.createSession()
if err != nil {
@ -263,7 +257,7 @@ func TestExecCommand_UploadDestroy(t *testing.T) {
t.Fatalf("err: %v", err)
}
pair, _, err := client.KV().Get("_rexec/"+id+"/job", nil)
pair, _, err := a.Client().KV().Get("_rexec/"+id+"/job", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -277,7 +271,7 @@ func TestExecCommand_UploadDestroy(t *testing.T) {
t.Fatalf("err: %v", err)
}
pair, _, err = client.KV().Get("_rexec/"+id+"/job", nil)
pair, _, err = a.Client().KV().Get("_rexec/"+id+"/job", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -294,9 +288,9 @@ func TestExecCommand_StreamResults(t *testing.T) {
`)
defer a.Shutdown()
client := a.Client()
_, c := testExecCommand(t)
c.client = client
ui := cli.NewMockUi()
c := New(ui, nil)
c.apiclient = a.Client()
c.conf.prefix = "_rexec"
id, err := c.createSession()
@ -315,7 +309,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh)
prefix := "_rexec/" + id + "/"
ok, _, err := client.KV().Acquire(&consulapi.KVPair{
ok, _, err := a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/ack",
Session: id,
}, nil)
@ -335,7 +329,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
t.Fatalf("timeout")
}
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/exit",
Value: []byte("127"),
Session: id,
@ -357,7 +351,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
}
// Random key, should ignore
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/random",
Session: id,
}, nil)
@ -369,7 +363,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
}
// Output heartbeat
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/out/00000",
Session: id,
}, nil)
@ -390,7 +384,7 @@ func TestExecCommand_StreamResults(t *testing.T) {
}
// Output value
ok, _, err = client.KV().Acquire(&consulapi.KVPair{
ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{
Key: prefix + "foo/out/00001",
Value: []byte("test"),
Session: id,