diff --git a/command/commands.go b/command/commands.go index b71e4187f4..2c9a2e5f4e 100644 --- a/command/commands.go +++ b/command/commands.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/hashicorp/consul/command/event" + execmd "github.com/hashicorp/consul/command/exec" "github.com/hashicorp/consul/command/join" "github.com/hashicorp/consul/command/validate" "github.com/hashicorp/consul/version" @@ -77,13 +78,7 @@ func init() { }, "exec": func() (cli.Command, error) { - return &ExecCommand{ - ShutdownCh: makeShutdownCh(), - BaseCommand: BaseCommand{ - Flags: FlagSetHTTP, - UI: ui, - }, - }, nil + return execmd.New(ui, makeShutdownCh()), nil }, "force-leave": func() (cli.Command, error) { diff --git a/command/exec.go b/command/exec/exec.go similarity index 84% rename from command/exec.go rename to command/exec/exec.go index 9d95028a2f..7a997964ca 100644 --- a/command/exec.go +++ b/command/exec/exec.go @@ -1,8 +1,9 @@ -package command +package exec import ( "bytes" "encoding/json" + "flag" "fmt" "io" "os" @@ -13,7 +14,8 @@ import ( "time" "unicode" - consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/command/flags" "github.com/mitchellh/cli" ) @@ -117,47 +119,55 @@ type rExecExit struct { Code int } -// ExecCommand is a Command implementation that is used to -// do remote execution of commands -type ExecCommand struct { - BaseCommand +func New(ui cli.Ui, shutdownCh <-chan struct{}) *cmd { + c := &cmd{UI: ui, shutdownCh: shutdownCh} + c.initFlags() + return c +} - ShutdownCh <-chan struct{} +type cmd struct { + UI cli.Ui + flags *flag.FlagSet + http *flags.HTTPFlags + + shutdownCh <-chan struct{} conf rExecConf - client *consulapi.Client + apiclient *api.Client sessionID string stopCh chan struct{} } -func (c *ExecCommand) initFlags() { - c.InitFlagSet() - c.FlagSet.StringVar(&c.conf.node, "node", "", +func (c *cmd) initFlags() { + c.flags = flag.NewFlagSet("", flag.ContinueOnError) + c.flags.StringVar(&c.conf.node, "node", "", "Regular expression to filter on node names.") - c.FlagSet.StringVar(&c.conf.service, "service", "", + c.flags.StringVar(&c.conf.service, "service", "", "Regular expression to filter on service instances.") - c.FlagSet.StringVar(&c.conf.tag, "tag", "", + c.flags.StringVar(&c.conf.tag, "tag", "", "Regular expression to filter on service tags. Must be used with -service.") - c.FlagSet.StringVar(&c.conf.prefix, "prefix", rExecPrefix, + c.flags.StringVar(&c.conf.prefix, "prefix", rExecPrefix, "Prefix in the KV store to use for request data.") - c.FlagSet.BoolVar(&c.conf.shell, "shell", true, + c.flags.BoolVar(&c.conf.shell, "shell", true, "Use a shell to run the command.") - c.FlagSet.DurationVar(&c.conf.wait, "wait", rExecQuietWait, + c.flags.DurationVar(&c.conf.wait, "wait", rExecQuietWait, "Period to wait with no responses before terminating execution.") - c.FlagSet.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, - "Period to wait for replication before firing event. This is an "+ - "optimization to allow stale reads to be performed.") - c.FlagSet.BoolVar(&c.conf.verbose, "verbose", false, + c.flags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, + "Period to wait for replication before firing event. This is an optimization to allow stale reads to be performed.") + c.flags.BoolVar(&c.conf.verbose, "verbose", false, "Enables verbose output.") + + c.http = &flags.HTTPFlags{} + flags.Merge(c.flags, c.http.ClientFlags()) + flags.Merge(c.flags, c.http.ServerFlags()) } -func (c *ExecCommand) Run(args []string) int { - c.initFlags() - if err := c.FlagSet.Parse(args); err != nil { +func (c *cmd) Run(args []string) int { + if err := c.flags.Parse(args); err != nil { return 1 } // Join the commands to execute - c.conf.cmd = strings.Join(c.FlagSet.Args(), " ") + c.conf.cmd = strings.Join(c.flags.Args(), " ") // If there is no command, read stdin for a script input if c.conf.cmd == "-" { @@ -178,7 +188,7 @@ func (c *ExecCommand) Run(args []string) int { c.conf.script = buf.Bytes() } else if !c.conf.shell { c.conf.cmd = "" - c.conf.args = c.FlagSet.Args() + c.conf.args = c.flags.Args() } // Ensure we have a command or script @@ -196,7 +206,7 @@ func (c *ExecCommand) Run(args []string) int { } // Create and test the HTTP client - client, err := c.HTTPClient() + client, err := c.http.APIClient() if err != nil { c.UI.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) return 1 @@ -206,10 +216,10 @@ func (c *ExecCommand) Run(args []string) int { c.UI.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) return 1 } - c.client = client + c.apiclient = client // Check if this is a foreign datacenter - if c.HTTPDatacenter() != "" && c.HTTPDatacenter() != info["Config"]["Datacenter"] { + if c.http.Datacenter() != "" && c.http.Datacenter() != info["Config"]["Datacenter"] { if c.conf.verbose { c.UI.Info("Remote exec in foreign datacenter, using Session TTL") } @@ -252,7 +262,7 @@ func (c *ExecCommand) Run(args []string) int { // largely this is a heuristic. select { case <-time.After(c.conf.replWait): - case <-c.ShutdownCh: + case <-c.shutdownCh: return 1 } @@ -270,8 +280,22 @@ func (c *ExecCommand) Run(args []string) int { return c.waitForJob() } +func (c *cmd) Synopsis() string { + return "Executes a command on Consul nodes" +} + +func (c *cmd) Help() string { + s := `Usage: consul exec [options] [-|command...] + + Evaluates a command on remote Consul nodes. The nodes responding can + be filtered using regular expressions on node name, service, and tag + definitions. If a command is '-', stdin will be read until EOF + and used as a script input. ` + return flags.Usage(s, c.flags, c.http.ClientFlags(), c.http.ServerFlags()) +} + // waitForJob is used to poll for results and wait until the job is terminated -func (c *ExecCommand) waitForJob() int { +func (c *cmd) waitForJob() int { // Although the session destroy is already deferred, we do it again here, // because invalidation of the session before destroyData() ensures there is // no race condition allowing an agent to upload data (the acquire will fail). @@ -337,7 +361,7 @@ OUTER: case <-errCh: return 1 - case <-c.ShutdownCh: + case <-c.shutdownCh: return 1 } } @@ -350,10 +374,10 @@ OUTER: // streamResults is used to perform blocking queries against the KV endpoint and stream in // notice of various events into waitForJob -func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart, +func (c *cmd) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart, outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) { - kv := c.client.KV() - opts := consulapi.QueryOptions{WaitTime: c.conf.wait} + kv := c.apiclient.KV() + opts := api.QueryOptions{WaitTime: c.conf.wait} dir := path.Join(c.conf.prefix, c.sessionID) + "/" seen := make(map[string]struct{}) @@ -465,7 +489,7 @@ func (conf *rExecConf) validate() error { } // createSession is used to create a new session for this command -func (c *ExecCommand) createSession() (string, error) { +func (c *cmd) createSession() (string, error) { var id string var err error if c.conf.foreignDC { @@ -482,11 +506,11 @@ func (c *ExecCommand) createSession() (string, error) { // createSessionLocal is used to create a new session in a local datacenter // This is simpler since we can use the local agent to create the session. -func (c *ExecCommand) createSessionLocal() (string, error) { - session := c.client.Session() - se := consulapi.SessionEntry{ +func (c *cmd) createSessionLocal() (string, error) { + session := c.apiclient.Session() + se := api.SessionEntry{ Name: "Remote Exec", - Behavior: consulapi.SessionBehaviorDelete, + Behavior: api.SessionBehaviorDelete, TTL: rExecTTL, } id, _, err := session.Create(&se, nil) @@ -496,9 +520,9 @@ func (c *ExecCommand) createSessionLocal() (string, error) { // createSessionLocal is used to create a new session in a foreign datacenter // This is more complex since the local agent cannot be used to create // a session, and we must associate with a node in the remote datacenter. -func (c *ExecCommand) createSessionForeign() (string, error) { +func (c *cmd) createSessionForeign() (string, error) { // Look for a remote node to bind to - health := c.client.Health() + health := c.apiclient.Health() services, _, err := health.Service("consul", "", true, nil) if err != nil { return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err) @@ -508,16 +532,15 @@ func (c *ExecCommand) createSessionForeign() (string, error) { } node := services[0].Node.Node if c.conf.verbose { - c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s", - node, c.HTTPDatacenter())) + c.UI.Info(fmt.Sprintf("Binding session to remote node %s@%s", node, c.http.Datacenter())) } - session := c.client.Session() - se := consulapi.SessionEntry{ + session := c.apiclient.Session() + se := api.SessionEntry{ Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC), Node: node, Checks: []string{}, - Behavior: consulapi.SessionBehaviorDelete, + Behavior: api.SessionBehaviorDelete, TTL: rExecTTL, } id, _, err := session.CreateNoChecks(&se, nil) @@ -527,8 +550,8 @@ func (c *ExecCommand) createSessionForeign() (string, error) { // renewSession is a long running routine that periodically renews // the session TTL. This is used for foreign sessions where we depend // on TTLs. -func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) { - session := c.client.Session() +func (c *cmd) renewSession(id string, stopCh chan struct{}) { + session := c.apiclient.Session() for { select { case <-time.After(rExecRenewInterval): @@ -544,7 +567,7 @@ func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) { } // destroySession is used to destroy the associated session -func (c *ExecCommand) destroySession() error { +func (c *cmd) destroySession() error { // Stop the session renew if any if c.stopCh != nil { close(c.stopCh) @@ -552,7 +575,7 @@ func (c *ExecCommand) destroySession() error { } // Destroy the session explicitly - session := c.client.Session() + session := c.apiclient.Session() _, err := session.Destroy(c.sessionID, nil) return err } @@ -560,7 +583,7 @@ func (c *ExecCommand) destroySession() error { // makeRExecSpec creates a serialized job specification // that can be uploaded which will be parsed by agents to // determine what to do. -func (c *ExecCommand) makeRExecSpec() ([]byte, error) { +func (c *cmd) makeRExecSpec() ([]byte, error) { spec := &rExecSpec{ Command: c.conf.cmd, Args: c.conf.args, @@ -571,9 +594,9 @@ func (c *ExecCommand) makeRExecSpec() ([]byte, error) { } // uploadPayload is used to upload the request payload -func (c *ExecCommand) uploadPayload(payload []byte) error { - kv := c.client.KV() - pair := consulapi.KVPair{ +func (c *cmd) uploadPayload(payload []byte) error { + kv := c.apiclient.KV() + pair := api.KVPair{ Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName), Value: payload, Session: c.sessionID, @@ -591,8 +614,8 @@ func (c *ExecCommand) uploadPayload(payload []byte) error { // destroyData is used to nuke all the data associated with // this remote exec. We just do a recursive delete of our // data directory. -func (c *ExecCommand) destroyData() error { - kv := c.client.KV() +func (c *cmd) destroyData() error { + kv := c.apiclient.KV() dir := path.Join(c.conf.prefix, c.sessionID) _, err := kv.DeleteTree(dir, nil) return err @@ -600,7 +623,7 @@ func (c *ExecCommand) destroyData() error { // fireEvent is used to fire the event that will notify nodes // about the remote execution. Returns the event ID or error -func (c *ExecCommand) fireEvent() (string, error) { +func (c *cmd) fireEvent() (string, error) { // Create the user event payload msg := &rExecEvent{ Prefix: c.conf.prefix, @@ -612,8 +635,8 @@ func (c *ExecCommand) fireEvent() (string, error) { } // Format the user event - event := c.client.Event() - params := &consulapi.UserEvent{ + event := c.apiclient.Event() + params := &api.UserEvent{ Name: "_rexec", Payload: buf, NodeFilter: c.conf.node, @@ -626,23 +649,6 @@ func (c *ExecCommand) fireEvent() (string, error) { return id, err } -func (c *ExecCommand) Synopsis() string { - return "Executes a command on Consul nodes" -} - -func (c *ExecCommand) Help() string { - c.initFlags() - return c.HelpCommand(` -Usage: consul exec [options] [-|command...] - - Evaluates a command on remote Consul nodes. The nodes responding can - be filtered using regular expressions on node name, service, and tag - definitions. If a command is '-', stdin will be read until EOF - and used as a script input. - -`) -} - // TargetedUI is a UI that wraps another UI implementation and modifies // the output to indicate a specific target. Specifically, all Say output // is prefixed with the target name. Message output is not prefixed but diff --git a/command/exec_test.go b/command/exec/exec_test.go similarity index 85% rename from command/exec_test.go rename to command/exec/exec_test.go index aa5b633d6c..c477d94be9 100644 --- a/command/exec_test.go +++ b/command/exec/exec_test.go @@ -1,4 +1,4 @@ -package command +package exec import ( "strings" @@ -11,21 +11,12 @@ import ( "github.com/mitchellh/cli" ) -func testExecCommand(t *testing.T) (*cli.MockUi, *ExecCommand) { - ui := cli.NewMockUi() - return ui, &ExecCommand{ - BaseCommand: BaseCommand{ - UI: ui, - Flags: FlagSetHTTP, - }, +func TestExecCommand_noTabs(t *testing.T) { + if strings.ContainsRune(New(nil, nil).Help(), '\t') { + t.Fatal("usage has tabs") } } -func TestExecCommand_implements(t *testing.T) { - t.Parallel() - var _ cli.Command = &ExecCommand{} -} - func TestExecCommandRun(t *testing.T) { t.Parallel() a := agent.NewTestAgent(t.Name(), ` @@ -33,7 +24,8 @@ func TestExecCommandRun(t *testing.T) { `) defer a.Shutdown() - ui, c := testExecCommand(t) + ui := cli.NewMockUi() + c := New(ui, nil) args := []string{"-http-addr=" + a.HTTPAddr(), "-wait=1s", "uptime"} code := c.Run(args) @@ -53,7 +45,8 @@ func TestExecCommandRun_NoShell(t *testing.T) { `) defer a.Shutdown() - ui, c := testExecCommand(t) + ui := cli.NewMockUi() + c := New(ui, nil) args := []string{"-http-addr=" + a.HTTPAddr(), "-shell=false", "-wait=1s", "uptime"} code := c.Run(args) @@ -94,7 +87,8 @@ func TestExecCommandRun_CrossDC(t *testing.T) { } }) - ui, c := testExecCommand(t) + ui := cli.NewMockUi() + c := New(ui, nil) args := []string{"-http-addr=" + a1.HTTPAddr(), "-wait=500ms", "-datacenter=dc2", "uptime"} code := c.Run(args) @@ -150,16 +144,16 @@ func TestExecCommand_Sessions(t *testing.T) { `) defer a.Shutdown() - client := a.Client() - _, c := testExecCommand(t) - c.client = client + ui := cli.NewMockUi() + c := New(ui, nil) + c.apiclient = a.Client() id, err := c.createSession() if err != nil { t.Fatalf("err: %v", err) } - se, _, err := client.Session().Info(id, nil) + se, _, err := a.Client().Session().Info(id, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -173,7 +167,7 @@ func TestExecCommand_Sessions(t *testing.T) { t.Fatalf("err: %v", err) } - se, _, err = client.Session().Info(id, nil) + se, _, err = a.Client().Session().Info(id, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -189,9 +183,9 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) { `) defer a.Shutdown() - client := a.Client() - _, c := testExecCommand(t) - c.client = client + ui := cli.NewMockUi() + c := New(ui, nil) + c.apiclient = a.Client() c.conf.foreignDC = true c.conf.localDC = "dc1" @@ -209,7 +203,7 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) { } }) - se, _, err := client.Session().Info(id, nil) + se, _, err := a.Client().Session().Info(id, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -223,7 +217,7 @@ func TestExecCommand_Sessions_Foreign(t *testing.T) { t.Fatalf("err: %v", err) } - se, _, err = client.Session().Info(id, nil) + se, _, err = a.Client().Session().Info(id, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -239,9 +233,9 @@ func TestExecCommand_UploadDestroy(t *testing.T) { `) defer a.Shutdown() - client := a.Client() - _, c := testExecCommand(t) - c.client = client + ui := cli.NewMockUi() + c := New(ui, nil) + c.apiclient = a.Client() id, err := c.createSession() if err != nil { @@ -263,7 +257,7 @@ func TestExecCommand_UploadDestroy(t *testing.T) { t.Fatalf("err: %v", err) } - pair, _, err := client.KV().Get("_rexec/"+id+"/job", nil) + pair, _, err := a.Client().KV().Get("_rexec/"+id+"/job", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -277,7 +271,7 @@ func TestExecCommand_UploadDestroy(t *testing.T) { t.Fatalf("err: %v", err) } - pair, _, err = client.KV().Get("_rexec/"+id+"/job", nil) + pair, _, err = a.Client().KV().Get("_rexec/"+id+"/job", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -294,9 +288,9 @@ func TestExecCommand_StreamResults(t *testing.T) { `) defer a.Shutdown() - client := a.Client() - _, c := testExecCommand(t) - c.client = client + ui := cli.NewMockUi() + c := New(ui, nil) + c.apiclient = a.Client() c.conf.prefix = "_rexec" id, err := c.createSession() @@ -315,7 +309,7 @@ func TestExecCommand_StreamResults(t *testing.T) { go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh) prefix := "_rexec/" + id + "/" - ok, _, err := client.KV().Acquire(&consulapi.KVPair{ + ok, _, err := a.Client().KV().Acquire(&consulapi.KVPair{ Key: prefix + "foo/ack", Session: id, }, nil) @@ -335,7 +329,7 @@ func TestExecCommand_StreamResults(t *testing.T) { t.Fatalf("timeout") } - ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{ Key: prefix + "foo/exit", Value: []byte("127"), Session: id, @@ -357,7 +351,7 @@ func TestExecCommand_StreamResults(t *testing.T) { } // Random key, should ignore - ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{ Key: prefix + "foo/random", Session: id, }, nil) @@ -369,7 +363,7 @@ func TestExecCommand_StreamResults(t *testing.T) { } // Output heartbeat - ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{ Key: prefix + "foo/out/00000", Session: id, }, nil) @@ -390,7 +384,7 @@ func TestExecCommand_StreamResults(t *testing.T) { } // Output value - ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + ok, _, err = a.Client().KV().Acquire(&consulapi.KVPair{ Key: prefix + "foo/out/00001", Value: []byte("test"), Session: id,