Adds "try" support to locks and semaphores.

This commit is contained in:
James Phillips 2016-01-05 16:40:35 -08:00
parent 901261ab30
commit 8caa9e4c7e
7 changed files with 314 additions and 18 deletions

View File

@ -76,6 +76,8 @@ type LockOptions struct {
SessionTTL string // Optional, defaults to DefaultLockSessionTTL SessionTTL string // Optional, defaults to DefaultLockSessionTTL
MonitorRetries int // Optional, defaults to 0 which means no retries MonitorRetries int // Optional, defaults to 0 which means no retries
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
LockWaitTime time.Duration // Optional, defaults to DefaultLockWaitTime
LockTryOnce bool // Optional, defaults to false which means try forever
} }
// LockKey returns a handle to a lock struct which can be used // LockKey returns a handle to a lock struct which can be used
@ -108,6 +110,9 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
if opts.MonitorRetryTime == 0 { if opts.MonitorRetryTime == 0 {
opts.MonitorRetryTime = DefaultMonitorRetryTime opts.MonitorRetryTime = DefaultMonitorRetryTime
} }
if opts.LockWaitTime == 0 {
opts.LockWaitTime = DefaultLockWaitTime
}
l := &Lock{ l := &Lock{
c: c, c: c,
opts: opts, opts: opts,
@ -158,9 +163,10 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Setup the query options // Setup the query options
kv := l.c.KV() kv := l.c.KV()
qOpts := &QueryOptions{ qOpts := &QueryOptions{
WaitTime: DefaultLockWaitTime, WaitTime: l.opts.LockWaitTime,
} }
attempts := 0
WAIT: WAIT:
// Check if we should quit // Check if we should quit
select { select {
@ -169,6 +175,12 @@ WAIT:
default: default:
} }
// See if we completed a one-shot.
if attempts > 0 && l.opts.LockTryOnce {
return nil, nil
}
attempts++
// Look for an existing lock, blocking until not taken // Look for an existing lock, blocking until not taken
pair, meta, err := kv.Get(l.opts.Key, qOpts) pair, meta, err := kv.Get(l.opts.Key, qOpts)
if err != nil { if err != nil {

View File

@ -488,3 +488,72 @@ func TestLock_MonitorRetry(t *testing.T) {
t.Fatalf("should not be leader") t.Fatalf("should not be leader")
} }
} }
func TestLock_OneShot(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
// Set up a lock as a one-shot.
opts := &LockOptions{
Key: "test/lock",
LockTryOnce: true,
}
lock, err := c.LockOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the default got set.
if lock.opts.LockWaitTime != DefaultLockWaitTime {
t.Fatalf("bad: %d", lock.opts.LockWaitTime)
}
// Now set a custom time for the test.
opts.LockWaitTime = 250 * time.Millisecond
lock, err = c.LockOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
if lock.opts.LockWaitTime != 250*time.Millisecond {
t.Fatalf("bad: %d", lock.opts.LockWaitTime)
}
// Should get the lock.
ch, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch == nil {
t.Fatalf("not leader")
}
// Now try with another session.
contender, err := c.LockOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
start := time.Now()
ch, err = contender.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch != nil {
t.Fatalf("should not be leader")
}
if diff := time.Now().Sub(start); diff > 2*contender.opts.LockWaitTime {
t.Fatalf("took too long: %9.6f", diff.Seconds())
}
// Unlock and then make sure the contender can get it.
if err := lock.Unlock(); err != nil {
t.Fatalf("err: %v", err)
}
ch, err = contender.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch == nil {
t.Fatalf("should be leader")
}
}

View File

@ -63,12 +63,14 @@ type Semaphore struct {
// SemaphoreOptions is used to parameterize the Semaphore // SemaphoreOptions is used to parameterize the Semaphore
type SemaphoreOptions struct { type SemaphoreOptions struct {
Prefix string // Must be set and have write permissions Prefix string // Must be set and have write permissions
Limit int // Must be set, and be positive Limit int // Must be set, and be positive
Value []byte // Optional, value to associate with the contender entry Value []byte // Optional, value to associate with the contender entry
Session string // Optional, created if not specified Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL SessionTTL string // Optional, defaults to DefaultLockSessionTTL
SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime
SemaphoreTryOnce bool // Optional, defaults to false which means try forever
} }
// semaphoreLock is written under the DefaultSemaphoreKey and // semaphoreLock is written under the DefaultSemaphoreKey and
@ -115,6 +117,9 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
return nil, fmt.Errorf("invalid SessionTTL: %v", err) return nil, fmt.Errorf("invalid SessionTTL: %v", err)
} }
} }
if opts.SemaphoreWaitTime == 0 {
opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime
}
s := &Semaphore{ s := &Semaphore{
c: c, c: c,
opts: opts, opts: opts,
@ -172,9 +177,10 @@ func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
// Setup the query options // Setup the query options
qOpts := &QueryOptions{ qOpts := &QueryOptions{
WaitTime: DefaultSemaphoreWaitTime, WaitTime: s.opts.SemaphoreWaitTime,
} }
attempts := 0
WAIT: WAIT:
// Check if we should quit // Check if we should quit
select { select {
@ -183,6 +189,12 @@ WAIT:
default: default:
} }
// See if we completed a one-shot.
if attempts > 0 && s.opts.SemaphoreTryOnce {
return nil, nil
}
attempts++
// Read the prefix // Read the prefix
pairs, meta, err := kv.List(s.opts.Prefix, qOpts) pairs, meta, err := kv.List(s.opts.Prefix, qOpts)
if err != nil { if err != nil {

View File

@ -311,3 +311,86 @@ func TestSemaphore_Conflict(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }
func TestSemaphore_OneShot(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
// Set up a semaphore as a one-shot.
opts := &SemaphoreOptions{
Prefix: "test/sema/.lock",
Limit: 2,
SemaphoreTryOnce: true,
}
sema, err := c.SemaphoreOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the default got set.
if sema.opts.SemaphoreWaitTime != DefaultSemaphoreWaitTime {
t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
}
// Now set a custom time for the test.
opts.SemaphoreWaitTime = 250 * time.Millisecond
sema, err = c.SemaphoreOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
if sema.opts.SemaphoreWaitTime != 250*time.Millisecond {
t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
}
// Should acquire the semaphore.
ch, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch == nil {
t.Fatalf("should have acquired the semaphore")
}
// Try with another session.
another, err := c.SemaphoreOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
ch, err = another.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch == nil {
t.Fatalf("should have acquired the semaphore")
}
// Try with a third one that shouldn't get it.
contender, err := c.SemaphoreOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
start := time.Now()
ch, err = contender.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch != nil {
t.Fatalf("should not have acquired the semaphore")
}
if diff := time.Now().Sub(start); diff > 2*contender.opts.SemaphoreWaitTime {
t.Fatalf("took too long: %9.6f", diff.Seconds())
}
// Give up a slot and make sure the third one can get it.
if err := another.Release(); err != nil {
t.Fatalf("err: %v", err)
}
ch, err = contender.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch == nil {
t.Fatalf("should have acquired the semaphore")
}
}

View File

@ -47,6 +47,7 @@ Usage: consul lock [options] prefix child...
disrupted the child process will be sent a SIGTERM signal and given disrupted the child process will be sent a SIGTERM signal and given
time to gracefully exit. After the grace period expires the process time to gracefully exit. After the grace period expires the process
will be hard terminated. will be hard terminated.
For Consul agents on Windows, the child process is always hard For Consul agents on Windows, the child process is always hard
terminated with a SIGKILL, since Windows has no POSIX compatible terminated with a SIGKILL, since Windows has no POSIX compatible
notion for SIGTERM. notion for SIGTERM.
@ -66,6 +67,8 @@ Options:
-name="" Optional name to associate with lock session. -name="" Optional name to associate with lock session.
-token="" ACL token to use. Defaults to that of agent. -token="" ACL token to use. Defaults to that of agent.
-pass-stdin Pass stdin to child process. -pass-stdin Pass stdin to child process.
-try=duration Make a single attempt to acquire the lock, waiting
up to the given duration (eg. "15s").
-verbose Enables verbose output -verbose Enables verbose output
` `
return strings.TrimSpace(helpText) return strings.TrimSpace(helpText)
@ -76,12 +79,14 @@ func (c *LockCommand) Run(args []string) int {
var name, token string var name, token string
var limit int var limit int
var passStdin bool var passStdin bool
var try string
cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.IntVar(&limit, "n", 1, "") cmdFlags.IntVar(&limit, "n", 1, "")
cmdFlags.StringVar(&name, "name", "", "") cmdFlags.StringVar(&name, "name", "", "")
cmdFlags.StringVar(&token, "token", "", "") cmdFlags.StringVar(&token, "token", "", "")
cmdFlags.BoolVar(&passStdin, "pass-stdin", false, "") cmdFlags.BoolVar(&passStdin, "pass-stdin", false, "")
cmdFlags.StringVar(&try, "try", "", "")
cmdFlags.BoolVar(&c.verbose, "verbose", false, "") cmdFlags.BoolVar(&c.verbose, "verbose", false, "")
httpAddr := HTTPAddrFlag(cmdFlags) httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil { if err := cmdFlags.Parse(args); err != nil {
@ -111,6 +116,25 @@ func (c *LockCommand) Run(args []string) int {
name = fmt.Sprintf("Consul lock for '%s' at '%s'", script, prefix) name = fmt.Sprintf("Consul lock for '%s' at '%s'", script, prefix)
} }
// Verify the duration if given.
oneshot := false
var wait time.Duration
if try != "" {
var err error
wait, err = time.ParseDuration(try)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing duration for 'try' option: %s", err))
return 1
}
if wait < 0 {
c.Ui.Error("Duration for 'try' option must be positive")
return 1
}
oneshot = true
}
// Create and test the HTTP client // Create and test the HTTP client
conf := api.DefaultConfig() conf := api.DefaultConfig()
conf.Address = *httpAddr conf.Address = *httpAddr
@ -129,9 +153,9 @@ func (c *LockCommand) Run(args []string) int {
// Setup the lock or semaphore // Setup the lock or semaphore
var lu *LockUnlock var lu *LockUnlock
if limit == 1 { if limit == 1 {
lu, err = c.setupLock(client, prefix, name) lu, err = c.setupLock(client, prefix, name, oneshot, wait)
} else { } else {
lu, err = c.setupSemaphore(client, limit, prefix, name) lu, err = c.setupSemaphore(client, limit, prefix, name, oneshot, wait)
} }
if err != nil { if err != nil {
c.Ui.Error(fmt.Sprintf("Lock setup failed: %s", err)) c.Ui.Error(fmt.Sprintf("Lock setup failed: %s", err))
@ -145,7 +169,7 @@ func (c *LockCommand) Run(args []string) int {
lockCh, err := lu.lockFn(c.ShutdownCh) lockCh, err := lu.lockFn(c.ShutdownCh)
if lockCh == nil { if lockCh == nil {
if err == nil { if err == nil {
c.Ui.Error("Shutdown triggered during lock acquisition") c.Ui.Error("Shutdown triggered or timeout during lock acquisition")
} else { } else {
c.Ui.Error(fmt.Sprintf("Lock acquisition failed: %s", err)) c.Ui.Error(fmt.Sprintf("Lock acquisition failed: %s", err))
} }
@ -214,9 +238,10 @@ RELEASE:
return 0 return 0
} }
// setupLock is used to setup a new Lock given the API client, // setupLock is used to setup a new Lock given the API client, the key prefix to
// the key prefix to operate on, and an optional session name. // operate on, and an optional session name. If oneshot is true then we will set
func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockUnlock, error) { // up for a single attempt at acquisition, using the given wait time.
func (c *LockCommand) setupLock(client *api.Client, prefix, name string, oneshot bool, wait time.Duration) (*LockUnlock, error) {
// Use the DefaultSemaphoreKey extension, this way if a lock and // Use the DefaultSemaphoreKey extension, this way if a lock and
// semaphore are both used at the same prefix, we will get a conflict // semaphore are both used at the same prefix, we will get a conflict
// which we can report to the user. // which we can report to the user.
@ -228,6 +253,10 @@ func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockU
Key: key, Key: key,
SessionName: name, SessionName: name,
} }
if oneshot {
opts.LockTryOnce = true
opts.LockWaitTime = wait
}
l, err := client.LockOpts(&opts) l, err := client.LockOpts(&opts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -241,9 +270,10 @@ func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockU
return lu, nil return lu, nil
} }
// setupSemaphore is used to setup a new Semaphore given the // setupSemaphore is used to setup a new Semaphore given the API client, key
// API client, key prefix, session name, and slot holder limit. // prefix, session name, and slot holder limit. If oneshot is true then we will
func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string) (*LockUnlock, error) { // set up for a single attempt at acquisition, using the given wait time.
func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string, oneshot bool, wait time.Duration) (*LockUnlock, error) {
if c.verbose { if c.verbose {
c.Ui.Info(fmt.Sprintf("Setting up semaphore (limit %d) at prefix: %s", limit, prefix)) c.Ui.Info(fmt.Sprintf("Setting up semaphore (limit %d) at prefix: %s", limit, prefix))
} }
@ -252,6 +282,10 @@ func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name
Limit: limit, Limit: limit,
SessionName: name, SessionName: name,
} }
if oneshot {
opts.SemaphoreTryOnce = true
opts.SemaphoreWaitTime = wait
}
s, err := client.SemaphoreOpts(&opts) s, err := client.SemaphoreOpts(&opts)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"path/filepath" "path/filepath"
"strings"
"sync"
"testing" "testing"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
@ -13,7 +15,20 @@ func TestLockCommand_implements(t *testing.T) {
var _ cli.Command = &LockCommand{} var _ cli.Command = &LockCommand{}
} }
func TestLockCommandRun(t *testing.T) { func TestLockCommand_BadArgs(t *testing.T) {
ui := new(cli.MockUi)
c := &LockCommand{Ui: ui}
if code := c.Run([]string{"-try=blah"}); code != 1 {
t.Fatalf("expected return code 1, got %d", code)
}
if code := c.Run([]string{"-try=-10s"}); code != 1 {
t.Fatalf("expected return code 1, got %d", code)
}
}
func TestLockCommand_Run(t *testing.T) {
a1 := testAgent(t) a1 := testAgent(t)
defer a1.Shutdown() defer a1.Shutdown()
waitForLeader(t, a1.httpAddr) waitForLeader(t, a1.httpAddr)
@ -35,3 +50,69 @@ func TestLockCommandRun(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }
func runTry(t *testing.T, n int) {
a1 := testAgent(t)
defer a1.Shutdown()
waitForLeader(t, a1.httpAddr)
// Define a long-running command.
nArg := fmt.Sprintf("-n=%d", n)
args := []string{"-http-addr=" + a1.httpAddr, nArg, "-try=250ms", "test/prefix", "sleep 2"}
// Run several commands at once.
var wg sync.WaitGroup
locked := make([]bool, n+1)
tried := make([]bool, n+1)
for i := 0; i < n+1; i++ {
wg.Add(1)
go func(index int) {
ui := new(cli.MockUi)
c := &LockCommand{Ui: ui}
code := c.Run(append([]string{"-try=250ms"}, args...))
if code == 0 {
locked[index] = true
} else {
reason := ui.ErrorWriter.String()
if !strings.Contains(reason, "Shutdown triggered or timeout during lock acquisition") {
t.Fatalf("bad reason: %s", reason)
}
tried[index] = true
}
wg.Done()
}(i)
}
wg.Wait()
// Tally up the outcomes.
totalLocked := 0
totalTried := 0
for i := 0; i < n+1; i++ {
if locked[i] == tried[i] {
t.Fatalf("command %d didn't lock or try, or did both", i+1)
}
if locked[i] {
totalLocked++
}
if tried[i] {
totalTried++
}
}
// We can't check exact counts because sometimes the try attempts may
// fail because they get woken up but need to do another try, but we
// should get one of each outcome.
if totalLocked == 0 || totalTried == 0 {
t.Fatalf("unexpected outcome: locked=%d, tried=%d", totalLocked, totalTried)
}
}
func TestLockCommand_Try_Lock(t *testing.T) {
runTry(t, 1)
}
func TestLockCommand_Try_Semaphore(t *testing.T) {
runTry(t, 2)
runTry(t, 3)
}

View File

@ -62,5 +62,10 @@ The list of available flags are:
* `-pass-stdin` - Pass stdin to child process. * `-pass-stdin` - Pass stdin to child process.
* `-try` - Make a single attempt to acquire the lock, waiting up to the given
duration supplied as the argument. The duration is a decimal number, with
unit suffix, such as "500ms". Valid time units are "ns", "us" (or "µs"), "ms",
"s", "m", "h".
* `-verbose` - Enables verbose output. * `-verbose` - Enables verbose output.