From 8caa9e4c7e0a453843b6a32ce8ae462fc5e73f04 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 5 Jan 2016 16:40:35 -0800 Subject: [PATCH] Adds "try" support to locks and semaphores. --- api/lock.go | 14 +++- api/lock_test.go | 69 +++++++++++++++ api/semaphore.go | 26 ++++-- api/semaphore_test.go | 83 +++++++++++++++++++ command/lock.go | 52 ++++++++++-- command/lock_test.go | 83 ++++++++++++++++++- .../source/docs/commands/lock.html.markdown | 5 ++ 7 files changed, 314 insertions(+), 18 deletions(-) diff --git a/api/lock.go b/api/lock.go index c1f6edf822..f93241ab35 100644 --- a/api/lock.go +++ b/api/lock.go @@ -76,6 +76,8 @@ type LockOptions struct { SessionTTL string // Optional, defaults to DefaultLockSessionTTL MonitorRetries int // Optional, defaults to 0 which means no retries MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime + LockWaitTime time.Duration // Optional, defaults to DefaultLockWaitTime + LockTryOnce bool // Optional, defaults to false which means try forever } // LockKey returns a handle to a lock struct which can be used @@ -108,6 +110,9 @@ func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) { if opts.MonitorRetryTime == 0 { opts.MonitorRetryTime = DefaultMonitorRetryTime } + if opts.LockWaitTime == 0 { + opts.LockWaitTime = DefaultLockWaitTime + } l := &Lock{ c: c, opts: opts, @@ -158,9 +163,10 @@ func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) { // Setup the query options kv := l.c.KV() qOpts := &QueryOptions{ - WaitTime: DefaultLockWaitTime, + WaitTime: l.opts.LockWaitTime, } + attempts := 0 WAIT: // Check if we should quit select { @@ -169,6 +175,12 @@ WAIT: default: } + // See if we completed a one-shot. + if attempts > 0 && l.opts.LockTryOnce { + return nil, nil + } + attempts++ + // Look for an existing lock, blocking until not taken pair, meta, err := kv.Get(l.opts.Key, qOpts) if err != nil { diff --git a/api/lock_test.go b/api/lock_test.go index 7d8cbfec35..fe279d803c 100644 --- a/api/lock_test.go +++ b/api/lock_test.go @@ -488,3 +488,72 @@ func TestLock_MonitorRetry(t *testing.T) { t.Fatalf("should not be leader") } } + +func TestLock_OneShot(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + // Set up a lock as a one-shot. + opts := &LockOptions{ + Key: "test/lock", + LockTryOnce: true, + } + lock, err := c.LockOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure the default got set. + if lock.opts.LockWaitTime != DefaultLockWaitTime { + t.Fatalf("bad: %d", lock.opts.LockWaitTime) + } + + // Now set a custom time for the test. + opts.LockWaitTime = 250 * time.Millisecond + lock, err = c.LockOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + if lock.opts.LockWaitTime != 250*time.Millisecond { + t.Fatalf("bad: %d", lock.opts.LockWaitTime) + } + + // Should get the lock. + ch, err := lock.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch == nil { + t.Fatalf("not leader") + } + + // Now try with another session. + contender, err := c.LockOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + start := time.Now() + ch, err = contender.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch != nil { + t.Fatalf("should not be leader") + } + if diff := time.Now().Sub(start); diff > 2*contender.opts.LockWaitTime { + t.Fatalf("took too long: %9.6f", diff.Seconds()) + } + + // Unlock and then make sure the contender can get it. + if err := lock.Unlock(); err != nil { + t.Fatalf("err: %v", err) + } + ch, err = contender.Lock(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch == nil { + t.Fatalf("should be leader") + } +} diff --git a/api/semaphore.go b/api/semaphore.go index 4e70be2e70..cce1375813 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -63,12 +63,14 @@ type Semaphore struct { // SemaphoreOptions is used to parameterize the Semaphore type SemaphoreOptions struct { - Prefix string // Must be set and have write permissions - Limit int // Must be set, and be positive - Value []byte // Optional, value to associate with the contender entry - Session string // Optional, created if not specified - SessionName string // Optional, defaults to DefaultLockSessionName - SessionTTL string // Optional, defaults to DefaultLockSessionTTL + Prefix string // Must be set and have write permissions + Limit int // Must be set, and be positive + Value []byte // Optional, value to associate with the contender entry + Session string // Optional, created if not specified + SessionName string // Optional, defaults to DefaultLockSessionName + SessionTTL string // Optional, defaults to DefaultLockSessionTTL + SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime + SemaphoreTryOnce bool // Optional, defaults to false which means try forever } // semaphoreLock is written under the DefaultSemaphoreKey and @@ -115,6 +117,9 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) { return nil, fmt.Errorf("invalid SessionTTL: %v", err) } } + if opts.SemaphoreWaitTime == 0 { + opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime + } s := &Semaphore{ c: c, opts: opts, @@ -172,9 +177,10 @@ func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) { // Setup the query options qOpts := &QueryOptions{ - WaitTime: DefaultSemaphoreWaitTime, + WaitTime: s.opts.SemaphoreWaitTime, } + attempts := 0 WAIT: // Check if we should quit select { @@ -183,6 +189,12 @@ WAIT: default: } + // See if we completed a one-shot. + if attempts > 0 && s.opts.SemaphoreTryOnce { + return nil, nil + } + attempts++ + // Read the prefix pairs, meta, err := kv.List(s.opts.Prefix, qOpts) if err != nil { diff --git a/api/semaphore_test.go b/api/semaphore_test.go index 5e5e53588c..98371ea10d 100644 --- a/api/semaphore_test.go +++ b/api/semaphore_test.go @@ -311,3 +311,86 @@ func TestSemaphore_Conflict(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestSemaphore_OneShot(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + // Set up a semaphore as a one-shot. + opts := &SemaphoreOptions{ + Prefix: "test/sema/.lock", + Limit: 2, + SemaphoreTryOnce: true, + } + sema, err := c.SemaphoreOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure the default got set. + if sema.opts.SemaphoreWaitTime != DefaultSemaphoreWaitTime { + t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime) + } + + // Now set a custom time for the test. + opts.SemaphoreWaitTime = 250 * time.Millisecond + sema, err = c.SemaphoreOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + if sema.opts.SemaphoreWaitTime != 250*time.Millisecond { + t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime) + } + + // Should acquire the semaphore. + ch, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch == nil { + t.Fatalf("should have acquired the semaphore") + } + + // Try with another session. + another, err := c.SemaphoreOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + ch, err = another.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch == nil { + t.Fatalf("should have acquired the semaphore") + } + + // Try with a third one that shouldn't get it. + contender, err := c.SemaphoreOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + start := time.Now() + ch, err = contender.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch != nil { + t.Fatalf("should not have acquired the semaphore") + } + if diff := time.Now().Sub(start); diff > 2*contender.opts.SemaphoreWaitTime { + t.Fatalf("took too long: %9.6f", diff.Seconds()) + } + + // Give up a slot and make sure the third one can get it. + if err := another.Release(); err != nil { + t.Fatalf("err: %v", err) + } + ch, err = contender.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch == nil { + t.Fatalf("should have acquired the semaphore") + } +} diff --git a/command/lock.go b/command/lock.go index 987248be68..fcbf4bfe87 100644 --- a/command/lock.go +++ b/command/lock.go @@ -47,6 +47,7 @@ Usage: consul lock [options] prefix child... disrupted the child process will be sent a SIGTERM signal and given time to gracefully exit. After the grace period expires the process will be hard terminated. + For Consul agents on Windows, the child process is always hard terminated with a SIGKILL, since Windows has no POSIX compatible notion for SIGTERM. @@ -66,6 +67,8 @@ Options: -name="" Optional name to associate with lock session. -token="" ACL token to use. Defaults to that of agent. -pass-stdin Pass stdin to child process. + -try=duration Make a single attempt to acquire the lock, waiting + up to the given duration (eg. "15s"). -verbose Enables verbose output ` return strings.TrimSpace(helpText) @@ -76,12 +79,14 @@ func (c *LockCommand) Run(args []string) int { var name, token string var limit int var passStdin bool + var try string cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.IntVar(&limit, "n", 1, "") cmdFlags.StringVar(&name, "name", "", "") cmdFlags.StringVar(&token, "token", "", "") cmdFlags.BoolVar(&passStdin, "pass-stdin", false, "") + cmdFlags.StringVar(&try, "try", "", "") cmdFlags.BoolVar(&c.verbose, "verbose", false, "") httpAddr := HTTPAddrFlag(cmdFlags) if err := cmdFlags.Parse(args); err != nil { @@ -111,6 +116,25 @@ func (c *LockCommand) Run(args []string) int { name = fmt.Sprintf("Consul lock for '%s' at '%s'", script, prefix) } + // Verify the duration if given. + oneshot := false + var wait time.Duration + if try != "" { + var err error + wait, err = time.ParseDuration(try) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing duration for 'try' option: %s", err)) + return 1 + } + + if wait < 0 { + c.Ui.Error("Duration for 'try' option must be positive") + return 1 + } + + oneshot = true + } + // Create and test the HTTP client conf := api.DefaultConfig() conf.Address = *httpAddr @@ -129,9 +153,9 @@ func (c *LockCommand) Run(args []string) int { // Setup the lock or semaphore var lu *LockUnlock if limit == 1 { - lu, err = c.setupLock(client, prefix, name) + lu, err = c.setupLock(client, prefix, name, oneshot, wait) } else { - lu, err = c.setupSemaphore(client, limit, prefix, name) + lu, err = c.setupSemaphore(client, limit, prefix, name, oneshot, wait) } if err != nil { c.Ui.Error(fmt.Sprintf("Lock setup failed: %s", err)) @@ -145,7 +169,7 @@ func (c *LockCommand) Run(args []string) int { lockCh, err := lu.lockFn(c.ShutdownCh) if lockCh == nil { if err == nil { - c.Ui.Error("Shutdown triggered during lock acquisition") + c.Ui.Error("Shutdown triggered or timeout during lock acquisition") } else { c.Ui.Error(fmt.Sprintf("Lock acquisition failed: %s", err)) } @@ -214,9 +238,10 @@ RELEASE: return 0 } -// setupLock is used to setup a new Lock given the API client, -// the key prefix to operate on, and an optional session name. -func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockUnlock, error) { +// setupLock is used to setup a new Lock given the API client, the key prefix to +// operate on, and an optional session name. If oneshot is true then we will set +// up for a single attempt at acquisition, using the given wait time. +func (c *LockCommand) setupLock(client *api.Client, prefix, name string, oneshot bool, wait time.Duration) (*LockUnlock, error) { // Use the DefaultSemaphoreKey extension, this way if a lock and // semaphore are both used at the same prefix, we will get a conflict // which we can report to the user. @@ -228,6 +253,10 @@ func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockU Key: key, SessionName: name, } + if oneshot { + opts.LockTryOnce = true + opts.LockWaitTime = wait + } l, err := client.LockOpts(&opts) if err != nil { return nil, err @@ -241,9 +270,10 @@ func (c *LockCommand) setupLock(client *api.Client, prefix, name string) (*LockU return lu, nil } -// setupSemaphore is used to setup a new Semaphore given the -// API client, key prefix, session name, and slot holder limit. -func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string) (*LockUnlock, error) { +// setupSemaphore is used to setup a new Semaphore given the API client, key +// prefix, session name, and slot holder limit. If oneshot is true then we will +// set up for a single attempt at acquisition, using the given wait time. +func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name string, oneshot bool, wait time.Duration) (*LockUnlock, error) { if c.verbose { c.Ui.Info(fmt.Sprintf("Setting up semaphore (limit %d) at prefix: %s", limit, prefix)) } @@ -252,6 +282,10 @@ func (c *LockCommand) setupSemaphore(client *api.Client, limit int, prefix, name Limit: limit, SessionName: name, } + if oneshot { + opts.SemaphoreTryOnce = true + opts.SemaphoreWaitTime = wait + } s, err := client.SemaphoreOpts(&opts) if err != nil { return nil, err diff --git a/command/lock_test.go b/command/lock_test.go index df3f029867..479ad7d5b8 100644 --- a/command/lock_test.go +++ b/command/lock_test.go @@ -4,6 +4,8 @@ import ( "fmt" "io/ioutil" "path/filepath" + "strings" + "sync" "testing" "github.com/mitchellh/cli" @@ -13,7 +15,20 @@ func TestLockCommand_implements(t *testing.T) { var _ cli.Command = &LockCommand{} } -func TestLockCommandRun(t *testing.T) { +func TestLockCommand_BadArgs(t *testing.T) { + ui := new(cli.MockUi) + c := &LockCommand{Ui: ui} + + if code := c.Run([]string{"-try=blah"}); code != 1 { + t.Fatalf("expected return code 1, got %d", code) + } + + if code := c.Run([]string{"-try=-10s"}); code != 1 { + t.Fatalf("expected return code 1, got %d", code) + } +} + +func TestLockCommand_Run(t *testing.T) { a1 := testAgent(t) defer a1.Shutdown() waitForLeader(t, a1.httpAddr) @@ -35,3 +50,69 @@ func TestLockCommandRun(t *testing.T) { t.Fatalf("err: %v", err) } } + +func runTry(t *testing.T, n int) { + a1 := testAgent(t) + defer a1.Shutdown() + waitForLeader(t, a1.httpAddr) + + // Define a long-running command. + nArg := fmt.Sprintf("-n=%d", n) + args := []string{"-http-addr=" + a1.httpAddr, nArg, "-try=250ms", "test/prefix", "sleep 2"} + + // Run several commands at once. + var wg sync.WaitGroup + locked := make([]bool, n+1) + tried := make([]bool, n+1) + for i := 0; i < n+1; i++ { + wg.Add(1) + go func(index int) { + ui := new(cli.MockUi) + c := &LockCommand{Ui: ui} + + code := c.Run(append([]string{"-try=250ms"}, args...)) + if code == 0 { + locked[index] = true + } else { + reason := ui.ErrorWriter.String() + if !strings.Contains(reason, "Shutdown triggered or timeout during lock acquisition") { + t.Fatalf("bad reason: %s", reason) + } + tried[index] = true + } + wg.Done() + }(i) + } + wg.Wait() + + // Tally up the outcomes. + totalLocked := 0 + totalTried := 0 + for i := 0; i < n+1; i++ { + if locked[i] == tried[i] { + t.Fatalf("command %d didn't lock or try, or did both", i+1) + } + if locked[i] { + totalLocked++ + } + if tried[i] { + totalTried++ + } + } + + // We can't check exact counts because sometimes the try attempts may + // fail because they get woken up but need to do another try, but we + // should get one of each outcome. + if totalLocked == 0 || totalTried == 0 { + t.Fatalf("unexpected outcome: locked=%d, tried=%d", totalLocked, totalTried) + } +} + +func TestLockCommand_Try_Lock(t *testing.T) { + runTry(t, 1) +} + +func TestLockCommand_Try_Semaphore(t *testing.T) { + runTry(t, 2) + runTry(t, 3) +} diff --git a/website/source/docs/commands/lock.html.markdown b/website/source/docs/commands/lock.html.markdown index 5073ce22fd..a5b63bf728 100644 --- a/website/source/docs/commands/lock.html.markdown +++ b/website/source/docs/commands/lock.html.markdown @@ -62,5 +62,10 @@ The list of available flags are: * `-pass-stdin` - Pass stdin to child process. +* `-try` - Make a single attempt to acquire the lock, waiting up to the given + duration supplied as the argument. The duration is a decimal number, with + unit suffix, such as "500ms". Valid time units are "ns", "us" (or "µs"), "ms", + "s", "m", "h". + * `-verbose` - Enables verbose output.