diff --git a/command/agent/remote_exec.go b/command/agent/remote_exec.go index 9d24a556ad..f0fb198511 100644 --- a/command/agent/remote_exec.go +++ b/command/agent/remote_exec.go @@ -158,6 +158,7 @@ func (a *Agent) handleRemoteExec(msg *UserEvent) { } // Create the exec.Cmd + a.logger.Printf("[INFO] agent: remote exec '%s'", script) cmd, err := ExecScript(script) if err != nil { a.logger.Printf("[DEBUG] agent: failed to start remote exec: %v", err) @@ -282,11 +283,13 @@ func (a *Agent) remoteExecWriteOutput(event *remoteExecEvent, num int, output [] } // remoteExecWriteExitCode is used to write an exit code -func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) { +func (a *Agent) remoteExecWriteExitCode(event *remoteExecEvent, exitCode int) bool { val := []byte(strconv.FormatInt(int64(exitCode), 10)) if err := a.remoteExecWriteKey(event, remoteExecExitSuffix, val); err != nil { a.logger.Printf("[ERR] agent: failed to write exit code for remote exec job: %v", err) + return false } + return true } // remoteExecWriteKey is used to write an output key for a remote exec job diff --git a/command/agent/remote_exec_test.go b/command/agent/remote_exec_test.go index af65e5427b..d431b1f809 100644 --- a/command/agent/remote_exec_test.go +++ b/command/agent/remote_exec_test.go @@ -1,8 +1,15 @@ package agent import ( + "bytes" + "encoding/json" + "os" + "reflect" "testing" "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" ) func TestRexecWriter(t *testing.T) { @@ -75,3 +82,207 @@ func TestRexecWriter(t *testing.T) { t.Fatalf("should have buf") } } + +func TestRemoteExecGetSpec(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + spec := &remoteExecSpec{ + Command: "uptime", + Script: []byte("#!/bin/bash"), + Wait: time.Second, + } + buf, err := json.Marshal(spec) + if err != nil { + t.Fatalf("err: %v", err) + } + key := "_rexec/" + event.Session + "/job" + setKV(t, agent, key, buf) + + var out remoteExecSpec + if !agent.remoteExecGetSpec(event, &out) { + t.Fatalf("bad") + } + if !reflect.DeepEqual(spec, &out) { + t.Fatalf("bad spec") + } +} + +func TestRemoteExecWrites(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + if !agent.remoteExecWriteAck(event) { + t.Fatalf("bad") + } + + output := []byte("testing") + if !agent.remoteExecWriteOutput(event, 0, output) { + t.Fatalf("bad") + } + if !agent.remoteExecWriteOutput(event, 10, output) { + t.Fatalf("bad") + } + + if !agent.remoteExecWriteExitCode(event, 1) { + t.Fatalf("bad") + } + + key := "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack" + d := getKV(t, agent, key) + if d == nil || d.Session != event.Session { + t.Fatalf("bad ack: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) { + t.Fatalf("bad output: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/0000a" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || !bytes.Equal(d.Value, output) { + t.Fatalf("bad output: %#v", d) + } + + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || string(d.Value) != "1" { + t.Fatalf("bad output: %#v", d) + } +} + +func TestHandleRemoteExec(t *testing.T) { + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + + event := &remoteExecEvent{ + Prefix: "_rexec", + Session: makeRexecSession(t, agent), + } + defer destroySession(t, agent, event.Session) + + spec := &remoteExecSpec{ + Command: "uptime", + Wait: time.Second, + } + buf, err := json.Marshal(spec) + if err != nil { + t.Fatalf("err: %v", err) + } + key := "_rexec/" + event.Session + "/job" + setKV(t, agent, key, buf) + + buf, err = json.Marshal(event) + if err != nil { + t.Fatalf("err: %v", err) + } + msg := &UserEvent{ + ID: generateUUID(), + Payload: buf, + } + + // Handle the event... + agent.handleRemoteExec(msg) + + // Verify we have an ack + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/ack" + d := getKV(t, agent, key) + if d == nil || d.Session != event.Session { + t.Fatalf("bad ack: %#v", d) + } + + // Verify we have output + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/out/00000" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || + !bytes.Contains(d.Value, []byte("load")) { + t.Fatalf("bad output: %#v", d) + } + + // Verify we have an exit code + key = "_rexec/" + event.Session + "/" + agent.config.NodeName + "/exit" + d = getKV(t, agent, key) + if d == nil || d.Session != event.Session || string(d.Value) != "0" { + t.Fatalf("bad output: %#v", d) + } +} + +func makeRexecSession(t *testing.T, agent *Agent) string { + args := structs.SessionRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.SessionCreate, + Session: structs.Session{ + Node: agent.config.NodeName, + LockDelay: 15 * time.Second, + }, + } + var out string + if err := agent.RPC("Session.Apply", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + return out +} + +func destroySession(t *testing.T, agent *Agent, session string) { + args := structs.SessionRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.SessionDestroy, + Session: structs.Session{ + ID: session, + }, + } + var out string + if err := agent.RPC("Session.Apply", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } +} + +func setKV(t *testing.T, agent *Agent, key string, val []byte) { + write := structs.KVSRequest{ + Datacenter: agent.config.Datacenter, + Op: structs.KVSSet, + DirEnt: structs.DirEntry{ + Key: key, + Value: val, + }, + } + var success bool + if err := agent.RPC("KVS.Apply", &write, &success); err != nil { + t.Fatalf("err: %v", err) + } +} + +func getKV(t *testing.T, agent *Agent, key string) *structs.DirEntry { + req := structs.KeyRequest{ + Datacenter: agent.config.Datacenter, + Key: key, + } + var out structs.IndexedDirEntries + if err := agent.RPC("KVS.Get", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.Entries) > 0 { + return out.Entries[0] + } + return nil +}