command/exec: Support foreign datacenters

This commit is contained in:
Armon Dadgar 2015-01-07 14:22:40 -08:00
parent bb3829c2d9
commit ce83322e16
2 changed files with 190 additions and 2 deletions

View File

@ -42,6 +42,14 @@ const (
// rExecQuietWait is how long we wait for no responses // rExecQuietWait is how long we wait for no responses
// before assuming the job is done. // before assuming the job is done.
rExecQuietWait = 2 * time.Second rExecQuietWait = 2 * time.Second
// rExecForeignTTL is how long we default the session TTL
// to when doing an exec in a foreign DC.
rExecForeignTTL = "15s"
// rExecRenewInterval is how often we renew the session TTL
// when doing an exec in a foreign DC.
rExecRenewInterval = 5 * time.Second
) )
// rExecConf is used to pass around configuration // rExecConf is used to pass around configuration
@ -49,6 +57,10 @@ type rExecConf struct {
datacenter string datacenter string
prefix string prefix string
foreignDC bool
localDC string
localNode string
node string node string
service string service string
tag string tag string
@ -111,6 +123,7 @@ type ExecCommand struct {
conf rExecConf conf rExecConf
client *consulapi.Client client *consulapi.Client
sessionID string sessionID string
stopCh chan struct{}
} }
func (c *ExecCommand) Run(args []string) int { func (c *ExecCommand) Run(args []string) int {
@ -166,13 +179,23 @@ func (c *ExecCommand) Run(args []string) int {
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1 return 1
} }
_, err = client.Agent().NodeName() info, err := client.Agent().Self()
if err != nil { if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err))
return 1 return 1
} }
c.client = client c.client = client
// Check if this is a foreign datacenter
if c.conf.datacenter != "" && c.conf.datacenter != info["Config"]["Datacenter"] {
if c.conf.verbose {
c.Ui.Info("Remote exec in foreign datacenter, using Session TTL")
}
c.conf.foreignDC = true
c.conf.localDC = info["Config"]["Datacenter"].(string)
c.conf.localNode = info["Config"]["NodeName"].(string)
}
// Create the job spec // Create the job spec
spec, err := c.makeRExecSpec() spec, err := c.makeRExecSpec()
if err != nil { if err != nil {
@ -418,16 +441,88 @@ func (conf *rExecConf) validate() error {
// createSession is used to create a new session for this command // createSession is used to create a new session for this command
func (c *ExecCommand) createSession() (string, error) { func (c *ExecCommand) createSession() (string, error) {
if c.conf.foreignDC {
id, err := c.createSessionForeign()
if err == nil {
c.stopCh = make(chan struct{})
go c.renewSession(id, c.stopCh)
}
return id, err
}
return c.createSessionLocal()
}
// createSessionLocal is used to create a new session in a local datacenter
// This is simpler since we can use the local agent to create the session.
func (c *ExecCommand) createSessionLocal() (string, error) {
session := c.client.Session() session := c.client.Session()
se := consulapi.SessionEntry{ se := consulapi.SessionEntry{
Name: "Remote Exec", Name: "Remote Exec",
Behavior: consulapi.SessionBehaviorDelete,
} }
id, _, err := session.Create(&se, nil) id, _, err := session.Create(&se, nil)
return id, err return id, err
} }
// createSessionLocal is used to create a new session in a foreign datacenter
// This is more complex since the local agent cannot be used to create
// a session, and we must associate with a node in the remote datacenter.
func (c *ExecCommand) createSessionForeign() (string, error) {
// Look for a remote node to bind to
health := c.client.Health()
services, _, err := health.Service("consul", "", true, nil)
if err != nil {
return "", fmt.Errorf("Failed to find Consul server in remote datacenter: %v", err)
}
if len(services) == 0 {
return "", fmt.Errorf("Failed to find Consul server in remote datacenter")
}
node := services[0].Node.Node
if c.conf.verbose {
c.Ui.Info(fmt.Sprintf("Binding session to remote node %s@%s",
node, c.conf.datacenter))
}
session := c.client.Session()
se := consulapi.SessionEntry{
Name: fmt.Sprintf("Remote Exec via %s@%s", c.conf.localNode, c.conf.localDC),
Node: node,
Checks: []string{},
Behavior: consulapi.SessionBehaviorDelete,
TTL: rExecForeignTTL,
}
id, _, err := session.CreateNoChecks(&se, nil)
return id, err
}
// renewSession is a long running routine that periodically renews
// the session TTL. This is used for foreign sessions where we depend
// on TTLs.
func (c *ExecCommand) renewSession(id string, stopCh chan struct{}) {
session := c.client.Session()
for {
select {
case <-time.After(rExecRenewInterval):
_, _, err := session.Renew(id, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Session renew failed: %v", err))
return
}
case <-stopCh:
return
}
}
}
// destroySession is used to destroy the associated session // destroySession is used to destroy the associated session
func (c *ExecCommand) destroySession() error { func (c *ExecCommand) destroySession() error {
// Stop the session renew if any
if c.stopCh != nil {
close(c.stopCh)
c.stopCh = nil
}
// Destroy the session explicitly
session := c.client.Session() session := c.client.Session()
_, err := session.Destroy(c.sessionID, nil) _, err := session.Destroy(c.sessionID, nil)
return err return err

View File

@ -1,11 +1,13 @@
package command package command
import ( import (
"fmt"
"strings" "strings"
"testing" "testing"
"time" "time"
consulapi "github.com/hashicorp/consul/api" consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/agent"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
) )
@ -33,6 +35,43 @@ func TestExecCommandRun(t *testing.T) {
} }
} }
func TestExecCommandRun_CrossDC(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
a2 := testAgentWithConfig(t, func(c *agent.Config) {
c.Datacenter = "dc2"
})
defer a2.Shutdown()
// Join over the WAN
wanAddr := fmt.Sprintf("%s:%d", a1.config.BindAddr, a1.config.Ports.SerfWan)
n, err := a2.agent.JoinWAN([]string{wanAddr})
if err != nil {
t.Fatalf("err: %v", err)
}
if n != 1 {
t.Fatalf("bad %d", n)
}
waitForLeader(t, a1.httpAddr)
waitForLeader(t, a2.httpAddr)
ui := new(cli.MockUi)
c := &ExecCommand{Ui: ui}
args := []string{"-http-addr=" + a1.httpAddr,
"-wait=400ms", "-datacenter=dc2", "uptime"}
code := c.Run(args)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
if !strings.Contains(ui.OutputWriter.String(), "load") {
t.Fatalf("bad: %#v", ui.OutputWriter.String())
}
}
func waitForLeader(t *testing.T, httpAddr string) { func waitForLeader(t *testing.T, httpAddr string) {
client, err := HTTPClient(httpAddr) client, err := HTTPClient(httpAddr)
if err != nil { if err != nil {
@ -128,6 +167,60 @@ func TestExecCommand_Sessions(t *testing.T) {
} }
} }
func TestExecCommand_Sessions_Foreign(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
client, err := HTTPClient(a1.httpAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
ui := new(cli.MockUi)
c := &ExecCommand{
Ui: ui,
client: client,
}
c.conf.foreignDC = true
c.conf.localDC = "dc1"
c.conf.localNode = "foo"
var id string
testutil.WaitForResult(func() (bool, error) {
id, err = c.createSession()
if err != nil && strings.Contains(err.Error(), "Failed to find Consul server") {
err = nil
}
return id != "", err
}, func(err error) {
t.Fatalf("err: %v", err)
})
se, _, err := client.Session().Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if se == nil || se.Name != "Remote Exec via foo@dc1" {
t.Fatalf("bad: %v", se)
}
c.sessionID = id
err = c.destroySession()
if err != nil {
t.Fatalf("err: %v", err)
}
se, _, err = client.Session().Info(id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if se != nil {
t.Fatalf("bad: %v", se)
}
}
func TestExecCommand_UploadDestroy(t *testing.T) { func TestExecCommand_UploadDestroy(t *testing.T) {
a1 := testAgent(t) a1 := testAgent(t)
defer a1.Shutdown() defer a1.Shutdown()