diff --git a/command/agent/config.go b/command/agent/config.go index f91853cdde..efb15e9757 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -235,6 +235,10 @@ type Config struct { // agent layer using the standard APIs. Watches []map[string]interface{} `mapstructure:"watches"` + // DisableRemoteExec is used to turn off the remote execution + // feature. This is for security to prevent unknown scripts from running. + DisableRemoteExec bool `mapstructure:"disable_remote_exec"` + // AEInterval controls the anti-entropy interval. This is how often // the agent attempts to reconcile it's local state with the server' // representation of our state. Defaults to every 60s. @@ -676,6 +680,9 @@ func MergeConfig(a, b *Config) *Config { if len(b.WatchPlans) != 0 { result.WatchPlans = append(result.WatchPlans, b.WatchPlans...) } + if b.DisableRemoteExec { + result.DisableRemoteExec = true + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 75c0610a9e..bf939a6981 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -405,6 +405,17 @@ func TestDecodeConfig(t *testing.T) { if !reflect.DeepEqual(out, exp) { t.Fatalf("bad: %#v", config) } + + // remote exec + input = `{"disable_remote_exec": true}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if !config.DisableRemoteExec { + t.Fatalf("bad: %#v", config) + } } func TestDecodeConfig_Service(t *testing.T) { @@ -566,6 +577,7 @@ func TestMergeConfig(t *testing.T) { "handler": "foobar", }, }, + DisableRemoteExec: true, } c := MergeConfig(a, b) diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go new file mode 100644 index 0000000000..64fec4599a --- /dev/null +++ b/command/agent/remote_exec.go @@ -0,0 +1,319 @@ +package agent + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "strconv" + "sync" + "syscall" + "time" + + "github.com/hashicorp/consul/consul/structs" +) + +const ( + // remoteExecFileName is the name of the file we append to + // the path, e.g. _rexec/session_id/job + remoteExecFileName = "job" + + // rExecAck is the suffix added to an ack path + remoteExecAckSuffix = "ack" + + // remoteExecAck is the suffix added to an exit code + remoteExecExitSuffix = "exit" + + // remoteExecOutputDivider is used to namespace the output + remoteExecOutputDivider = "out" + + // remoteExecOutputSize is the size we chunk output too + remoteExecOutputSize = 4 * 1024 + + // remoteExecOutputDeadline is how long we wait before uploading + // less than the chunk size + remoteExecOutputDeadline = 500 * time.Millisecond +) + +// remoteExecEvent is used as the payload of the user event to transmit +// what we need to know about the event +type remoteExecEvent struct { + Prefix string + Session string +} + +// remoteExecSpec is used as the specification of the remote exec. +// It is stored in the KV store +type remoteExecSpec struct { + Command string + Script []byte + Wait time.Duration +} + +type rexecWriter struct { + BufCh chan []byte + BufSize int + BufIdle time.Duration + CancelCh chan struct{} + + buf []byte + bufLen int + bufLock sync.Mutex + flush *time.Timer +} + +func (r *rexecWriter) Write(b []byte) (int, error) { + r.bufLock.Lock() + defer r.bufLock.Unlock() + if r.flush != nil { + r.flush.Stop() + r.flush = nil + } + inpLen := len(b) + if r.buf == nil { + r.buf = make([]byte, r.BufSize) + } + +COPY: + remain := len(r.buf) - r.bufLen + if remain > len(b) { + copy(r.buf[r.bufLen:], b) + r.bufLen += len(b) + } else { + copy(r.buf[r.bufLen:], b[:remain]) + b = b[remain:] + r.bufLen += remain + r.bufLock.Unlock() + r.Flush() + r.bufLock.Lock() + goto COPY + } + + r.flush = time.AfterFunc(r.BufIdle, r.Flush) + return inpLen, nil +} + +func (r *rexecWriter) Flush() { + r.bufLock.Lock() + defer r.bufLock.Unlock() + if r.flush != nil { + r.flush.Stop() + r.flush = nil + } + if r.bufLen == 0 { + return + } + select { + case r.BufCh <- r.buf[:r.bufLen]: + r.buf = make([]byte, r.BufSize) + r.bufLen = 0 + case <-r.CancelCh: + r.bufLen = 0 + } +} + +// handleRemoteExec is invoked when a new remote exec request is received +func (a *Agent) handleRemoteExec(msg *UserEvent) { + a.logger.Printf("[DEBUG] agent: received remote exec event (ID: %s)", msg.ID) + // Decode the event paylaod + var event remoteExecEvent + if err := json.Unmarshal(msg.Payload, &event); err != nil { + a.logger.Printf("[ERR] agent: failed to decode remote exec event: %v", err) + return + } + + // Read the job specification + var spec remoteExecSpec + if !a.remoteExecGetSpec(&event, &spec) { + return + } + + // Write the acknowledgement + if !a.remoteExecWriteAck(&event) { + return + } + + // Ensure we write out an exit code + exitCode := 0 + defer a.remoteExecWriteExitCode(&event, exitCode) + + // Check if this is a script, we may need to spill to disk + var script string + if len(spec.Script) != 0 { + tmpFile, err := ioutil.TempFile("", "rexec") + if err != nil { + a.logger.Printf("[DEBUG] agent: failed to make tmp file: %v", err) + exitCode = 255 + return + } + defer os.Remove(tmpFile.Name()) + os.Chmod(tmpFile.Name(), 0750) + tmpFile.Write(spec.Script) + tmpFile.Close() + script = tmpFile.Name() + } else { + script = spec.Command + } + + // Create the exec.Cmd + a.logger.Printf("[INFO] agent: remote exec '%s'", script) + cmd, err := ExecScript(script) + if err != nil { + a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err) + exitCode = 255 + return + } + + // Setup the output streaming + writer := &rexecWriter{ + BufCh: make(chan []byte, 16), + BufSize: remoteExecOutputSize, + BufIdle: remoteExecOutputDeadline, + CancelCh: make(chan struct{}), + } + cmd.Stdout = writer + cmd.Stderr = writer + + // Start execution + err = cmd.Start() + if err != nil { + a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err) + exitCode = 255 + return + } + + // Wait for the process to exit + exitCh := make(chan int, 1) + go func() { + err := cmd.Wait() + writer.Flush() + close(writer.BufCh) + if err != nil { + exitCh <- 0 + return + } + + // Try to determine the exit code + if exitErr, ok := err.(*exec.ExitError); ok { + if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { + exitCh <- status.ExitStatus() + return + } + } + exitCh <- 1 + }() + + // Wait until we are complete, uploading as we go +WAIT: + for num := 0; ; num++ { + select { + case out := <-writer.BufCh: + if out == nil { + break WAIT + } + if !a.remoteExecWriteOutput(&event, num, out) { + close(writer.CancelCh) + exitCode = 255 + return + } + case <-time.After(spec.Wait): + // Acts like a heartbeat, since there is no output + if !a.remoteExecWriteOutput(&event, num, nil) { + close(writer.CancelCh) + exitCode = 255 + return + } + } + } + + // Get the exit code + exitCode = <-exitCh +} + +// remoteExecGetSpec is used to get the exec specification. +// Returns if execution should continue +func (a *Agent) remoteExecGetSpec(event *remoteExecEvent, spec *remoteExecSpec) bool { + get := structs.KeyRequest{ + Datacenter: a.config.Datacenter, + Key: path.Join(event.Prefix, event.Session, remoteExecFileName), + QueryOptions: structs.QueryOptions{ + AllowStale: true, // Stale read for scale! Retry on failure. + }, + } + var out structs.IndexedDirEntries +QUERY: + if err := a.RPC("KVS.Get", &get, &out); err != nil { + a.logger.Printf("[ERR] agent: failed to get remote exec job: %v", err) + return false + } + if len(out.Entries) == 0 { + // If the initial read was stale and had no data, retry as a consistent read + if get.QueryOptions.AllowStale { + a.logger.Printf("[DEBUG] agent: trying consistent fetch of remote exec job spec") + get.QueryOptions.AllowStale = false + goto QUERY + } else { + a.logger.Printf("[DEBUG] agent: remote exec aborted, job spec missing") + return false + } + } + if err := json.Unmarshal(out.Entries[0].Value, &spec); err != nil { + a.logger.Printf("[ERR] agent: failed to decode remote exec spec: %v", err) + return false + } + return true +} + +// remoteExecWriteAck is used to write an ack. Returns if execution should +// continue. +func (a *Agent) remoteExecWriteAck(event *remoteExecEvent) bool { + if err := a.remoteExecWriteKey(event, remoteExecAckSuffix, nil); err != nil { + a.logger.Printf("[ERR] agent: failed to ack remote exec job: %v", err) + return false + } + return true +} + +// remoteExecWriteOutput is used to write output +func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output []byte) bool { + suffix := path.Join(remoteExecOutputDivider, fmt.Sprintf("%05x", num)) + if err := a.remoteExecWriteKey(event, suffix, output); err != nil { + a.logger.Printf("[ERR] agent: failed to write output for remote exec job: %v", err) + return false + } + return true +} + +// remoteExecWriteExitCode is used to write an exit code +func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) bool { + val := []byte(strconv.FormatInt(int64(exitCode), 10)) + if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil { + a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err) + return false + } + return true +} + +// remoteExecWriteKey is used to write an output key for a remote exec job +func (a *Agent) remoteExecWriteKey(event *remoteExecEvent, suffix string, val []byte) error { + key := path.Join(event.Prefix, event.Session, a.config.NodeName, suffix) + write := structs.KVSRequest{ + Datacenter: a.config.Datacenter, + Op: structs.KVSLock, + DirEnt: structs.DirEntry{ + Key: key, + Value: val, + Session: event.Session, + }, + } + var success bool + if err := a.RPC("KVS.Apply", &write, &success); err != nil { + return err + } + if !success { + return fmt.Errorf("write failed") + } + return nil +} diff --git a/command/agent/remote_exec_test.go b/command/agent/remote_exec_test.go new file mode 100644 index 0000000000..d431b1f809 --- /dev/null +++ b/command/agent/remote_exec_test.go @@ -0,0 +1,288 @@ +package agent + +import ( + "bytes" + "encoding/json" + "os" + "reflect" + "testing" + "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" +) + +func TestRexecWriter(t *testing.T) { + writer := &rexecWriter{ + BufCh: make(chan []byte, 16), + BufSize: 16, + BufIdle: 10 * time.Millisecond, + CancelCh: make(chan struct{}), + } + + // Write short, wait for idle + start := time.Now() + n, err := writer.Write([]byte("test")) + if err != nil { + t.Fatalf("err: %v", err) + } + if n != 4 { + t.Fatalf("bad: %v", n) + } + + select { + case b := <-writer.BufCh: + if len(b) != 4 { + t.Fatalf("Bad: %v", b) + } + if time.Now().Sub(start) < writer.BufIdle { + t.Fatalf("too early") + } + case <-time.After(2 * writer.BufIdle): + t.Fatalf("timeout") + } + + // Write in succession to prevent the timeout + writer.Write([]byte("test")) + time.Sleep(writer.BufIdle / 2) + writer.Write([]byte("test")) + time.Sleep(writer.BufIdle / 2) + start = time.Now() + writer.Write([]byte("test")) + + select { + case b := <-writer.BufCh: + if len(b) != 12 { + t.Fatalf("Bad: %v", b) + } + if time.Now().Sub(start) < writer.BufIdle { + t.Fatalf("too early") + } + case <-time.After(2 * writer.BufIdle): + t.Fatalf("timeout") + } + + // Write large values, multiple flushes required + writer.Write([]byte("01234567890123456789012345678901")) + + select { + case b := <-writer.BufCh: + if string(b) != "0123456789012345" { + t.Fatalf("bad: %s", b) + } + default: + t.Fatalf("should have buf") + } + select { + case b := <-writer.BufCh: + if string(b) != "6789012345678901" { + t.Fatalf("bad: %s", b) + } + default: + t.Fatalf("should have buf") + } +} + +func TestRemoteExecGetSpec(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + spec := &remoteExecSpec{ + Command: "uptime", + Script: []byte("#!/bin/bash"), + Wait: time.Second, + } + buf, err := json.Marshal(spec) + if err != nil { + t.Fatalf("err: %v", err) + } + key := "_rexec/" + event.Session + "/job" + setKV(t, agent, key, buf) + + var out remoteExecSpec + if !agent.remoteExecGetSpec(event, &out) { + t.Fatalf("bad") + } + if !reflect.DeepEqual(spec, &out) { + t.Fatalf("bad spec") + } +} + +func TestRemoteExecWrites(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + if !agent.remoteExecWriteAck(event) { + t.Fatalf("bad") + } + + output := []byte("testing") + if !agent.remoteExecWriteOutput(event, 0, output) { + t.Fatalf("bad") + } + if !agent.remoteExecWriteOutput(event, 10, output) { + t.Fatalf("bad") + } + + if !agent.remoteExecWriteExitCode(event, 1) { + t.Fatalf("bad") + } + + key := "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack" + d := getKV(t, agent, key) + if d == nil || d.Session != event.Session { + t.Fatalf("bad ack: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) { + t.Fatalf("bad output: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/0000a" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) { + t.Fatalf("bad output: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || string(d.Value) != "1" { + t.Fatalf("bad output: %#v", d) + } +} + +func TestHandleRemoteExec(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + spec := &remoteExecSpec{ + Command: "uptime", + Wait: time.Second, + } + buf, err := json.Marshal(spec) + if err != nil { + t.Fatalf("err: %v", err) + } + key := "_rexec/" + event.Session + "/job" + setKV(t, agent, key, buf) + + buf, err = json.Marshal(event) + if err != nil { + t.Fatalf("err: %v", err) + } + msg := &UserEvent{ + ID: generateUUID(), + Payload: buf, + } + + // Handle the event... + agent.handleRemoteExec(msg) + + // Verify we have an ack + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack" + d := getKV(t, agent, key) + if d == nil || d.Session != event.Session { + t.Fatalf("bad ack: %#v", d) + } + + // Verify we have output + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || + !bytes.Contains(d.Value, []byte("load")) { + t.Fatalf("bad output: %#v", d) + } + + // Verify we have an exit code + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || string(d.Value) != "0" { + t.Fatalf("bad output: %#v", d) + } +} + +func makeRexecSession(t *testing.T, agent *Agent) string { + args := structs.SessionRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.SessionCreate, + Session: structs.Session{ + Node: agent.config.NodeName, + LockDelay: 15 * time.Second, + }, + } + var out string + if err := agent.RPC("Session.Apply", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + return out +} + +func destroySession(t *testing.T, agent *Agent, session string) { + args := structs.SessionRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.SessionDestroy, + Session: structs.Session{ + ID: session, + }, + } + var out string + if err := agent.RPC("Session.Apply", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } +} + +func setKV(t *testing.T, agent *Agent, key string, val []byte) { + write := structs.KVSRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: key, + Value: val, + }, + } + var success bool + if err := agent.RPC("KVS.Apply", &write, &success); err != nil { + t.Fatalf("err: %v", err) + } +} + +func getKV(t *testing.T, agent *Agent, key string) *structs.DirEntry { + req := structs.KeyRequest{ + Datacenter: agent.config.Datacenter, + Key: key, + } + var out structs.IndexedDirEntries + if err := agent.RPC("KVS.Get", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.Entries) > 0 { + return out.Entries[0] + } + return nil +} diff --git a/command/agent/user_event.go b/command/agent/user_event.go index be1037d51d..64891981f7 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -1,17 +1,18 @@ package agent import ( - "bytes" "fmt" "regexp" "github.com/hashicorp/consul/consul/structs" - "github.com/ugorji/go/codec" ) const ( // userEventMaxVersion is the maximum protocol version we understand userEventMaxVersion = 1 + + // remoteExecName is the event name for a remote exec command + remoteExecName = "_rexec" ) // UserEventParam is used to parameterize a user event @@ -79,7 +80,7 @@ func (a *Agent) UserEvent(dc string, params *UserEvent) error { // Format message params.ID = generateUUID() params.Version = userEventMaxVersion - payload, err := encodeUserEvent(¶ms) + payload, err := encodeMsgPack(¶ms) if err != nil { return fmt.Errorf("UserEvent encoding failed: %v", err) } @@ -114,7 +115,7 @@ func (a *Agent) handleEvents() { case e := <-a.eventCh: // Decode the event msg := new(UserEvent) - if err := decodeUserEvent(e.Payload, msg); err != nil { + if err := decodeMsgPack(e.Payload, msg); err != nil { a.logger.Printf("[ERR] agent: Failed to decode event: %v", err) continue } @@ -208,7 +209,19 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool { // ingestUserEvent is used to process an event that passes filtering func (a *Agent) ingestUserEvent(msg *UserEvent) { - a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID) + // Special handling for internal events + switch msg.Name { + case remoteExecName: + if a.config.DisableRemoteExec { + a.logger.Printf("[INFO] agent: ignoring remote exec event (%s), disabled.", msg.ID) + } else { + go a.handleRemoteExec(msg) + } + return + default: + a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID) + } + a.eventLock.Lock() defer func() { a.eventLock.Unlock() @@ -253,15 +266,3 @@ func (a *Agent) LastUserEvent() *UserEvent { idx := (((a.eventIndex - 1) % n) + n) % n return a.eventBuf[idx] } - -// Decode is used to decode a MsgPack encoded object -func decodeUserEvent(buf []byte, out interface{}) error { - return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) -} - -// encodeUserEvent is used to encode user event -func encodeUserEvent(msg interface{}) ([]byte, error) { - var buf bytes.Buffer - err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) - return buf.Bytes(), err -} diff --git a/command/agent/util.go b/command/agent/util.go index b753505b50..c91a6dbc44 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -1,6 +1,7 @@ package agent import ( + "bytes" crand "crypto/rand" "fmt" "math" @@ -9,6 +10,8 @@ import ( "os/exec" "runtime" "time" + + "github.com/ugorji/go/codec" ) const ( @@ -76,3 +79,15 @@ func generateUUID() string { buf[8:10], buf[10:16]) } + +// decodeMsgPack is used to decode a MsgPack encoded object +func decodeMsgPack(buf []byte, out interface{}) error { + return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) +} + +// encodeMsgPack is used to encode an object with msgpack +func encodeMsgPack(msg interface{}) ([]byte, error) { + var buf bytes.Buffer + err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) + return buf.Bytes(), err +} diff --git a/command/exec.go b/command/exec.go new file mode 100644 index 0000000000..996bbd5b2d --- /dev/null +++ b/command/exec.go @@ -0,0 +1,574 @@ +package command + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io" + "os" + "path" + "regexp" + "strconv" + "strings" + "time" + "unicode" + + "github.com/armon/consul-api" + "github.com/mitchellh/cli" +) + +const ( + // rExecPrefix is the prefix in the KV store used to + // store the remote exec data + rExecPrefix = "_rexec" + + // rExecFileName is the name of the file we append to + // the path, e.g. _rexec/session_id/job + rExecFileName = "job" + + // rExecAck is the suffix added to an ack path + rExecAckSuffix = "/ack" + + // rExecAck is the suffix added to an exit code + rExecExitSuffix = "/exit" + + // rExecOutputDivider is used to namespace the output + rExecOutputDivider = "/out/" + + // rExecReplicationWait is how long we wait for replication + rExecReplicationWait = 200 * time.Millisecond + + // rExecQuietWait is how long we wait for no responses + // before assuming the job is done. + rExecQuietWait = 2 * time.Second +) + +// rExecConf is used to pass around configuration +type rExecConf struct { + datacenter string + prefix string + + node string + service string + tag string + + wait time.Duration + replWait time.Duration + + cmd string + script []byte + + verbose bool +} + +// rExecEvent is the event we broadcast using a user-event +type rExecEvent struct { + Prefix string + Session string +} + +// rExecSpec is the file we upload to specify the parameters +// of the remote execution. +type rExecSpec struct { + // Command is a single command to run directly in the shell + Command string `json:",omitempty"` + + // Script should be spilled to a file and executed + Script []byte `json:",omitempty"` + + // Wait is how long we are waiting on a quiet period to terminate + Wait time.Duration +} + +// rExecAck is used to transmit an acknowledgement +type rExecAck struct { + Node string +} + +// rExecHeart is used to transmit a heartbeat +type rExecHeart struct { + Node string +} + +// rExecOutput is used to transmit a chunk of output +type rExecOutput struct { + Node string + Output []byte +} + +// rExecExit is used to transmit an exit code +type rExecExit struct { + Node string + Code int +} + +// ExecCommand is a Command implementation that is used to +// do remote execution of commands +type ExecCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui + conf rExecConf + client *consulapi.Client + sessionID string +} + +func (c *ExecCommand) Run(args []string) int { + cmdFlags := flag.NewFlagSet("exec", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.StringVar(&c.conf.datacenter, "datacenter", "", "") + cmdFlags.StringVar(&c.conf.node, "node", "", "") + cmdFlags.StringVar(&c.conf.service, "service", "", "") + cmdFlags.StringVar(&c.conf.tag, "tag", "", "") + cmdFlags.StringVar(&c.conf.prefix, "prefix", rExecPrefix, "") + cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, "") + cmdFlags.DurationVar(&c.conf.wait, "wait", rExecQuietWait, "") + cmdFlags.BoolVar(&c.conf.verbose, "verbose", false, "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + // Join the commands to execute + c.conf.cmd = strings.Join(cmdFlags.Args(), " ") + + // If there is no command, read stdin for a script input + if c.conf.cmd == "-" { + c.conf.cmd = "" + var buf bytes.Buffer + _, err := io.Copy(&buf, os.Stdin) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read stdin: %v", err)) + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + c.conf.script = buf.Bytes() + } + + // Ensure we have a command or script + if c.conf.cmd == "" && len(c.conf.script) == 0 { + c.Ui.Error("Must specify a command to execute") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + + // Validate the configuration + if err := c.conf.validate(); err != nil { + c.Ui.Error(err.Error()) + return 1 + } + + // Create and test the HTTP client + client, err := HTTPClientDC(*httpAddr, c.conf.datacenter) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + _, err = client.Agent().NodeName() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + c.client = client + + // Create the job spec + spec, err := c.makeRExecSpec() + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to create job spec: %s", err)) + return 1 + } + + // Create a session for this + c.sessionID, err = c.createSession() + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to create session: %s", err)) + return 1 + } + defer c.destroySession() + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Created remote execution session: %s", c.sessionID)) + } + + // Upload the payload + if err := c.uploadPayload(spec); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to create job file: %s", err)) + return 1 + } + defer c.destroyData() + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Uploaded remote execution spec")) + } + + // Wait for replication. This is done so that when the event is + // received, the job file can be read using a stale read. If the + // stale read fails, we expect a consistent read to be done, so + // largely this is a heuristic. + select { + case <-time.After(c.conf.replWait): + case <-c.ShutdownCh: + return 1 + } + + // Fire the event + id, err := c.fireEvent() + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to fire event: %s", err)) + return 1 + } + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Fired remote execution event: %s", id)) + } + + // Wait for the job to finish now + return c.waitForJob() +} + +// waitForJob is used to poll for results and wait until the job is terminated +func (c *ExecCommand) waitForJob() int { + // Although the session destroy is already deferred, we do it again here, + // because invalidation of the session before destroyData() ensures there is + // no race condition allowing an agent to upload data (the acquire will fail). + defer c.destroySession() + start := time.Now() + ackCh := make(chan rExecAck, 128) + heartCh := make(chan rExecHeart, 128) + outputCh := make(chan rExecOutput, 128) + exitCh := make(chan rExecExit, 128) + doneCh := make(chan struct{}) + errCh := make(chan struct{}, 1) + defer close(doneCh) + go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh) + target := &TargettedUi{Ui: c.Ui} + + var ackCount, exitCount, badExit int +OUTER: + for { + // Determine wait time. We provide a larger window if we know about + // nodes which are still working. + waitIntv := c.conf.wait + if ackCount > exitCount { + waitIntv *= 2 + } + + select { + case e := <-ackCh: + ackCount++ + if c.conf.verbose { + target.Target = e.Node + target.Info("acknowledged") + } + + case h := <-heartCh: + if c.conf.verbose { + target.Target = h.Node + target.Info("heartbeat received") + } + + case e := <-outputCh: + target.Target = e.Node + target.Output(string(e.Output)) + + case e := <-exitCh: + exitCount++ + target.Target = e.Node + target.Info(fmt.Sprintf("finished with exit code %d", e.Code)) + if e.Code != 0 { + badExit++ + } + + case <-time.After(waitIntv): + c.Ui.Info(fmt.Sprintf("%d / %d node(s) completed / acknowledged", exitCount, ackCount)) + if c.conf.verbose { + c.Ui.Info(fmt.Sprintf("Completed in %0.2f seconds", + float64(time.Now().Sub(start))/float64(time.Second))) + } + break OUTER + + case <-errCh: + return 1 + + case <-c.ShutdownCh: + return 1 + } + } + + if badExit > 0 { + return 2 + } + return 0 +} + +// streamResults is used to perform blocking queries against the KV endpoint and stream in +// notice of various events into waitForJob +func (c *ExecCommand) streamResults(doneCh chan struct{}, ackCh chan rExecAck, heartCh chan rExecHeart, + outputCh chan rExecOutput, exitCh chan rExecExit, errCh chan struct{}) { + kv := c.client.KV() + opts := consulapi.QueryOptions{WaitTime: c.conf.wait} + dir := path.Join(c.conf.prefix, c.sessionID) + "/" + seen := make(map[string]struct{}) + + for { + // Check if we've been signaled to exit + select { + case <-doneCh: + return + default: + } + + // Block on waiting for new keys + keys, qm, err := kv.Keys(dir, "", &opts) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read results: %s", err)) + goto ERR_EXIT + } + + // Fast-path the no-change case + if qm.LastIndex == opts.WaitIndex { + continue + } + opts.WaitIndex = qm.LastIndex + + // Handle each key + for _, key := range keys { + // Ignore if we've seen it + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + + // Trim the directory + full := key + key = strings.TrimPrefix(key, dir) + + // Handle the key type + switch { + case key == rExecFileName: + continue + case strings.HasSuffix(key, rExecAckSuffix): + ackCh <- rExecAck{Node: strings.TrimSuffix(key, rExecAckSuffix)} + + case strings.HasSuffix(key, rExecExitSuffix): + pair, _, err := kv.Get(full, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err)) + continue + } + code, err := strconv.ParseInt(string(pair.Value), 10, 32) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse exit code '%s': %v", pair.Value, err)) + continue + } + exitCh <- rExecExit{ + Node: strings.TrimSuffix(key, rExecExitSuffix), + Code: int(code), + } + + case strings.LastIndex(key, rExecOutputDivider) != -1: + pair, _, err := kv.Get(full, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read key '%s': %v", full, err)) + continue + } + idx := strings.LastIndex(key, rExecOutputDivider) + node := key[:idx] + if len(pair.Value) == 0 { + heartCh <- rExecHeart{Node: node} + } else { + outputCh <- rExecOutput{Node: node, Output: pair.Value} + } + + default: + c.Ui.Error(fmt.Sprintf("Unknown key '%s', ignoring.", key)) + } + } + } + return + +ERR_EXIT: + select { + case errCh <- struct{}{}: + default: + } +} + +// validate checks that the configuration is sane +func (conf *rExecConf) validate() error { + // Validate the filters + if conf.node != "" { + if _, err := regexp.Compile(conf.node); err != nil { + return fmt.Errorf("Failed to compile node filter regexp: %v", err) + } + } + if conf.service != "" { + if _, err := regexp.Compile(conf.service); err != nil { + return fmt.Errorf("Failed to compile service filter regexp: %v", err) + } + } + if conf.tag != "" { + if _, err := regexp.Compile(conf.tag); err != nil { + return fmt.Errorf("Failed to compile tag filter regexp: %v", err) + } + } + if conf.tag != "" && conf.service == "" { + return fmt.Errorf("Cannot provide tag filter without service filter.") + } + return nil +} + +// createSession is used to create a new session for this command +func (c *ExecCommand) createSession() (string, error) { + session := c.client.Session() + se := consulapi.SessionEntry{ + Name: "Remote Exec", + } + id, _, err := session.Create(&se, nil) + return id, err +} + +// destroySession is used to destroy the associated session +func (c *ExecCommand) destroySession() error { + session := c.client.Session() + _, err := session.Destroy(c.sessionID, nil) + return err +} + +// makeRExecSpec creates a serialized job specification +// that can be uploaded which will be parsed by agents to +// determine what to do. +func (c *ExecCommand) makeRExecSpec() ([]byte, error) { + spec := &rExecSpec{ + Command: c.conf.cmd, + Script: c.conf.script, + Wait: c.conf.wait, + } + return json.Marshal(spec) +} + +// uploadPayload is used to upload the request payload +func (c *ExecCommand) uploadPayload(payload []byte) error { + kv := c.client.KV() + pair := consulapi.KVPair{ + Key: path.Join(c.conf.prefix, c.sessionID, rExecFileName), + Value: payload, + Session: c.sessionID, + } + ok, _, err := kv.Acquire(&pair, nil) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("failed to acquire key %s", pair.Key) + } + return nil +} + +// destroyData is used to nuke all the data associated with +// this remote exec. We just do a recursive delete of our +// data directory. +func (c *ExecCommand) destroyData() error { + kv := c.client.KV() + dir := path.Join(c.conf.prefix, c.sessionID) + _, err := kv.DeleteTree(dir, nil) + return err +} + +// fireEvent is used to fire the event that will notify nodes +// about the remote execution. Returns the event ID or error +func (c *ExecCommand) fireEvent() (string, error) { + // Create the user event payload + msg := &rExecEvent{ + Prefix: c.conf.prefix, + Session: c.sessionID, + } + buf, err := json.Marshal(msg) + if err != nil { + return "", err + } + + // Format the user event + event := c.client.Event() + params := &consulapi.UserEvent{ + Name: "_rexec", + Payload: buf, + NodeFilter: c.conf.node, + ServiceFilter: c.conf.service, + TagFilter: c.conf.tag, + } + + // Fire the event + id, _, err := event.Fire(params, nil) + return id, err +} + +func (c *ExecCommand) Synopsis() string { + return "Executes a command on Consul nodes" +} + +func (c *ExecCommand) Help() string { + helpText := ` +Usage: consul exec [options] [-|command...] + + Evaluates a command on remote Consul nodes. The nodes responding can + be filtered using regular expressions on node name, service, and tag + definitions. If a command is '-', stdin will be read until EOF + and used as a script input. + +Options: + + -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. + -datacenter="" Datacenter to dispatch in. Defaults to that of agent. + -prefix="_rexec" Prefix in the KV store to use for request data + -node="" Regular expression to filter on node names + -service="" Regular expression to filter on service instances + -tag="" Regular expression to filter on service tags. Must be used + with -service. + -wait=2s Period to wait with no responses before terminating execution. + -wait-repl=200ms Period to wait for replication before firing event. This is an + optimization to allow stale reads to be performed. + -verbose Enables verbose output +` + return strings.TrimSpace(helpText) +} + +// TargettedUi is a UI that wraps another UI implementation and modifies +// the output to indicate a specific target. Specifically, all Say output +// is prefixed with the target name. Message output is not prefixed but +// is offset by the length of the target so that output is lined up properly +// with Say output. Machine-readable output has the proper target set. +type TargettedUi struct { + Target string + Ui cli.Ui +} + +func (u *TargettedUi) Ask(query string) (string, error) { + return u.Ui.Ask(u.prefixLines(true, query)) +} + +func (u *TargettedUi) Info(message string) { + u.Ui.Info(u.prefixLines(true, message)) +} + +func (u *TargettedUi) Output(message string) { + u.Ui.Output(u.prefixLines(false, message)) +} + +func (u *TargettedUi) Error(message string) { + u.Ui.Error(u.prefixLines(true, message)) +} + +func (u *TargettedUi) prefixLines(arrow bool, message string) string { + arrowText := "==>" + if !arrow { + arrowText = strings.Repeat(" ", len(arrowText)) + } + + var result bytes.Buffer + + for _, line := range strings.Split(message, "\n") { + result.WriteString(fmt.Sprintf("%s %s: %s\n", arrowText, u.Target, line)) + } + + return strings.TrimRightFunc(result.String(), unicode.IsSpace) +} diff --git a/command/exec_test.go b/command/exec_test.go new file mode 100644 index 0000000000..e48cd3abaa --- /dev/null +++ b/command/exec_test.go @@ -0,0 +1,319 @@ +package command + +import ( + "strings" + "testing" + "time" + + "github.com/armon/consul-api" + "github.com/hashicorp/consul/testutil" + "github.com/mitchellh/cli" +) + +func TestExecCommand_implements(t *testing.T) { + var _ cli.Command = &ExecCommand{} +} + +func TestExecCommandRun(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + ui := new(cli.MockUi) + c := &ExecCommand{Ui: ui} + args := []string{"-http-addr=" + a1.httpAddr, "-wait=400ms", "uptime"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), "load") { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} + +func waitForLeader(t *testing.T, httpAddr string) { + client, err := HTTPClient(httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForResult(func() (bool, error) { + _, qm, err := client.Catalog().Nodes(nil) + if err != nil { + return false, err + } + return qm.KnownLeader, nil + }, func(err error) { + t.Fatalf("failed to find leader: %v", err) + }) +} + +func TestExecCommand_Validate(t *testing.T) { + conf := &rExecConf{} + err := conf.validate() + if err != nil { + t.Fatalf("err: %v", err) + } + + conf.node = "(" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } + + conf.node = "" + conf.service = "(" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } + + conf.service = "()" + conf.tag = "(" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } + + conf.service = "" + conf.tag = "foo" + err = conf.validate() + if err == nil { + t.Fatalf("err: %v", err) + } +} + +func TestExecCommand_Sessions(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + client, err := HTTPClient(a1.httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + ui := new(cli.MockUi) + c := &ExecCommand{ + Ui: ui, + client: client, + } + + id, err := c.createSession() + if err != nil { + t.Fatalf("err: %v", err) + } + + se, _, err := client.Session().Info(id, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if se == nil || se.Name != "Remote Exec" { + t.Fatalf("bad: %v", se) + } + + c.sessionID = id + err = c.destroySession() + if err != nil { + t.Fatalf("err: %v", err) + } + + se, _, err = client.Session().Info(id, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if se != nil { + t.Fatalf("bad: %v", se) + } +} + +func TestExecCommand_UploadDestroy(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + client, err := HTTPClient(a1.httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + ui := new(cli.MockUi) + c := &ExecCommand{ + Ui: ui, + client: client, + } + + id, err := c.createSession() + if err != nil { + t.Fatalf("err: %v", err) + } + c.sessionID = id + + c.conf.prefix = "_rexec" + c.conf.cmd = "uptime" + c.conf.wait = time.Second + + buf, err := c.makeRExecSpec() + if err != nil { + t.Fatalf("err: %v", err) + } + + err = c.uploadPayload(buf) + if err != nil { + t.Fatalf("err: %v", err) + } + + pair, _, err := client.KV().Get("_rexec/"+id+"/job", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if pair == nil || len(pair.Value) == 0 { + t.Fatalf("missing job spec") + } + + err = c.destroyData() + if err != nil { + t.Fatalf("err: %v", err) + } + + pair, _, err = client.KV().Get("_rexec/"+id+"/job", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + if pair != nil { + t.Fatalf("should be destroyed") + } +} + +func TestExecCommand_StreamResults(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + client, err := HTTPClient(a1.httpAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + ui := new(cli.MockUi) + c := &ExecCommand{ + Ui: ui, + client: client, + } + c.conf.prefix = "_rexec" + + id, err := c.createSession() + if err != nil { + t.Fatalf("err: %v", err) + } + c.sessionID = id + + ackCh := make(chan rExecAck, 128) + heartCh := make(chan rExecHeart, 128) + outputCh := make(chan rExecOutput, 128) + exitCh := make(chan rExecExit, 128) + doneCh := make(chan struct{}) + errCh := make(chan struct{}, 1) + defer close(doneCh) + go c.streamResults(doneCh, ackCh, heartCh, outputCh, exitCh, errCh) + + prefix := "_rexec/" + id + "/" + ok, _, err := client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/ack", + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case a := <-ackCh: + if a.Node != "foo" { + t.Fatalf("bad: %#v", a) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } + + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/exit", + Value: []byte("127"), + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case e := <-exitCh: + if e.Node != "foo" || e.Code != 127 { + t.Fatalf("bad: %#v", e) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } + + // Random key, should ignore + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/random", + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + // Output heartbeat + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/out/00000", + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case h := <-heartCh: + if h.Node != "foo" { + t.Fatalf("bad: %#v", h) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } + + // Output value + ok, _, err = client.KV().Acquire(&consulapi.KVPair{ + Key: prefix + "foo/out/00001", + Value: []byte("test"), + Session: id, + }, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("should be ok bro") + } + + select { + case o := <-outputCh: + if o.Node != "foo" || string(o.Output) != "test" { + t.Fatalf("bad: %#v", o) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("timeout") + } +} diff --git a/command/rpc.go b/command/rpc.go index 97a31d083c..0975c6b4e3 100644 --- a/command/rpc.go +++ b/command/rpc.go @@ -27,7 +27,13 @@ func HTTPAddrFlag(f *flag.FlagSet) *string { // HTTPClient returns a new Consul HTTP client with the given address. func HTTPClient(addr string) (*consulapi.Client, error) { + return HTTPClientDC(addr, "") +} + +// HTTPClientDC returns a new Consul HTTP client with the given address and datacenter +func HTTPClientDC(addr, dc string) (*consulapi.Client, error) { conf := consulapi.DefaultConfig() conf.Address = addr + conf.Datacenter = dc return consulapi.NewClient(conf) } diff --git a/commands.go b/commands.go index f68cf20801..820e64d1dd 100644 --- a/commands.go +++ b/commands.go @@ -31,6 +31,13 @@ func init() { }, nil }, + "exec": func() (cli.Command, error) { + return &command.ExecCommand{ + ShutdownCh: makeShutdownCh(), + Ui: ui, + }, nil + }, + "force-leave": func() (cli.Command, error) { return &command.ForceLeaveCommand{ Ui: ui, diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index e24bfeb5f8..929069fbea 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -1209,7 +1209,8 @@ can be specified using the "?dc=" query parameter. The fire endpoint expects a PUT request, with an optional body. The body contents are opaque to Consul, and become the "payload" -of the event. +of the event. Any names starting with the "_" prefix should be considered +reserved, and for Consul's internal use. The `?node=`, `?service=`, and `?tag=` query parameters may optionally be provided. They respectively provide a regular expression to filter diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 8d91bf0268..088d72712d 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -227,6 +227,9 @@ definitions support being updated during a reload. * `data_dir` - Equivalent to the `-data-dir` command-line flag. +* `disable_remote_exec` - Disables support for remote execution. When set to true, + the agent will ignore any incoming remote exec requests. + * `dns_config` - This object allows a number of sub-keys to be set which can tune how DNS queries are perfomed. See this guide on [DNS caching](/docs/guides/dns-cache.html). The following sub-keys are available: diff --git a/website/source/docs/commands/exec.html.markdown b/website/source/docs/commands/exec.html.markdown new file mode 100644 index 0000000000..540f95eeae --- /dev/null +++ b/website/source/docs/commands/exec.html.markdown @@ -0,0 +1,62 @@ +--- +layout: "docs" +page_title: "Commands: Exec" +sidebar_current: "docs-commands-exec" +--- + +# Consul Exec + +Command: `consul exec` + +The exec command provides a mechahanism for remote execution. For example, +this can be used to run the `uptime` command across all machines providing +the `web` service. + +Remote execution works by specifying a job which is stored in the KV store. +Agent's are informed about the new job using the [event system](/docs/commands/event.html), +which propogates messages via the [gossip protocol](/docs/internals/gossip.html). +As a result, delivery is best-effort, and there is **no guarantee** of execution. + +While events are purely gossip driven, remote execution relies on the KV store +as a message broker. As a result, the `exec` command will not be able to +properly function during a Consul outage. + +## Usage + +Usage: `consul exec [options] [-|command...]` + +The only required option is a command to execute. This is either given +as trailing arguments, or by specifying '-', stdin will be read to +completion as a script to evaluate. + +The list of available flags are: + +* `-http-addr` - Address to the HTTP server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8500" which is the default HTTP address of a Consul agent. + +* `-datacenter` - Datacenter to query. Defaults to that of agent. In version + 0.4, that is the only supported value. + +* `-prefix` - Key prefix in the KV store to use for storing request data. + Defaults to "_rexec". + +* `-node` - Regular expression to filter nodes which should evaluate the event. + +* `-service` - Regular expression to filter to only nodes with matching services. + +* `-tag` - Regular expression to filter to only nodes with a service that has + a matching tag. This must be used with `-service`. As an example, you may + do "-server mysql -tag slave". + +* `-wait` - Specifies the period of time in which no agent's respond before considering + the job finished. This is basically the quiescent time required to assume completion. + This period is not a hard deadline, and the command will wait longer depending on + various heuristics. + +* `-wait-repl` - Period to wait after writing the job specification for replication. + This is a heuristic value and enables agents to do a stale read of the job. Defaults + to 200msec. + +* `-verbose` - Enables verbose output. + diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index 3c43cac470..5adf67685e 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -25,6 +25,8 @@ usage: consul [--version] [--help] [] Available commands are: agent Runs a Consul agent + event Fire a new event + exec Executes a command on Consul nodes force-leave Forces a member of the cluster to enter the "left" state info Provides debugging information for operators join Tell Consul agent to join cluster diff --git a/website/source/intro/getting-started/install.html.markdown b/website/source/intro/getting-started/install.html.markdown index 9ef7acaaa3..e224f09eb1 100644 --- a/website/source/intro/getting-started/install.html.markdown +++ b/website/source/intro/getting-started/install.html.markdown @@ -49,6 +49,8 @@ usage: consul [--version] [--help] [] Available commands are: agent Runs a Consul agent + event Fire a new event + exec Executes a command on Consul nodes force-leave Forces a member of the cluster to enter the "left" state info Provides debugging information for operators join Tell Consul agent to join cluster @@ -56,6 +58,7 @@ Available commands are: leave Gracefully leaves the Consul cluster and shuts down members Lists the members of a Consul cluster monitor Stream logs from a Consul agent + reload Triggers the agent to reload configuration files version Prints the Consul version watch Watch for changes in Consul ``` diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 04855527a0..b123619e8b 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -61,6 +61,10 @@ > event + + + > + exec >