Adds monitor retries to the consul lock command.

This commit is contained in:
James Phillips 2016-01-05 17:59:58 -08:00
parent ca08ba3aee
commit 4afeddacc8
3 changed files with 177 additions and 75 deletions

View File

@ -69,17 +69,29 @@ Options:
-pass-stdin Pass stdin to child process. -pass-stdin Pass stdin to child process.
-try=duration Make a single attempt to acquire the lock, waiting -try=duration Make a single attempt to acquire the lock, waiting
up to the given duration (eg. "15s"). up to the given duration (eg. "15s").
-monitor-retry=n Retry up to n times if Consul returns a 500 error
while monitoring the lock. This allows riding out brief
periods of unavailability without causing leader
elections, but increases the amount of time required
to detect a lost lock in some cases. Defaults to 0.
-verbose Enables verbose output -verbose Enables verbose output
` `
return strings.TrimSpace(helpText) return strings.TrimSpace(helpText)
} }
func (c *LockCommand) Run(args []string) int { func (c *LockCommand) Run(args []string) int {
var lu *LockUnlock
return c.run(args, &lu)
}
// run exposes the underlying lock for testing.
func (c *LockCommand) run(args []string, lu **LockUnlock) int {
var childDone chan struct{} var childDone chan struct{}
var name, token string var name, token string
var limit int var limit int
var passStdin bool var passStdin bool
var try string var try string
var retry int
cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.IntVar(&limit, "n", 1, "") cmdFlags.IntVar(&limit, "n", 1, "")
@ -87,6 +99,7 @@ func (c *LockCommand) Run(args []string) int {
cmdFlags.StringVar(&token, "token", "", "") cmdFlags.StringVar(&token, "token", "", "")
cmdFlags.BoolVar(&passStdin, "pass-stdin", false, "") cmdFlags.BoolVar(&passStdin, "pass-stdin", false, "")
cmdFlags.StringVar(&try, "try", "", "") cmdFlags.StringVar(&try, "try", "", "")
cmdFlags.IntVar(&retry, "monitor-retry", 0, "")
cmdFlags.BoolVar(&c.verbose, "verbose", false, "") cmdFlags.BoolVar(&c.verbose, "verbose", false, "")
httpAddr := HTTPAddrFlag(cmdFlags) httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil { if err := cmdFlags.Parse(args); err != nil {
@ -135,6 +148,12 @@ func (c *LockCommand) Run(args []string) int {
oneshot = true oneshot = true
} }
// Check the retry parameter
if retry < 0 {
c.Ui.Error("Number for 'monitor-retry' must be >= 0")
return 1
}
// Create and test the HTTP client // Create and test the HTTP client
conf := api.DefaultConfig() conf := api.DefaultConfig()
conf.Address = *httpAddr conf.Address = *httpAddr
@ -151,11 +170,10 @@ func (c *LockCommand) Run(args []string) int {
} }
// Setup the lock or semaphore // Setup the lock or semaphore
var lu *LockUnlock
if limit == 1 { if limit == 1 {
lu, err = c.setupLock(client, prefix, name, oneshot, wait) *lu, err = c.setupLock(client, prefix, name, oneshot, wait, retry)
} else { } else {
lu, err = c.setupSemaphore(client, limit, prefix, name, oneshot, wait) *lu, err = c.setupSemaphore(client, limit, prefix, name, oneshot, wait, retry)
} }
if err != nil { if err != nil {
c.Ui.Error(fmt.Sprintf("Lock setup failed: %s", err)) c.Ui.Error(fmt.Sprintf("Lock setup failed: %s", err))
@ -166,7 +184,7 @@ func (c *LockCommand) Run(args []string) int {
if c.verbose { if c.verbose {
c.Ui.Info("Attempting lock acquisition") c.Ui.Info("Attempting lock acquisition")
} }
lockCh, err := lu.lockFn(c.ShutdownCh) lockCh, err := (*lu).lockFn(c.ShutdownCh)
if lockCh == nil { if lockCh == nil {
if err == nil { if err == nil {
c.Ui.Error("Shutdown triggered or timeout during lock acquisition") c.Ui.Error("Shutdown triggered or timeout during lock acquisition")
@ -219,14 +237,14 @@ func (c *LockCommand) Run(args []string) int {
RELEASE: RELEASE:
// Release the lock before termination // Release the lock before termination
if err := lu.unlockFn(); err != nil { if err := (*lu).unlockFn(); err != nil {
c.Ui.Error(fmt.Sprintf("Lock release failed: %s", err)) c.Ui.Error(fmt.Sprintf("Lock release failed: %s", err))
return 1 return 1
} }
// Cleanup the lock if no longer in use // Cleanup the lock if no longer in use
if err := lu.cleanupFn(); err != nil { if err := (*lu).cleanupFn(); err != nil {
if err != lu.inUseErr { if err != (*lu).inUseErr {
c.Ui.Error(fmt.Sprintf("Lock cleanup failed: %s", err)) c.Ui.Error(fmt.Sprintf("Lock cleanup failed: %s", err))
return 1 return 1
} else if c.verbose { } else if c.verbose {
@ -240,8 +258,11 @@ RELEASE:
// setupLock is used to setup a new Lock given the API client, the key prefix to // setupLock is used to setup a new Lock given the API client, the key prefix to
// operate on, and an optional session name. If oneshot is true then we will set // operate on, and an optional session name. If oneshot is true then we will set
// up for a single attempt at acquisition, using the given wait time. // up for a single attempt at acquisition, using the given wait time. The retry
func (c *LockCommand) setupLock(client *api.Client, prefix, name string, oneshot bool, wait time.Duration) (*LockUnlock, error) { // parameter sets how many 500 errors the lock monitor will tolerate before
// giving up the lock.
func (c *LockCommand) setupLock(client *api.Client, prefix, name string,
oneshot bool, wait time.Duration, retry int) (*LockUnlock, error) {
// Use the DefaultSemaphoreKey extension, this way if a lock and // Use the DefaultSemaphoreKey extension, this way if a lock and
// semaphore are both used at the same prefix, we will get a conflict // semaphore are both used at the same prefix, we will get a conflict
// which we can report to the user. // which we can report to the user.
@ -250,8 +271,9 @@ func (c *LockCommand) setupLock(client *api.Client, prefix, name string, oneshot
c.Ui.Info(fmt.Sprintf("Setting up lock at path: %s", key)) c.Ui.Info(fmt.Sprintf("Setting up lock at path: %s", key))
} }
opts := api.LockOptions{ opts := api.LockOptions{
Key: key, Key: key,
SessionName: name, SessionName: name,
MonitorRetries: retry,
} }
if oneshot { if oneshot {
opts.LockTryOnce = true opts.LockTryOnce = true
@ -266,21 +288,26 @@ func (c *LockCommand) setupLock(client *api.Client, prefix, name string, oneshot
unlockFn: l.Unlock, unlockFn: l.Unlock,
cleanupFn: l.Destroy, cleanupFn: l.Destroy,
inUseErr: api.ErrLockInUse, inUseErr: api.ErrLockInUse,
rawOpts: &opts,
} }
return lu, nil return lu, nil
} }
// setupSemaphore is used to setup a new Semaphore given the API client, key // setupSemaphore is used to setup a new Semaphore given the API client, key
// prefix, session name, and slot holder limit. If oneshot is true then we will // prefix, session name, and slot holder limit. If oneshot is true then we will
// set up for a single attempt at acquisition, using the given wait time. // set up for a single attempt at acquisition, using the given wait time. The
func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string, oneshot bool, wait time.Duration) (*LockUnlock, error) { // retry parameter sets how many 500 errors the lock monitor will tolerate
// before giving up the semaphore.
func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string,
oneshot bool, wait time.Duration, retry int) (*LockUnlock, error) {
if c.verbose { if c.verbose {
c.Ui.Info(fmt.Sprintf("Setting up semaphore (limit %d) at prefix: %s", limit, prefix)) c.Ui.Info(fmt.Sprintf("Setting up semaphore (limit %d) at prefix: %s", limit, prefix))
} }
opts := api.SemaphoreOptions{ opts := api.SemaphoreOptions{
Prefix: prefix, Prefix: prefix,
Limit: limit, Limit: limit,
SessionName: name, SessionName: name,
MonitorRetries: retry,
} }
if oneshot { if oneshot {
opts.SemaphoreTryOnce = true opts.SemaphoreTryOnce = true
@ -295,6 +322,7 @@ func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name
unlockFn: s.Release, unlockFn: s.Release,
cleanupFn: s.Destroy, cleanupFn: s.Destroy,
inUseErr: api.ErrSemaphoreInUse, inUseErr: api.ErrSemaphoreInUse,
rawOpts: &opts,
} }
return lu, nil return lu, nil
} }
@ -408,4 +436,5 @@ type LockUnlock struct {
unlockFn func() error unlockFn func() error
cleanupFn func() error cleanupFn func() error
inUseErr error inUseErr error
rawOpts interface{}
} }

View File

@ -5,9 +5,10 @@ import (
"io/ioutil" "io/ioutil"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"testing" "testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
) )
@ -15,17 +16,22 @@ func TestLockCommand_implements(t *testing.T) {
var _ cli.Command = &LockCommand{} var _ cli.Command = &LockCommand{}
} }
func TestLockCommand_BadArgs(t *testing.T) { func argFail(t *testing.T, args []string, expected string) {
ui := new(cli.MockUi) ui := new(cli.MockUi)
c := &LockCommand{Ui: ui} c := &LockCommand{Ui: ui}
if code := c.Run(args); code != 1 {
if code := c.Run([]string{"-try=blah"}); code != 1 {
t.Fatalf("expected return code 1, got %d", code) t.Fatalf("expected return code 1, got %d", code)
} }
if reason := ui.ErrorWriter.String(); !strings.Contains(reason, expected) {
if code := c.Run([]string{"-try=-10s"}); code != 1 { t.Fatalf("bad reason: got='%s', expected='%s'", reason, expected)
t.Fatalf("expected return code 1, got %d", code)
} }
}
func TestLockCommand_BadArgs(t *testing.T) {
argFail(t, []string{"-try=blah", "test/prefix", "date"}, "parsing duration")
argFail(t, []string{"-try=-10s", "test/prefix", "date"}, "must be positive")
argFail(t, []string{"-monitor-retry=-5", "test/prefix", "date"}, "must be >= 0")
} }
func TestLockCommand_Run(t *testing.T) { func TestLockCommand_Run(t *testing.T) {
@ -51,68 +57,130 @@ func TestLockCommand_Run(t *testing.T) {
} }
} }
func runTry(t *testing.T, n int) { func TestLockCommand_Try_Lock(t *testing.T) {
a1 := testAgent(t) a1 := testAgent(t)
defer a1.Shutdown() defer a1.Shutdown()
waitForLeader(t, a1.httpAddr) waitForLeader(t, a1.httpAddr)
// Define a long-running command. ui := new(cli.MockUi)
nArg := fmt.Sprintf("-n=%d", n) c := &LockCommand{Ui: ui}
args := []string{"-http-addr=" + a1.httpAddr, nArg, "-try=250ms", "test/prefix", "sleep 2"} filePath := filepath.Join(a1.dir, "test_touch")
touchCmd := fmt.Sprintf("touch '%s'", filePath)
args := []string{"-http-addr=" + a1.httpAddr, "-try=10s", "test/prefix", touchCmd}
// Run several commands at once. // Run the command.
var wg sync.WaitGroup var lu *LockUnlock
locked := make([]bool, n+1) code := c.run(args, &lu)
tried := make([]bool, n+1) if code != 0 {
for i := 0; i < n+1; i++ { t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
wg.Add(1)
go func(index int) {
ui := new(cli.MockUi)
c := &LockCommand{Ui: ui}
code := c.Run(append([]string{"-try=250ms"}, args...))
if code == 0 {
locked[index] = true
} else {
reason := ui.ErrorWriter.String()
if !strings.Contains(reason, "Shutdown triggered or timeout during lock acquisition") {
t.Fatalf("bad reason: %s", reason)
}
tried[index] = true
}
wg.Done()
}(i)
} }
wg.Wait() _, err := ioutil.ReadFile(filePath)
if err != nil {
// Tally up the outcomes. t.Fatalf("err: %v", err)
totalLocked := 0
totalTried := 0
for i := 0; i < n+1; i++ {
if locked[i] == tried[i] {
t.Fatalf("command %d didn't lock or try, or did both", i+1)
}
if locked[i] {
totalLocked++
}
if tried[i] {
totalTried++
}
} }
// We can't check exact counts because sometimes the try attempts may // Make sure the try options were set correctly.
// fail because they get woken up but need to do another try, but we opts, ok := lu.rawOpts.(*api.LockOptions)
// should get one of each outcome. if !ok {
if totalLocked == 0 || totalTried == 0 { t.Fatalf("bad type")
t.Fatalf("unexpected outcome: locked=%d, tried=%d", totalLocked, totalTried) }
if !opts.LockTryOnce || opts.LockWaitTime != 10*time.Second {
t.Fatalf("bad: %#v", opts)
} }
}
func TestLockCommand_Try_Lock(t *testing.T) {
runTry(t, 1)
} }
func TestLockCommand_Try_Semaphore(t *testing.T) { func TestLockCommand_Try_Semaphore(t *testing.T) {
runTry(t, 2) a1 := testAgent(t)
runTry(t, 3) defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := &LockCommand{Ui: ui}
filePath := filepath.Join(a1.dir, "test_touch")
touchCmd := fmt.Sprintf("touch '%s'", filePath)
args := []string{"-http-addr=" + a1.httpAddr, "-n=3", "-try=10s", "test/prefix", touchCmd}
// Run the command.
var lu *LockUnlock
code := c.run(args, &lu)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
_, err := ioutil.ReadFile(filePath)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the try options were set correctly.
opts, ok := lu.rawOpts.(*api.SemaphoreOptions)
if !ok {
t.Fatalf("bad type")
}
if !opts.SemaphoreTryOnce || opts.SemaphoreWaitTime != 10*time.Second {
t.Fatalf("bad: %#v", opts)
}
}
func TestLockCommand_MonitorRetry_Lock(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := &LockCommand{Ui: ui}
filePath := filepath.Join(a1.dir, "test_touch")
touchCmd := fmt.Sprintf("touch '%s'", filePath)
args := []string{"-http-addr=" + a1.httpAddr, "-monitor-retry=3", "test/prefix", touchCmd}
// Run the command.
var lu *LockUnlock
code := c.run(args, &lu)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
_, err := ioutil.ReadFile(filePath)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the monitor options were set correctly.
opts, ok := lu.rawOpts.(*api.LockOptions)
if !ok {
t.Fatalf("bad type")
}
if opts.MonitorRetries != 3 {
t.Fatalf("bad: %d", opts.MonitorRetries)
}
}
func TestLockCommand_MonitorRetry_Semaphore(t *testing.T) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
ui := new(cli.MockUi)
c := &LockCommand{Ui: ui}
filePath := filepath.Join(a1.dir, "test_touch")
touchCmd := fmt.Sprintf("touch '%s'", filePath)
args := []string{"-http-addr=" + a1.httpAddr, "-n=3", "-monitor-retry=3", "test/prefix", touchCmd}
// Run the command.
var lu *LockUnlock
code := c.run(args, &lu)
if code != 0 {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
_, err := ioutil.ReadFile(filePath)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the monitor options were set correctly.
opts, ok := lu.rawOpts.(*api.SemaphoreOptions)
if !ok {
t.Fatalf("bad type")
}
if opts.MonitorRetries != 3 {
t.Fatalf("bad: %d", opts.MonitorRetries)
}
} }

View File

@ -67,5 +67,10 @@ The list of available flags are:
unit suffix, such as "500ms". Valid time units are "ns", "us" (or "µs"), "ms", unit suffix, such as "500ms". Valid time units are "ns", "us" (or "µs"), "ms",
"s", "m", "h". "s", "m", "h".
* `-monitor-retry` - Retry up to this number of times if Consul returns a 500 error
while monitoring the lock. This allows riding out brief periods of unavailability
without causing leader elections, but increases the amount of time required
to detect a lost lock in some cases. Defaults to 0.
* `-verbose` - Enables verbose output. * `-verbose` - Enables verbose output.