From 89f258ca2ece94602dc703c8b773ebd08c4981f2 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 29 Aug 2014 11:24:01 -0700 Subject: [PATCH 01/16] agent: Adding remote exec configuration --- command/agent/config.go | 7 +++++++ command/agent/config_test.go | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/command/agent/config.go b/command/agent/config.go index f91853cdde..efb15e9757 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -235,6 +235,10 @@ type Config struct { // agent layer using the standard APIs. Watches []map[string]interface{} `mapstructure:"watches"` + // DisableRemoteExec is used to turn off the remote execution + // feature. This is for security to prevent unknown scripts from running. + DisableRemoteExec bool `mapstructure:"disable_remote_exec"` + // AEInterval controls the anti-entropy interval. This is how often // the agent attempts to reconcile it's local state with the server' // representation of our state. Defaults to every 60s. @@ -676,6 +680,9 @@ func MergeConfig(a, b *Config) *Config { if len(b.WatchPlans) != 0 { result.WatchPlans = append(result.WatchPlans, b.WatchPlans...) } + if b.DisableRemoteExec { + result.DisableRemoteExec = true + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 75c0610a9e..bf939a6981 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -405,6 +405,17 @@ func TestDecodeConfig(t *testing.T) { if !reflect.DeepEqual(out, exp) { t.Fatalf("bad: %#v", config) } + + // remote exec + input = `{"disable_remote_exec": true}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if !config.DisableRemoteExec { + t.Fatalf("bad: %#v", config) + } } func TestDecodeConfig_Service(t *testing.T) { @@ -566,6 +577,7 @@ func TestMergeConfig(t *testing.T) { "handler": "foobar", }, }, + DisableRemoteExec: true, } c := MergeConfig(a, b) From 6a6885ec183d42fdedb3f20a4569f769a5e63bed Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 29 Aug 2014 11:24:41 -0700 Subject: [PATCH 02/16] agent: Refactor msgpack serialization into util --- command/agent/user_event.go | 18 ++---------------- command/agent/util.go | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/command/agent/user_event.go b/command/agent/user_event.go index be1037d51d..4f3889b816 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -1,12 +1,10 @@ package agent import ( - "bytes" "fmt" "regexp" "github.com/hashicorp/consul/consul/structs" - "github.com/ugorji/go/codec" ) const ( @@ -79,7 +77,7 @@ func (a *Agent) UserEvent(dc string, params *UserEvent) error { // Format message params.ID = generateUUID() params.Version = userEventMaxVersion - payload, err := encodeUserEvent(¶ms) + payload, err := encodeMsgPack(¶ms) if err != nil { return fmt.Errorf("UserEvent encoding failed: %v", err) } @@ -114,7 +112,7 @@ func (a *Agent) handleEvents() { case e := <-a.eventCh: // Decode the event msg := new(UserEvent) - if err := decodeUserEvent(e.Payload, msg); err != nil { + if err := decodeMsgPack(e.Payload, msg); err != nil { a.logger.Printf("[ERR] agent: Failed to decode event: %v", err) continue } @@ -253,15 +251,3 @@ func (a *Agent) LastUserEvent() *UserEvent { idx := (((a.eventIndex - 1) % n) + n) % n return a.eventBuf[idx] } - -// Decode is used to decode a MsgPack encoded object -func decodeUserEvent(buf []byte, out interface{}) error { - return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) -} - -// encodeUserEvent is used to encode user event -func encodeUserEvent(msg interface{}) ([]byte, error) { - var buf bytes.Buffer - err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) - return buf.Bytes(), err -} diff --git a/command/agent/util.go b/command/agent/util.go index b753505b50..c91a6dbc44 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -1,6 +1,7 @@ package agent import ( + "bytes" crand "crypto/rand" "fmt" "math" @@ -9,6 +10,8 @@ import ( "os/exec" "runtime" "time" + + "github.com/ugorji/go/codec" ) const ( @@ -76,3 +79,15 @@ func generateUUID() string { buf[8:10], buf[10:16]) } + +// decodeMsgPack is used to decode a MsgPack encoded object +func decodeMsgPack(buf []byte, out interface{}) error { + return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) +} + +// encodeMsgPack is used to encode an object with msgpack +func encodeMsgPack(msg interface{}) ([]byte, error) { + var buf bytes.Buffer + err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) + return buf.Bytes(), err +} From 86a1a3a11e67372aa0d25c9c5d7da0f819158564 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 31 Aug 2014 21:50:09 -0700 Subject: [PATCH 03/16] command: Adding method to get client with datacenter --- command/rpc.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/command/rpc.go b/command/rpc.go index 97a31d083c..0975c6b4e3 100644 --- a/command/rpc.go +++ b/command/rpc.go @@ -27,7 +27,13 @@ func HTTPAddrFlag(f *flag.FlagSet) *string { // HTTPClient returns a new Consul HTTP client with the given address. func HTTPClient(addr string) (*consulapi.Client, error) { + return HTTPClientDC(addr, "") +} + +// HTTPClientDC returns a new Consul HTTP client with the given address and datacenter +func HTTPClientDC(addr, dc string) (*consulapi.Client, error) { conf := consulapi.DefaultConfig() conf.Address = addr + conf.Datacenter = dc return consulapi.NewClient(conf) } From 096e6fc886c274d440ce3ed79ce12742406452a0 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 31 Aug 2014 21:50:41 -0700 Subject: [PATCH 04/16] command/exec: First pass at exec command --- command/exec.go | 488 ++++++++++++++++++++++++++++++++++++++++++++++++ commands.go | 7 + 2 files changed, 495 insertions(+) create mode 100644 command/exec.go diff --git a/command/exec.go b/command/exec.go new file mode 100644 index 0000000000..1b87ce7930 --- /dev/null +++ b/command/exec.go @@ -0,0 +1,488 @@ +package command + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io" + "os" + "path" + "regexp" + "strconv" + "strings" + "time" + + "github.com/armon/consul-api" + "github.com/mitchellh/cli" +) + +const ( + // rExecFileName is the name of the file we append to + // the path, e.g. _rexec/session_id/job + rExecFileName = "job" + + // rExecAck is the suffix added to an ack path + rExecAckSuffix = "/ack" + + // rExecAck is the suffix added to an exit code + rExecExitSuffix = "/exit" + + // rExecOutputDivider is used to namespace the output + rExecOutputDivider = "/out/" +) + +// rExecConf is used to pass around configuration +type rExecConf struct { + datacenter string + prefix string + + node string + service string + tag string + + wait time.Duration + replWait time.Duration + + cmd string + script []byte + + verbose bool +} + +// rExecEvent is the event we broadcast using a user-event +type rExecEvent struct { + Prefix string + Session string +} + +// rExecSpec is the file we upload to specify the parameters +// of the remote execution. +type rExecSpec struct { + // Command is a single command to run directly in the shell + Command string `json:",omitempty"` + + // Script should be spilled to a file and executed + Script []byte `json:",omitempty"` + + // Wait is how long we are waiting on a quiet period to terminate + Wait time.Duration +} + +// rExecAck is used to transmit an acknowledgement +type rExecAck struct { + Node string +} + +// rExecHeart is used to transmit a heartbeat +type rExecHeart struct { + Node string +} + +// rExecOutput is used to transmit a chunk of output +type rExecOutput struct { + Node string + Output []byte +} + +// rExecExit is used to transmit an exit code +type rExecExit struct { + Node string + Code int +} + +// ExecCommand is a Command implementation that is used to +// do remote execution of commands +type ExecCommand struct { + ShutdownCh chan struct{} + Ui cli.Ui + conf rExecConf + client *consulapi.Client + sessionID string +} + +func (c *ExecCommand) Run(args []string) int { + cmdFlags := flag.NewFlagSet("exec", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.StringVar(&c.conf.datacenter, "datacenter", "", "") + cmdFlags.StringVar(&c.conf.node, "node", "", "") + cmdFlags.StringVar(&c.conf.service, "service", "", "") + cmdFlags.StringVar(&c.conf.tag, "tag", "", "") + cmdFlags.StringVar(&c.conf.prefix, "prefix", "_rexec", "") + cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", 100*time.Millisecond, "") + cmdFlags.DurationVar(&c.conf.wait, "wait", time.Second, "") + cmdFlags.BoolVar(&c.conf.verbose, "v", false, "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + // Join the commands to execute + c.conf.cmd = strings.Join(cmdFlags.Args(), " ") + + // If there is no command, read stdin for a script input + if c.conf.cmd == "" { + var buf bytes.Buffer + _, err := io.Copy(&buf, os.Stdin) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read stdin: %v", err)) + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + c.conf.script = buf.Bytes() + } + + // Ensure we have a command or script + if c.conf.cmd == "" && len(c.conf.script) == 0 { + c.Ui.Error("Must specify a command to execute") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + + // Validate the configuration + if err := c.conf.validate(); err != nil { + c.Ui.Error(err.Error()) + return 1 + } + + // Create and test the HTTP client + client, err := HTTPClientDC(*httpAddr, c.conf.datacenter) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + _, err = client.Agent().NodeName() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + c.client = client + + // Create the job spec + spec, err := c.makeRExecSpec() + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to create job spec: %s", err)) + return 1 + } + + // Create a session for this + c.sessionID, err = c.createSession() + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to create session: %s", err)) + return 1 + } + defer c.destroySession() + + // Upload the payload + if err := c.uploadPayload(spec); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to create job file: %s", err)) + return 1 + } + defer c.destroyData() + + // Wait for replication. This is done so that when the event is + // received, the job file can be read using a stale read. If the + // stale read fails, we expect a consistent read to be done, so + // largely this is a heuristic. + select { + case <-time.After(c.conf.replWait): + case <-c.ShutdownCh: + return 1 + } + + // Fire the event + id, err := c.fireEvent() + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to fire event: %s", err)) + return 1 + } + if c.conf.verbose { + c.Ui.Output(fmt.Sprintf("Fired remote execution event. ID: %s", id)) + } + + // Wait for the job to finish now + return c.waitForJob() +} + +// waitForJob is used to poll for results and wait until the job is terminated +func (c *ExecCommand) waitForJob() int { + // Although the session destroy is already deferred, we do it again here, + // because invalidation of the session before destroyData() ensures there is + // no race condition allowing an agent to upload data (the acquire will fail). + defer c.destroySession() + start := time.Now() + ackCh := make(chan rExecAck, 128) + heartCh := make(chan rExecHeart, 128) + outputCh := make(chan rExecOutput, 128) + exitCh := make(chan rExecExit, 128) + doneCh := make(chan struct{}) + defer close(doneCh) + go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh) + var ackCount, exitCount int +OUTER: + for { + // Determine wait time. We provide a larger window if we know about + // nodes which are still working. + waitIntv := c.conf.wait + if ackCount > exitCount { + waitIntv *= 4 + } + + select { + case e := <-ackCh: + ackCount++ + c.Ui.Output(fmt.Sprintf("Node %s: acknowledged event", e.Node)) + + case e := <-outputCh: + c.Ui.Output(fmt.Sprintf("Node %s: %s", e.Node, e.Output)) + + case e := <-exitCh: + exitCount++ + c.Ui.Output(fmt.Sprintf("Node %s: exited with code %d", e.Node, e.Code)) + + case <-time.After(waitIntv): + c.Ui.Output(fmt.Sprintf("%d nodes completed, %d nodes acknowledged", exitCount, ackCount)) + c.Ui.Output(fmt.Sprintf("Exec complete in %0.2f seconds", + float64(time.Now().Sub(start))/float64(time.Second))) + break OUTER + + case <-c.ShutdownCh: + return 1 + } + } + return 0 +} + +// streamResults is used to perform blocking queries against the KV endpoint and stream in +// notice of various events into waitForJob +func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart, + outputCh chan rExecOutput, exitCh chan rExecExit) { + kv := c.client.KV() + opts := consulapi.QueryOptions{WaitTime: c.conf.wait} + dir := path.Join(c.conf.prefix, c.sessionID) + "/" + seen := make(map[string]struct{}) + + for { + // Check if we've been signaled to exit + select { + case <-doneCh: + return + default: + } + + // Block on waiting for new keys + keys, qm, err := kv.Keys(dir, "", &opts) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read results: %s", err)) + goto ERR_EXIT + } + + // Fast-path the no-change case + if qm.LastIndex == opts.WaitIndex { + continue + } + opts.WaitIndex = qm.LastIndex + + // Handle each key + for _, key := range keys { + // Ignore if we've seen it + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + + // Trim the directory + full := key + key = strings.TrimPrefix(key, dir) + + // Handle the key type + switch { + case key == rExecFileName: + continue + case strings.HasSuffix(key, rExecAckSuffix): + ackCh <- rExecAck{Node: strings.TrimSuffix(key, rExecAckSuffix)} + + case strings.HasSuffix(key, rExecExitSuffix): + pair, _, err := kv.Get(full, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err)) + continue + } + code, err := strconv.ParseInt(string(pair.Value), 10, 32) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse exit code '%s': %v", pair.Value, err)) + continue + } + exitCh <- rExecExit{ + Node: strings.TrimSuffix(key, rExecExitSuffix), + Code: int(code), + } + + case strings.LastIndex(key, rExecOutputDivider) != -1: + pair, _, err := kv.Get(full, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err)) + continue + } + idx := strings.LastIndex(key, rExecOutputDivider) + node := key[:idx] + if len(pair.Value) == 0 { + heartCh <- rExecHeart{Node: node} + } else { + outputCh <- rExecOutput{Node: node, Output: pair.Value} + } + + default: + c.Ui.Error(fmt.Sprintf("Unknown key '%s', ignoring.", key)) + } + } + } + return + +ERR_EXIT: + select { + case c.ShutdownCh <- struct{}{}: + default: + } +} + +// validate checks that the configuration is sane +func (conf *rExecConf) validate() error { + // Validate the filters + if conf.node != "" { + if _, err := regexp.Compile(conf.node); err != nil { + return fmt.Errorf("Failed to compile node filter regexp: %v", err) + } + } + if conf.service != "" { + if _, err := regexp.Compile(conf.service); err != nil { + return fmt.Errorf("Failed to compile service filter regexp: %v", err) + } + } + if conf.tag != "" { + if _, err := regexp.Compile(conf.tag); err != nil { + return fmt.Errorf("Failed to compile tag filter regexp: %v", err) + } + } + if conf.tag != "" && conf.service == "" { + return fmt.Errorf("Cannot provide tag filter without service filter.") + } + return nil +} + +// createSession is used to create a new session for this command +func (c *ExecCommand) createSession() (string, error) { + session := c.client.Session() + se := consulapi.SessionEntry{ + Name: "Remote Exec", + } + id, _, err := session.Create(&se, nil) + return id, err +} + +// destroySession is used to destroy the associated session +func (c *ExecCommand) destroySession() error { + session := c.client.Session() + _, err := session.Destroy(c.sessionID, nil) + return err +} + +// makeRExecSpec creates a serialized job specification +// that can be uploaded which will be parsed by agents to +// determine what to do. +func (c *ExecCommand) makeRExecSpec() ([]byte, error) { + spec := &rExecSpec{ + Command: c.conf.cmd, + Script: c.conf.script, + Wait: c.conf.wait, + } + return json.Marshal(spec) +} + +// uploadPayload is used to upload the request payload +func (c *ExecCommand) uploadPayload(payload []byte) error { + kv := c.client.KV() + pair := consulapi.KVPair{ + Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName), + Value: payload, + Session: c.sessionID, + } + ok, _, err := kv.Acquire(&pair, nil) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("failed to acquire key %s", pair.Key) + } + return nil +} + +// destroyData is used to nuke all the data associated with +// this remote exec. We just do a recursive delete of our +// data directory. +func (c *ExecCommand) destroyData() error { + kv := c.client.KV() + dir := path.Join(c.conf.prefix, c.sessionID) + _, err := kv.DeleteTree(dir, nil) + return err +} + +// fireEvent is used to fire the event that will notify nodes +// about the remote execution. Returns the event ID or error +func (c *ExecCommand) fireEvent() (string, error) { + // Create the user event payload + msg := &rExecEvent{ + Prefix: c.conf.prefix, + Session: c.sessionID, + } + buf, err := json.Marshal(msg) + if err != nil { + return "", err + } + + // Format the user event + event := c.client.Event() + params := &consulapi.UserEvent{ + Name: "_rexec", + Payload: buf, + NodeFilter: c.conf.node, + ServiceFilter: c.conf.service, + TagFilter: c.conf.tag, + } + + // Fire the event + id, _, err := event.Fire(params, nil) + return id, err +} + +func (c *ExecCommand) Synopsis() string { + return "Executes a command on Consul nodes" +} + +func (c *ExecCommand) Help() string { + helpText := ` +Usage: consul exec [options] [command...] + + Evaluates a command on remote Consul nodes. The nodes responding can + be filtered using regular expressions on node name, service, and tag + definitions. If a command is not provided, stdin will be read until EOF + and used as a script input. + +Options: + + -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. + -datacenter="" Datacenter to dispatch in. Defaults to that of agent. + -prefix="_rexec" Prefix in the KV store to use for request data + -node="" Regular expression to filter on node names + -service="" Regular expression to filter on service instances + -tag="" Regular expression to filter on service tags. Must be used + with -service. + -wait=1s Period to wait with no responses before terminating execution. + -wait-repl=100ms Period to wait for replication before firing event. This is an + optimization to allow stale reads to be performed. + -v Enables verbose output +` + return strings.TrimSpace(helpText) +} diff --git a/commands.go b/commands.go index f68cf20801..820e64d1dd 100644 --- a/commands.go +++ b/commands.go @@ -31,6 +31,13 @@ func init() { }, nil }, + "exec": func() (cli.Command, error) { + return &command.ExecCommand{ + ShutdownCh: makeShutdownCh(), + Ui: ui, + }, nil + }, + "force-leave": func() (cli.Command, error) { return &command.ForceLeaveCommand{ Ui: ui, From 53777527e0553dfd9a51e899d2e03677b4144ac9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 31 Aug 2014 22:45:55 -0700 Subject: [PATCH 05/16] agent: First pass at remote exec support --- command/agent/remote_exec.go | 326 +++++++++++++++++++++++++++++++++++ command/agent/user_event.go | 17 +- 2 files changed, 342 insertions(+), 1 deletion(-) create mode 100644 command/agent/remote_exec.go diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go new file mode 100644 index 0000000000..d7d6986221 --- /dev/null +++ b/command/agent/remote_exec.go @@ -0,0 +1,326 @@ +package agent + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "strconv" + "sync" + "syscall" + "time" + + "github.com/hashicorp/consul/consul/structs" +) + +const ( + // remoteExecFileName is the name of the file we append to + // the path, e.g. _rexec/session_id/job + remoteExecFileName = "job" + + // rExecAck is the suffix added to an ack path + remoteExecAckSuffix = "ack" + + // remoteExecAck is the suffix added to an exit code + remoteExecExitSuffix = "exit" + + // remoteExecOutputDivider is used to namespace the output + remoteExecOutputDivider = "out" + + // remoteExecOutputSize is the size we chunk output too + remoteExecOutputSize = 4 * 1024 + + // remoteExecOutputDeadline is how long we wait before uploading + // less than the chunk size + remoteExecOutputDeadline = 500 * time.Millisecond +) + +// remoteExecEvent is used as the payload of the user event to transmit +// what we need to know about the event +type remoteExecEvent struct { + Prefix string + Session string +} + +// remoteExecSpec is used as the specification of the remote exec. +// It is stored in the KV store +type remoteExecSpec struct { + Command string + Script []byte + Wait time.Duration +} + +type rexecWriter struct { + bufCh chan []byte + buf []byte + bufLen int + bufLock sync.Mutex + cancelCh chan struct{} + flush *time.Timer +} + +func (r *rexecWriter) Write(b []byte) (int, error) { + r.bufLock.Lock() + defer r.bufLock.Unlock() + if r.flush != nil { + r.flush.Stop() + r.flush = nil + } + inpLen := len(b) + +COPY: + remain := len(r.buf) - r.bufLen + if remain >= len(b) { + copy(r.buf[r.bufLen:], b) + r.bufLen += len(b) + } else { + copy(r.buf[r.bufLen:], b[:remain]) + b = b[remain:] + r.bufLen += remain + r.bufLock.Unlock() + r.flushBuf() + r.bufLock.Lock() + goto COPY + } + + r.flush = time.AfterFunc(remoteExecOutputDeadline, r.flushBuf) + return inpLen, nil +} + +func (r *rexecWriter) Close() { + r.flushBuf() + close(r.bufCh) +} + +func (r *rexecWriter) flushBuf() { + r.bufLock.Lock() + defer r.bufLock.Unlock() + if r.bufLen == 0 { + return + } + select { + case r.bufCh <- r.buf: + r.buf = make([]byte, remoteExecOutputSize) + r.bufLen = 0 + case <-r.cancelCh: + r.bufLen = 0 + } +} + +// handleRemoteExec is invoked when a new remote exec request is received +func (a *Agent) handleRemoteExec(msg *UserEvent) { + a.logger.Printf("[DEBUG] agent: received remote exec event (ID: %s)", msg.ID) + // Decode the event paylaod + var event remoteExecEvent + if err := json.Unmarshal(msg.Payload, &event); err != nil { + a.logger.Printf("[ERR] agent: failed to decode remote exec event: %v", err) + return + } + + // Read the job specification + var spec remoteExecSpec + if !a.remoteExecGetSpec(&event, &spec) { + return + } + + // Write the acknowledgement + if !a.remoteExecWriteAck(&event) { + return + } + + // Ensure we write out an exit code + exitCode := 0 + defer a.remoteExecWriteExitCode(&event, exitCode) + + // Check if this is a script, we may need to spill to disk + var script string + if len(spec.Script) != 0 { + tmpFile, err := ioutil.TempFile("", "rexec") + if err != nil { + a.logger.Printf("[DEBUG] agent: failed to make tmp file: %v", err) + exitCode = 255 + return + } + defer os.Remove(tmpFile.Name()) + os.Chmod(tmpFile.Name(), 0750) + tmpFile.Write(spec.Script) + tmpFile.Close() + script = tmpFile.Name() + } else { + script = spec.Command + } + + // Create the exec.Cmd + cmd, err := ExecScript(script) + if err != nil { + a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err) + exitCode = 255 + return + } + + // Setup the output streaming + writer := &rexecWriter{ + bufCh: make(chan []byte, 16), + buf: make([]byte, remoteExecOutputSize), + cancelCh: make(chan struct{}), + } + cmd.Stdout = writer + cmd.Stderr = writer + + // Start execution + err = cmd.Start() + if err != nil { + a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err) + exitCode = 255 + return + } + + // Wait for the process to exit + exitCh := make(chan int, 1) + go func() { + err := cmd.Wait() + writer.Close() + if err != nil { + exitCh <- 0 + return + } + + // Try to determine the exit code + if exitErr, ok := err.(*exec.ExitError); ok { + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCh <- status.ExitStatus() + return + } + } + exitCh <- 1 + }() + + // Wait until we are complete, uploading as we go +WAIT: + for num := 0; ; num++ { + select { + case out := <-writer.bufCh: + if out == nil { + break WAIT + } + if !a.remoteExecWriteOutput(&event, num, out) { + close(writer.cancelCh) + exitCode = 255 + return + } + case <-time.After(spec.Wait): + // Acts like a heartbeat, since there is no output + a.remoteExecWriteOutput(&event, num, nil) + } + } + + // Get the exit code + exitCode = <-exitCh +} + +// remoteExecGetSpec is used to get the exec specification. +// Returns if execution should continue +func (a *Agent) remoteExecGetSpec(event *remoteExecEvent, spec *remoteExecSpec) bool { + get := structs.KeyRequest{ + Datacenter: a.config.Datacenter, + Key: path.Join(event.Prefix, event.Session, remoteExecFileName), + QueryOptions: structs.QueryOptions{ + AllowStale: true, // Stale read for scale! Retry on failure. + }, + } + var out structs.IndexedDirEntries +QUERY: + if err := a.RPC("KVS.Get", &get, &out); err != nil { + a.logger.Printf("[ERR] agent: failed to get remote exec job: %v", err) + return false + } + if len(out.Entries) == 0 { + // If the initial read was stale and had no data, retry as a consistent read + if get.QueryOptions.AllowStale { + a.logger.Printf("[DEBUG] agent: trying consistent fetch of remote exec job spec") + get.QueryOptions.AllowStale = false + goto QUERY + } else { + a.logger.Printf("[DEBUG] agent: remote exec aborted, job spec missing") + return false + } + } + if err := json.Unmarshal(out.Entries[0].Value, &spec); err != nil { + a.logger.Printf("[ERR] agent: failed to decode remote exec spec: %v", err) + return false + } + return true +} + +// remoteExecWriteAck is used to write an ack. Returns if execution should +// continue. +func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool { + write := structs.KVSRequest{ + Datacenter: a.config.Datacenter, + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: path.Join(event.Prefix, event.Session, + a.config.NodeName, remoteExecAckSuffix), + Session: event.Session, + }, + } + var success bool + if err := a.RPC("KVS.Apply", &write, &success); err != nil { + a.logger.Printf("[ERR] agent: failed to ack remote exec job: %v", err) + return false + } + if !success { + a.logger.Printf("[DEBUG] agent: remote exec aborted, ack failed") + return false + } + return true +} + +// remoteExecWriteOutput is used to write output +func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool { + outputNum := fmt.Sprintf("%05x", num) + key := path.Join(event.Prefix, event.Session, + a.config.NodeName, remoteExecOutputDivider, outputNum) + write := structs.KVSRequest{ + Datacenter: a.config.Datacenter, + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: key, + Value: output, + Session: event.Session, + }, + } + var success bool + if err := a.RPC("KVS.Apply", &write, &success); err != nil { + a.logger.Printf("[ERR] agent: failed to write output for remote exec job: %v", err) + return false + } + if !success { + a.logger.Printf("[DEBUG] agent: remote exec aborted, output write failed") + return false + } + return true +} + +// remoteExecWriteExitCode is used to write an exit code +func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) { + write := structs.KVSRequest{ + Datacenter: a.config.Datacenter, + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: path.Join(event.Prefix, event.Session, + a.config.NodeName, remoteExecExitSuffix), + Value: []byte(strconv.FormatInt(int64(exitCode), 10)), + Session: event.Session, + }, + } + var success bool + if err := a.RPC("KVS.Apply", &write, &success); err != nil { + a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err) + } + if !success { + a.logger.Printf("[DEBUG] agent: remote exec aborted, exit code write failed") + } +} diff --git a/command/agent/user_event.go b/command/agent/user_event.go index 4f3889b816..64891981f7 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -10,6 +10,9 @@ import ( const ( // userEventMaxVersion is the maximum protocol version we understand userEventMaxVersion = 1 + + // remoteExecName is the event name for a remote exec command + remoteExecName = "_rexec" ) // UserEventParam is used to parameterize a user event @@ -206,7 +209,19 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool { // ingestUserEvent is used to process an event that passes filtering func (a *Agent) ingestUserEvent(msg *UserEvent) { - a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID) + // Special handling for internal events + switch msg.Name { + case remoteExecName: + if a.config.DisableRemoteExec { + a.logger.Printf("[INFO] agent: ignoring remote exec event (%s), disabled.", msg.ID) + } else { + go a.handleRemoteExec(msg) + } + return + default: + a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID) + } + a.eventLock.Lock() defer func() { a.eventLock.Unlock() From 61a2170b7d2aa2940dc66c65d60f2f7d3bb1dc0e Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 31 Aug 2014 22:46:08 -0700 Subject: [PATCH 06/16] command/exec: Fixing use of shutdown ch --- command/exec.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/command/exec.go b/command/exec.go index 1b87ce7930..0ac1d728a8 100644 --- a/command/exec.go +++ b/command/exec.go @@ -94,7 +94,7 @@ type rExecExit struct { // ExecCommand is a Command implementation that is used to // do remote execution of commands type ExecCommand struct { - ShutdownCh chan struct{} + ShutdownCh <-chan struct{} Ui cli.Ui conf rExecConf client *consulapi.Client @@ -218,8 +218,9 @@ func (c *ExecCommand) waitForJob() int { outputCh := make(chan rExecOutput, 128) exitCh := make(chan rExecExit, 128) doneCh := make(chan struct{}) + errCh := make(chan struct{}, 1) defer close(doneCh) - go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh) + go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh) var ackCount, exitCount int OUTER: for { @@ -243,11 +244,14 @@ OUTER: c.Ui.Output(fmt.Sprintf("Node %s: exited with code %d", e.Node, e.Code)) case <-time.After(waitIntv): - c.Ui.Output(fmt.Sprintf("%d nodes completed, %d nodes acknowledged", exitCount, ackCount)) + c.Ui.Output(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount)) c.Ui.Output(fmt.Sprintf("Exec complete in %0.2f seconds", float64(time.Now().Sub(start))/float64(time.Second))) break OUTER + case <-errCh: + return 1 + case <-c.ShutdownCh: return 1 } @@ -258,7 +262,7 @@ OUTER: // streamResults is used to perform blocking queries against the KV endpoint and stream in // notice of various events into waitForJob func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart, - outputCh chan rExecOutput, exitCh chan rExecExit) { + outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) { kv := c.client.KV() opts := consulapi.QueryOptions{WaitTime: c.conf.wait} dir := path.Join(c.conf.prefix, c.sessionID) + "/" @@ -343,7 +347,7 @@ func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, h ERR_EXIT: select { - case c.ShutdownCh <- struct{}{}: + case errCh <- struct{}{}: default: } } From a0c6dbfe2aca40874fbc77dd62e5d4e448100491 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 10:42:35 -0700 Subject: [PATCH 07/16] agent: testing remote exec writer --- command/agent/remote_exec.go | 55 ++++++++++++---------- command/agent/remote_exec_test.go | 77 +++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 24 deletions(-) create mode 100644 command/agent/remote_exec_test.go diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go index d7d6986221..d8c8915d42 100644 --- a/command/agent/remote_exec.go +++ b/command/agent/remote_exec.go @@ -53,12 +53,15 @@ type remoteExecSpec struct { } type rexecWriter struct { - bufCh chan []byte - buf []byte - bufLen int - bufLock sync.Mutex - cancelCh chan struct{} - flush *time.Timer + BufCh chan []byte + BufSize int + BufIdle time.Duration + CancelCh chan struct{} + + buf []byte + bufLen int + bufLock sync.Mutex + flush *time.Timer } func (r *rexecWriter) Write(b []byte) (int, error) { @@ -69,10 +72,13 @@ func (r *rexecWriter) Write(b []byte) (int, error) { r.flush = nil } inpLen := len(b) + if r.buf == nil { + r.buf = make([]byte, r.BufSize) + } COPY: remain := len(r.buf) - r.bufLen - if remain >= len(b) { + if remain > len(b) { copy(r.buf[r.bufLen:], b) r.bufLen += len(b) } else { @@ -80,31 +86,30 @@ COPY: b = b[remain:] r.bufLen += remain r.bufLock.Unlock() - r.flushBuf() + r.Flush() r.bufLock.Lock() goto COPY } - r.flush = time.AfterFunc(remoteExecOutputDeadline, r.flushBuf) + r.flush = time.AfterFunc(r.BufIdle, r.Flush) return inpLen, nil } -func (r *rexecWriter) Close() { - r.flushBuf() - close(r.bufCh) -} - -func (r *rexecWriter) flushBuf() { +func (r *rexecWriter) Flush() { r.bufLock.Lock() defer r.bufLock.Unlock() + if r.flush != nil { + r.flush.Stop() + r.flush = nil + } if r.bufLen == 0 { return } select { - case r.bufCh <- r.buf: - r.buf = make([]byte, remoteExecOutputSize) + case r.BufCh <- r.buf[:r.bufLen]: + r.buf = make([]byte, r.BufSize) r.bufLen = 0 - case <-r.cancelCh: + case <-r.CancelCh: r.bufLen = 0 } } @@ -162,9 +167,10 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { // Setup the output streaming writer := &rexecWriter{ - bufCh: make(chan []byte, 16), - buf: make([]byte, remoteExecOutputSize), - cancelCh: make(chan struct{}), + BufCh: make(chan []byte, 16), + BufSize: remoteExecOutputSize, + BufIdle: remoteExecOutputDeadline, + CancelCh: make(chan struct{}), } cmd.Stdout = writer cmd.Stderr = writer @@ -181,7 +187,8 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { exitCh := make(chan int, 1) go func() { err := cmd.Wait() - writer.Close() + writer.Flush() + close(writer.BufCh) if err != nil { exitCh <- 0 return @@ -201,12 +208,12 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { WAIT: for num := 0; ; num++ { select { - case out := <-writer.bufCh: + case out := <-writer.BufCh: if out == nil { break WAIT } if !a.remoteExecWriteOutput(&event, num, out) { - close(writer.cancelCh) + close(writer.CancelCh) exitCode = 255 return } diff --git a/command/agent/remote_exec_test.go b/command/agent/remote_exec_test.go new file mode 100644 index 0000000000..af65e5427b --- /dev/null +++ b/command/agent/remote_exec_test.go @@ -0,0 +1,77 @@ +package agent + +import ( + "testing" + "time" +) + +func TestRexecWriter(t *testing.T) { + writer := &rexecWriter{ + BufCh: make(chan []byte, 16), + BufSize: 16, + BufIdle: 10 * time.Millisecond, + CancelCh: make(chan struct{}), + } + + // Write short, wait for idle + start := time.Now() + n, err := writer.Write([]byte("test")) + if err != nil { + t.Fatalf("err: %v", err) + } + if n != 4 { + t.Fatalf("bad: %v", n) + } + + select { + case b := <-writer.BufCh: + if len(b) != 4 { + t.Fatalf("Bad: %v", b) + } + if time.Now().Sub(start) < writer.BufIdle { + t.Fatalf("too early") + } + case <-time.After(2 * writer.BufIdle): + t.Fatalf("timeout") + } + + // Write in succession to prevent the timeout + writer.Write([]byte("test")) + time.Sleep(writer.BufIdle / 2) + writer.Write([]byte("test")) + time.Sleep(writer.BufIdle / 2) + start = time.Now() + writer.Write([]byte("test")) + + select { + case b := <-writer.BufCh: + if len(b) != 12 { + t.Fatalf("Bad: %v", b) + } + if time.Now().Sub(start) < writer.BufIdle { + t.Fatalf("too early") + } + case <-time.After(2 * writer.BufIdle): + t.Fatalf("timeout") + } + + // Write large values, multiple flushes required + writer.Write([]byte("01234567890123456789012345678901")) + + select { + case b := <-writer.BufCh: + if string(b) != "0123456789012345" { + t.Fatalf("bad: %s", b) + } + default: + t.Fatalf("should have buf") + } + select { + case b := <-writer.BufCh: + if string(b) != "6789012345678901" { + t.Fatalf("bad: %s", b) + } + default: + t.Fatalf("should have buf") + } +} From 9ba4f31fde6466a2ae1efe437f3d2367f4e83063 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 10:51:15 -0700 Subject: [PATCH 08/16] agent: Refactor remote exec write code --- command/agent/remote_exec.go | 55 +++++++++++------------------------- 1 file changed, 17 insertions(+), 38 deletions(-) diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go index d8c8915d42..9d24a556ad 100644 --- a/command/agent/remote_exec.go +++ b/command/agent/remote_exec.go @@ -264,70 +264,49 @@ QUERY: // remoteExecWriteAck is used to write an ack. Returns if execution should // continue. func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool { - write := structs.KVSRequest{ - Datacenter: a.config.Datacenter, - Op: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: path.Join(event.Prefix, event.Session, - a.config.NodeName, remoteExecAckSuffix), - Session: event.Session, - }, - } - var success bool - if err := a.RPC("KVS.Apply", &write, &success); err != nil { + if err := a.remoteExecWriteKey(event, remoteExecAckSuffix, nil); err != nil { a.logger.Printf("[ERR] agent: failed to ack remote exec job: %v", err) return false } - if !success { - a.logger.Printf("[DEBUG] agent: remote exec aborted, ack failed") - return false - } return true } // remoteExecWriteOutput is used to write output func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool { - outputNum := fmt.Sprintf("%05x", num) - key := path.Join(event.Prefix, event.Session, - a.config.NodeName, remoteExecOutputDivider, outputNum) - write := structs.KVSRequest{ - Datacenter: a.config.Datacenter, - Op: structs.KVSLock, - DirEnt: structs.DirEntry{ - Key: key, - Value: output, - Session: event.Session, - }, - } - var success bool - if err := a.RPC("KVS.Apply", &write, &success); err != nil { + suffix := path.Join(remoteExecOutputDivider, fmt.Sprintf("%05x", num)) + if err := a.remoteExecWriteKey(event, suffix, output); err != nil { a.logger.Printf("[ERR] agent: failed to write output for remote exec job: %v", err) return false } - if !success { - a.logger.Printf("[DEBUG] agent: remote exec aborted, output write failed") - return false - } return true } // remoteExecWriteExitCode is used to write an exit code func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) { + val := []byte(strconv.FormatInt(int64(exitCode), 10)) + if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil { + a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err) + } +} + +// remoteExecWriteKey is used to write an output key for a remote exec job +func (a *Agent) remoteExecWriteKey(event *remoteExecEvent, suffix string, val []byte) error { + key := path.Join(event.Prefix, event.Session, a.config.NodeName, suffix) write := structs.KVSRequest{ Datacenter: a.config.Datacenter, Op: structs.KVSLock, DirEnt: structs.DirEntry{ - Key: path.Join(event.Prefix, event.Session, - a.config.NodeName, remoteExecExitSuffix), - Value: []byte(strconv.FormatInt(int64(exitCode), 10)), + Key: key, + Value: val, Session: event.Session, }, } var success bool if err := a.RPC("KVS.Apply", &write, &success); err != nil { - a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err) + return err } if !success { - a.logger.Printf("[DEBUG] agent: remote exec aborted, exit code write failed") + return fmt.Errorf("write failed") } + return nil } From 9b74b86709e695688939d9bf4902911365a63658 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 11:21:57 -0700 Subject: [PATCH 09/16] agent: Testing remote exec --- command/agent/remote_exec.go | 5 +- command/agent/remote_exec_test.go | 211 ++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 1 deletion(-) diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go index 9d24a556ad..f0fb198511 100644 --- a/command/agent/remote_exec.go +++ b/command/agent/remote_exec.go @@ -158,6 +158,7 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { } // Create the exec.Cmd + a.logger.Printf("[INFO] agent: remote exec '%s'", script) cmd, err := ExecScript(script) if err != nil { a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err) @@ -282,11 +283,13 @@ func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output [] } // remoteExecWriteExitCode is used to write an exit code -func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) { +func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) bool { val := []byte(strconv.FormatInt(int64(exitCode), 10)) if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil { a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err) + return false } + return true } // remoteExecWriteKey is used to write an output key for a remote exec job diff --git a/command/agent/remote_exec_test.go b/command/agent/remote_exec_test.go index af65e5427b..d431b1f809 100644 --- a/command/agent/remote_exec_test.go +++ b/command/agent/remote_exec_test.go @@ -1,8 +1,15 @@ package agent import ( + "bytes" + "encoding/json" + "os" + "reflect" "testing" "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" ) func TestRexecWriter(t *testing.T) { @@ -75,3 +82,207 @@ func TestRexecWriter(t *testing.T) { t.Fatalf("should have buf") } } + +func TestRemoteExecGetSpec(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + spec := &remoteExecSpec{ + Command: "uptime", + Script: []byte("#!/bin/bash"), + Wait: time.Second, + } + buf, err := json.Marshal(spec) + if err != nil { + t.Fatalf("err: %v", err) + } + key := "_rexec/" + event.Session + "/job" + setKV(t, agent, key, buf) + + var out remoteExecSpec + if !agent.remoteExecGetSpec(event, &out) { + t.Fatalf("bad") + } + if !reflect.DeepEqual(spec, &out) { + t.Fatalf("bad spec") + } +} + +func TestRemoteExecWrites(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + if !agent.remoteExecWriteAck(event) { + t.Fatalf("bad") + } + + output := []byte("testing") + if !agent.remoteExecWriteOutput(event, 0, output) { + t.Fatalf("bad") + } + if !agent.remoteExecWriteOutput(event, 10, output) { + t.Fatalf("bad") + } + + if !agent.remoteExecWriteExitCode(event, 1) { + t.Fatalf("bad") + } + + key := "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack" + d := getKV(t, agent, key) + if d == nil || d.Session != event.Session { + t.Fatalf("bad ack: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) { + t.Fatalf("bad output: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/0000a" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) { + t.Fatalf("bad output: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || string(d.Value) != "1" { + t.Fatalf("bad output: %#v", d) + } +} + +func TestHandleRemoteExec(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + spec := &remoteExecSpec{ + Command: "uptime", + Wait: time.Second, + } + buf, err := json.Marshal(spec) + if err != nil { + t.Fatalf("err: %v", err) + } + key := "_rexec/" + event.Session + "/job" + setKV(t, agent, key, buf) + + buf, err = json.Marshal(event) + if err != nil { + t.Fatalf("err: %v", err) + } + msg := &UserEvent{ + ID: generateUUID(), + Payload: buf, + } + + // Handle the event... + agent.handleRemoteExec(msg) + + // Verify we have an ack + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack" + d := getKV(t, agent, key) + if d == nil || d.Session != event.Session { + t.Fatalf("bad ack: %#v", d) + } + + // Verify we have output + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || + !bytes.Contains(d.Value, []byte("load")) { + t.Fatalf("bad output: %#v", d) + } + + // Verify we have an exit code + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || string(d.Value) != "0" { + t.Fatalf("bad output: %#v", d) + } +} + +func makeRexecSession(t *testing.T, agent *Agent) string { + args := structs.SessionRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.SessionCreate, + Session: structs.Session{ + Node: agent.config.NodeName, + LockDelay: 15 * time.Second, + }, + } + var out string + if err := agent.RPC("Session.Apply", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + return out +} + +func destroySession(t *testing.T, agent *Agent, session string) { + args := structs.SessionRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.SessionDestroy, + Session: structs.Session{ + ID: session, + }, + } + var out string + if err := agent.RPC("Session.Apply", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } +} + +func setKV(t *testing.T, agent *Agent, key string, val []byte) { + write := structs.KVSRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: key, + Value: val, + }, + } + var success bool + if err := agent.RPC("KVS.Apply", &write, &success); err != nil { + t.Fatalf("err: %v", err) + } +} + +func getKV(t *testing.T, agent *Agent, key string) *structs.DirEntry { + req := structs.KeyRequest{ + Datacenter: agent.config.Datacenter, + Key: key, + } + var out structs.IndexedDirEntries + if err := agent.RPC("KVS.Get", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.Entries) > 0 { + return out.Entries[0] + } + return nil +} From de4adc4f664dc5356301210f803c8efefeee383b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 11:30:56 -0700 Subject: [PATCH 10/16] command/exec: High level tests --- command/exec.go | 6 +++++- command/exec_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 command/exec_test.go diff --git a/command/exec.go b/command/exec.go index 0ac1d728a8..e4ec15596e 100644 --- a/command/exec.go +++ b/command/exec.go @@ -18,6 +18,10 @@ import ( ) const ( + // rExecPrefix is the prefix in the KV store used to + // store the remote exec data + rExecPrefix = "_rexec" + // rExecFileName is the name of the file we append to // the path, e.g. _rexec/session_id/job rExecFileName = "job" @@ -108,7 +112,7 @@ func (c *ExecCommand) Run(args []string) int { cmdFlags.StringVar(&c.conf.node, "node", "", "") cmdFlags.StringVar(&c.conf.service, "service", "", "") cmdFlags.StringVar(&c.conf.tag, "tag", "", "") - cmdFlags.StringVar(&c.conf.prefix, "prefix", "_rexec", "") + cmdFlags.StringVar(&c.conf.prefix, "prefix", rExecPrefix, "") cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", 100*time.Millisecond, "") cmdFlags.DurationVar(&c.conf.wait, "wait", time.Second, "") cmdFlags.BoolVar(&c.conf.verbose, "v", false, "") diff --git a/command/exec_test.go b/command/exec_test.go new file mode 100644 index 0000000000..366fb384c9 --- /dev/null +++ b/command/exec_test.go @@ -0,0 +1,48 @@ +package command + +import ( + "github.com/mitchellh/cli" + "strings" + "testing" + + "github.com/hashicorp/consul/testutil" +) + +func TestExecCommand_implements(t *testing.T) { + var _ cli.Command = &ExecCommand{} +} + +func TestExecCommandRun(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + ui := new(cli.MockUi) + c := &ExecCommand{Ui: ui} + args := []string{"-http-addr=" + a1.httpAddr, "-wait=400ms", "uptime"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), "load") { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} + +func waitForLeader(t *testing.T, httpAddr string) { + client, err := HTTPClient(httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForResult(func() (bool, error) { + _, qm, err := client.Catalog().Nodes(nil) + if err != nil { + return false, err + } + return qm.KnownLeader, nil + }, func(err error) { + t.Fatalf("failed to find leader: %v", err) + }) +} From 5b619eec96f3703a3c96b5161941aec8ecc46d8f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 13:02:15 -0700 Subject: [PATCH 11/16] command/exec: Testing exec --- command/exec.go | 12 +- command/exec_test.go | 273 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 281 insertions(+), 4 deletions(-) diff --git a/command/exec.go b/command/exec.go index e4ec15596e..2e075789da 100644 --- a/command/exec.go +++ b/command/exec.go @@ -125,7 +125,8 @@ func (c *ExecCommand) Run(args []string) int { c.conf.cmd = strings.Join(cmdFlags.Args(), " ") // If there is no command, read stdin for a script input - if c.conf.cmd == "" { + if c.conf.cmd == "-" { + c.conf.cmd = "" var buf bytes.Buffer _, err := io.Copy(&buf, os.Stdin) if err != nil { @@ -240,6 +241,11 @@ OUTER: ackCount++ c.Ui.Output(fmt.Sprintf("Node %s: acknowledged event", e.Node)) + case h := <-heartCh: + if c.conf.verbose { + c.Ui.Output(fmt.Sprintf("Node %s: heartbeated", h.Node)) + } + case e := <-outputCh: c.Ui.Output(fmt.Sprintf("Node %s: %s", e.Node, e.Output)) @@ -471,11 +477,11 @@ func (c *ExecCommand) Synopsis() string { func (c *ExecCommand) Help() string { helpText := ` -Usage: consul exec [options] [command...] +Usage: consul exec [options] [-|command...] Evaluates a command on remote Consul nodes. The nodes responding can be filtered using regular expressions on node name, service, and tag - definitions. If a command is not provided, stdin will be read until EOF + definitions. If a command is '-', stdin will be read until EOF and used as a script input. Options: diff --git a/command/exec_test.go b/command/exec_test.go index 366fb384c9..e48cd3abaa 100644 --- a/command/exec_test.go +++ b/command/exec_test.go @@ -1,11 +1,13 @@ package command import ( - "github.com/mitchellh/cli" "strings" "testing" + "time" + "github.com/armon/consul-api" "github.com/hashicorp/consul/testutil" + "github.com/mitchellh/cli" ) func TestExecCommand_implements(t *testing.T) { @@ -46,3 +48,272 @@ func waitForLeader(t *testing.T, httpAddr string) { t.Fatalf("failed to find leader: %v", err) }) } + +func TestExecCommand_Validate(t *testing.T) { + conf := &rExecConf{} + err := conf.validate() + if err != nil { + t.Fatalf("err: %v", err) + } + + conf.node = "(" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } + + conf.node = "" + conf.service = "(" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } + + conf.service = "()" + conf.tag = "(" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } + + conf.service = "" + conf.tag = "foo" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } +} + +func TestExecCommand_Sessions(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + client, err := HTTPClient(a1.httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + ui := new(cli.MockUi) + c := &ExecCommand{ + Ui: ui, + client: client, + } + + id, err := c.createSession() + if err != nil { + t.Fatalf("err: %v", err) + } + + se, _, err := client.Session().Info(id, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if se == nil || se.Name != "Remote Exec" { + t.Fatalf("bad: %v", se) + } + + c.sessionID = id + err = c.destroySession() + if err != nil { + t.Fatalf("err: %v", err) + } + + se, _, err = client.Session().Info(id, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if se != nil { + t.Fatalf("bad: %v", se) + } +} + +func TestExecCommand_UploadDestroy(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + client, err := HTTPClient(a1.httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + ui := new(cli.MockUi) + c := &ExecCommand{ + Ui: ui, + client: client, + } + + id, err := c.createSession() + if err != nil { + t.Fatalf("err: %v", err) + } + c.sessionID = id + + c.conf.prefix = "_rexec" + c.conf.cmd = "uptime" + c.conf.wait = time.Second + + buf, err := c.makeRExecSpec() + if err != nil { + t.Fatalf("err: %v", err) + } + + err = c.uploadPayload(buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + pair, _, err := client.KV().Get("_rexec/"+id+"/job", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if pair == nil || len(pair.Value) == 0 { + t.Fatalf("missing job spec") + } + + err = c.destroyData() + if err != nil { + t.Fatalf("err: %v", err) + } + + pair, _, err = client.KV().Get("_rexec/"+id+"/job", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if pair != nil { + t.Fatalf("should be destroyed") + } +} + +func TestExecCommand_StreamResults(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + client, err := HTTPClient(a1.httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + ui := new(cli.MockUi) + c := &ExecCommand{ + Ui: ui, + client: client, + } + c.conf.prefix = "_rexec" + + id, err := c.createSession() + if err != nil { + t.Fatalf("err: %v", err) + } + c.sessionID = id + + ackCh := make(chan rExecAck, 128) + heartCh := make(chan rExecHeart, 128) + outputCh := make(chan rExecOutput, 128) + exitCh := make(chan rExecExit, 128) + doneCh := make(chan struct{}) + errCh := make(chan struct{}, 1) + defer close(doneCh) + go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh) + + prefix := "_rexec/" + id + "/" + ok, _, err := client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/ack", + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case a := <-ackCh: + if a.Node != "foo" { + t.Fatalf("bad: %#v", a) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } + + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/exit", + Value: []byte("127"), + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case e := <-exitCh: + if e.Node != "foo" || e.Code != 127 { + t.Fatalf("bad: %#v", e) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } + + // Random key, should ignore + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/random", + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + // Output heartbeat + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/out/00000", + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case h := <-heartCh: + if h.Node != "foo" { + t.Fatalf("bad: %#v", h) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } + + // Output value + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/out/00001", + Value: []byte("test"), + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case o := <-outputCh: + if o.Node != "foo" || string(o.Output) != "test" { + t.Fatalf("bad: %#v", o) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } +} From bfb2b93d756ecff895355ed055a969f82dde6be2 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 13:05:22 -0700 Subject: [PATCH 12/16] command/exec: Fixing verbose flag --- command/exec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/command/exec.go b/command/exec.go index 2e075789da..272764b726 100644 --- a/command/exec.go +++ b/command/exec.go @@ -115,7 +115,7 @@ func (c *ExecCommand) Run(args []string) int { cmdFlags.StringVar(&c.conf.prefix, "prefix", rExecPrefix, "") cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", 100*time.Millisecond, "") cmdFlags.DurationVar(&c.conf.wait, "wait", time.Second, "") - cmdFlags.BoolVar(&c.conf.verbose, "v", false, "") + cmdFlags.BoolVar(&c.conf.verbose, "verbose", false, "") httpAddr := HTTPAddrFlag(cmdFlags) if err := cmdFlags.Parse(args); err != nil { return 1 @@ -496,7 +496,7 @@ Options: -wait=1s Period to wait with no responses before terminating execution. -wait-repl=100ms Period to wait for replication before firing event. This is an optimization to allow stale reads to be performed. - -v Enables verbose output + -verbose Enables verbose output ` return strings.TrimSpace(helpText) } From 32860747588d0439993eda24c5a07075c2c8f904 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 13:13:15 -0700 Subject: [PATCH 13/16] agent: Handle cancel for idle wait --- command/agent/remote_exec.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go index f0fb198511..64fec4599a 100644 --- a/command/agent/remote_exec.go +++ b/command/agent/remote_exec.go @@ -220,7 +220,11 @@ WAIT: } case <-time.After(spec.Wait): // Acts like a heartbeat, since there is no output - a.remoteExecWriteOutput(&event, num, nil) + if !a.remoteExecWriteOutput(&event, num, nil) { + close(writer.CancelCh) + exitCode = 255 + return + } } } From 6ef4f9fdc4e10066e2b0ef34fbbca6a0beee4f2d Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 13:13:24 -0700 Subject: [PATCH 14/16] command/exec: Tuning constants --- command/exec.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/command/exec.go b/command/exec.go index 272764b726..05531f1793 100644 --- a/command/exec.go +++ b/command/exec.go @@ -34,6 +34,13 @@ const ( // rExecOutputDivider is used to namespace the output rExecOutputDivider = "/out/" + + // rExecReplicationWait is how long we wait for replication + rExecReplicationWait = 200 * time.Millisecond + + // rExecQuietWait is how long we wait for no responses + // before assuming the job is done. + rExecQuietWait = 2 * time.Second ) // rExecConf is used to pass around configuration @@ -113,8 +120,8 @@ func (c *ExecCommand) Run(args []string) int { cmdFlags.StringVar(&c.conf.service, "service", "", "") cmdFlags.StringVar(&c.conf.tag, "tag", "", "") cmdFlags.StringVar(&c.conf.prefix, "prefix", rExecPrefix, "") - cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", 100*time.Millisecond, "") - cmdFlags.DurationVar(&c.conf.wait, "wait", time.Second, "") + cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, "") + cmdFlags.DurationVar(&c.conf.wait, "wait", rExecQuietWait, "") cmdFlags.BoolVar(&c.conf.verbose, "verbose", false, "") httpAddr := HTTPAddrFlag(cmdFlags) if err := cmdFlags.Parse(args); err != nil { @@ -233,7 +240,7 @@ OUTER: // nodes which are still working. waitIntv := c.conf.wait if ackCount > exitCount { - waitIntv *= 4 + waitIntv *= 2 } select { From a6f5e40eac9137ce375ddf7a4b5dafd0b4e658fd Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 14:44:41 -0700 Subject: [PATCH 15/16] command/exec: Improving output --- command/exec.go | 87 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 11 deletions(-) diff --git a/command/exec.go b/command/exec.go index 05531f1793..996bbd5b2d 100644 --- a/command/exec.go +++ b/command/exec.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "time" + "unicode" "github.com/armon/consul-api" "github.com/mitchellh/cli" @@ -186,6 +187,9 @@ func (c *ExecCommand) Run(args []string) int { return 1 } defer c.destroySession() + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Created remote execution session: %s", c.sessionID)) + } // Upload the payload if err := c.uploadPayload(spec); err != nil { @@ -193,6 +197,9 @@ func (c *ExecCommand) Run(args []string) int { return 1 } defer c.destroyData() + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Uploaded remote execution spec")) + } // Wait for replication. This is done so that when the event is // received, the job file can be read using a stale read. If the @@ -211,7 +218,7 @@ func (c *ExecCommand) Run(args []string) int { return 1 } if c.conf.verbose { - c.Ui.Output(fmt.Sprintf("Fired remote execution event. ID: %s", id)) + c.Ui.Info(fmt.Sprintf("Fired remote execution event: %s", id)) } // Wait for the job to finish now @@ -233,7 +240,9 @@ func (c *ExecCommand) waitForJob() int { errCh := make(chan struct{}, 1) defer close(doneCh) go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh) - var ackCount, exitCount int + target := &TargettedUi{Ui: c.Ui} + + var ackCount, exitCount, badExit int OUTER: for { // Determine wait time. We provide a larger window if we know about @@ -246,24 +255,35 @@ OUTER: select { case e := <-ackCh: ackCount++ - c.Ui.Output(fmt.Sprintf("Node %s: acknowledged event", e.Node)) + if c.conf.verbose { + target.Target = e.Node + target.Info("acknowledged") + } case h := <-heartCh: if c.conf.verbose { - c.Ui.Output(fmt.Sprintf("Node %s: heartbeated", h.Node)) + target.Target = h.Node + target.Info("heartbeat received") } case e := <-outputCh: - c.Ui.Output(fmt.Sprintf("Node %s: %s", e.Node, e.Output)) + target.Target = e.Node + target.Output(string(e.Output)) case e := <-exitCh: exitCount++ - c.Ui.Output(fmt.Sprintf("Node %s: exited with code %d", e.Node, e.Code)) + target.Target = e.Node + target.Info(fmt.Sprintf("finished with exit code %d", e.Code)) + if e.Code != 0 { + badExit++ + } case <-time.After(waitIntv): - c.Ui.Output(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount)) - c.Ui.Output(fmt.Sprintf("Exec complete in %0.2f seconds", - float64(time.Now().Sub(start))/float64(time.Second))) + c.Ui.Info(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount)) + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Completed in %0.2f seconds", + float64(time.Now().Sub(start))/float64(time.Second))) + } break OUTER case <-errCh: @@ -273,6 +293,10 @@ OUTER: return 1 } } + + if badExit > 0 { + return 2 + } return 0 } @@ -500,10 +524,51 @@ Options: -service="" Regular expression to filter on service instances -tag="" Regular expression to filter on service tags. Must be used with -service. - -wait=1s Period to wait with no responses before terminating execution. - -wait-repl=100ms Period to wait for replication before firing event. This is an + -wait=2s Period to wait with no responses before terminating execution. + -wait-repl=200ms Period to wait for replication before firing event. This is an optimization to allow stale reads to be performed. -verbose Enables verbose output ` return strings.TrimSpace(helpText) } + +// TargettedUi is a UI that wraps another UI implementation and modifies +// the output to indicate a specific target. Specifically, all Say output +// is prefixed with the target name. Message output is not prefixed but +// is offset by the length of the target so that output is lined up properly +// with Say output. Machine-readable output has the proper target set. +type TargettedUi struct { + Target string + Ui cli.Ui +} + +func (u *TargettedUi) Ask(query string) (string, error) { + return u.Ui.Ask(u.prefixLines(true, query)) +} + +func (u *TargettedUi) Info(message string) { + u.Ui.Info(u.prefixLines(true, message)) +} + +func (u *TargettedUi) Output(message string) { + u.Ui.Output(u.prefixLines(false, message)) +} + +func (u *TargettedUi) Error(message string) { + u.Ui.Error(u.prefixLines(true, message)) +} + +func (u *TargettedUi) prefixLines(arrow bool, message string) string { + arrowText := "==>" + if !arrow { + arrowText = strings.Repeat(" ", len(arrowText)) + } + + var result bytes.Buffer + + for _, line := range strings.Split(message, "\n") { + result.WriteString(fmt.Sprintf("%s %s: %s\n", arrowText, u.Target, line)) + } + + return strings.TrimRightFunc(result.String(), unicode.IsSpace) +} From 2d7be2043148dfd6f629f86d107703f74778bb28 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Sep 2014 15:03:37 -0700 Subject: [PATCH 16/16] website: Updating docs --- website/source/docs/agent/http.html.markdown | 3 +- .../source/docs/agent/options.html.markdown | 3 + .../source/docs/commands/exec.html.markdown | 62 +++++++++++++++++++ .../source/docs/commands/index.html.markdown | 2 + .../getting-started/install.html.markdown | 3 + website/source/layouts/docs.erb | 4 ++ 6 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 website/source/docs/commands/exec.html.markdown diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index e24bfeb5f8..929069fbea 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -1209,7 +1209,8 @@ can be specified using the "?dc=" query parameter. The fire endpoint expects a PUT request, with an optional body. The body contents are opaque to Consul, and become the "payload" -of the event. +of the event. Any names starting with the "_" prefix should be considered +reserved, and for Consul's internal use. The `?node=`, `?service=`, and `?tag=` query parameters may optionally be provided. They respectively provide a regular expression to filter diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 8d91bf0268..088d72712d 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -227,6 +227,9 @@ definitions support being updated during a reload. * `data_dir` - Equivalent to the `-data-dir` command-line flag. +* `disable_remote_exec` - Disables support for remote execution. When set to true, + the agent will ignore any incoming remote exec requests. + * `dns_config` - This object allows a number of sub-keys to be set which can tune how DNS queries are perfomed. See this guide on [DNS caching](/docs/guides/dns-cache.html). The following sub-keys are available: diff --git a/website/source/docs/commands/exec.html.markdown b/website/source/docs/commands/exec.html.markdown new file mode 100644 index 0000000000..540f95eeae --- /dev/null +++ b/website/source/docs/commands/exec.html.markdown @@ -0,0 +1,62 @@ +--- +layout: "docs" +page_title: "Commands: Exec" +sidebar_current: "docs-commands-exec" +--- + +# Consul Exec + +Command: `consul exec` + +The exec command provides a mechahanism for remote execution. For example, +this can be used to run the `uptime` command across all machines providing +the `web` service. + +Remote execution works by specifying a job which is stored in the KV store. +Agent's are informed about the new job using the [event system](/docs/commands/event.html), +which propogates messages via the [gossip protocol](/docs/internals/gossip.html). +As a result, delivery is best-effort, and there is **no guarantee** of execution. + +While events are purely gossip driven, remote execution relies on the KV store +as a message broker. As a result, the `exec` command will not be able to +properly function during a Consul outage. + +## Usage + +Usage: `consul exec [options] [-|command...]` + +The only required option is a command to execute. This is either given +as trailing arguments, or by specifying '-', stdin will be read to +completion as a script to evaluate. + +The list of available flags are: + +* `-http-addr` - Address to the HTTP server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8500" which is the default HTTP address of a Consul agent. + +* `-datacenter` - Datacenter to query. Defaults to that of agent. In version + 0.4, that is the only supported value. + +* `-prefix` - Key prefix in the KV store to use for storing request data. + Defaults to "_rexec". + +* `-node` - Regular expression to filter nodes which should evaluate the event. + +* `-service` - Regular expression to filter to only nodes with matching services. + +* `-tag` - Regular expression to filter to only nodes with a service that has + a matching tag. This must be used with `-service`. As an example, you may + do "-server mysql -tag slave". + +* `-wait` - Specifies the period of time in which no agent's respond before considering + the job finished. This is basically the quiescent time required to assume completion. + This period is not a hard deadline, and the command will wait longer depending on + various heuristics. + +* `-wait-repl` - Period to wait after writing the job specification for replication. + This is a heuristic value and enables agents to do a stale read of the job. Defaults + to 200msec. + +* `-verbose` - Enables verbose output. + diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index 3c43cac470..5adf67685e 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -25,6 +25,8 @@ usage: consul [--version] [--help] [] Available commands are: agent Runs a Consul agent + event Fire a new event + exec Executes a command on Consul nodes force-leave Forces a member of the cluster to enter the "left" state info Provides debugging information for operators join Tell Consul agent to join cluster diff --git a/website/source/intro/getting-started/install.html.markdown b/website/source/intro/getting-started/install.html.markdown index 9ef7acaaa3..e224f09eb1 100644 --- a/website/source/intro/getting-started/install.html.markdown +++ b/website/source/intro/getting-started/install.html.markdown @@ -49,6 +49,8 @@ usage: consul [--version] [--help] [] Available commands are: agent Runs a Consul agent + event Fire a new event + exec Executes a command on Consul nodes force-leave Forces a member of the cluster to enter the "left" state info Provides debugging information for operators join Tell Consul agent to join cluster @@ -56,6 +58,7 @@ Available commands are: leave Gracefully leaves the Consul cluster and shuts down members Lists the members of a Consul cluster monitor Stream logs from a Consul agent + reload Triggers the agent to reload configuration files version Prints the Consul version watch Watch for changes in Consul ``` diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 04855527a0..b123619e8b 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -61,6 +61,10 @@ > event + + + > + exec >