api: Adding Lock helpers for leader election

This commit is contained in:
Armon Dadgar 2015-01-09 17:35:17 -08:00
parent 8b320b852e
commit 854aef82b0
2 changed files with 464 additions and 0 deletions

280
api/lock.go Normal file
View File

@ -0,0 +1,280 @@
package api
import (
"fmt"
"sync"
"time"
)
const (
// DefaultLockSessionName is the Session Name we assign if none is provided
DefaultLockSessionName = "Consul API Lock"
// DefaultLockSessionTTL is the default session TTL if no Session is provided
// when creating a new Lock. This is used because we do not have another
// other check to depend upon.
DefaultLockSessionTTL = "15s"
// DefaultLockWaitTime is how long we block for at a time to check if lock
// acquisition is possible. This affects the minimum time it takes to cancel
// a Lock acquisition.
DefaultLockWaitTime = 15 * time.Second
// DefaultLockRetyTime is how long we wait after a failed lock acquisition
// before attempting to do the lock again. This is so that once a lock-delay
// is in affect, we do not hot loop retrying the acquisition.
DefaultLockRetyTime = 5 * time.Second
)
var (
// ErrLockHeld is returned if we attempt to double lock
ErrLockHeld = fmt.Errorf("Lock already held")
// ErrLockNotHeld is returned if we attempt to unlock a lock
// that we do not hold.
ErrLockNotHeld = fmt.Errorf("Lock not held")
)
// Lock is used to implement client-side leader election. It is follows the
// algorithm as described here: https://consul.io/docs/guides/leader-election.html.
type Lock struct {
c *Client
opts *LockOptions
isHeld bool
sessionRenew chan struct{}
lockSession string
l sync.Mutex
}
// LockOptions is used to parameterize the Lock behavior.
type LockOptions struct {
Key string // Must be set and have write permissions
Value []byte // Optional, value to associate with the lock
Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
}
// LockKey returns a handle to a lock struct which can be used
// to acquire and release the mutex. The key used must have
// write permissions.
func (c *Client) LockKey(key string) (*Lock, error) {
opts := &LockOptions{
Key: key,
}
return c.LockOpts(opts)
}
// LockOpts returns a handle to a lock struct which can be used
// to acquire and release the mutex. The key used must have
// write permissions.
func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
if opts.SessionName == "" {
opts.SessionName = DefaultLockSessionName
}
if opts.SessionTTL == "" {
opts.SessionTTL = DefaultLockSessionTTL
} else {
if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
l := &Lock{
c: c,
opts: opts,
}
return l, nil
}
// Lock attempts to acquire the lock and blocks while doing so.
// Providing a non-nil stopCh can be used to abort the lock attempt.
// Returns a channel that is closed if our lock is lost or an error.
// This channel could be closed at any time due to session invalidation,
// communication errors, operator intervention, etc. It is NOT safe to
// assume that the lock is held until Unlock() unless the Session is specifically
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the lock being lost.
func (l *Lock) Lock(stopCh chan struct{}) (chan struct{}, error) {
// Hold the lock as we try to acquire
l.l.Lock()
defer l.l.Unlock()
// Check if we already hold the lock
if l.isHeld {
return nil, ErrLockHeld
}
// Check if we need to create a session first
l.lockSession = l.opts.Session
if l.lockSession == "" {
if s, err := l.createSession(); err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
} else {
l.sessionRenew = make(chan struct{})
l.lockSession = s
go l.renewSession(s, l.sessionRenew)
// If we fail to acquire the lock, cleanup the session
defer func() {
if !l.isHeld {
close(l.sessionRenew)
l.sessionRenew = nil
}
}()
}
}
// Setup the query options
kv := l.c.KV()
qOpts := &QueryOptions{
WaitTime: DefaultLockWaitTime,
}
WAIT:
// Check if we should quit
select {
case <-stopCh:
return nil, nil
default:
}
// Look for an existing lock, blocking until not taken
pair, meta, err := kv.Get(l.opts.Key, qOpts)
if err != nil {
return nil, fmt.Errorf("failed to read lock: %v", err)
}
if pair != nil && pair.Session != "" {
qOpts.WaitIndex = meta.LastIndex
goto WAIT
}
// Try to acquire the lock
lockEnt := l.lockEntry(l.lockSession)
locked, _, err := kv.Acquire(lockEnt, nil)
if err != nil {
return nil, fmt.Errorf("failed to acquire lock: %v", err)
}
// Handle the case of not getting the lock
if !locked {
select {
case <-time.After(DefaultLockRetyTime):
goto WAIT
case <-stopCh:
return nil, nil
}
}
// Watch to ensure we maintain leadership
leaderCh := make(chan struct{})
go l.monitorLock(l.lockSession, leaderCh)
// Set that we own the lock
l.isHeld = true
// Locked! All done
return leaderCh, nil
}
// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *Lock) Unlock() error {
// Hold the lock as we try to release
l.l.Lock()
defer l.l.Unlock()
// Ensure the lock is actually held
if !l.isHeld {
return ErrLockNotHeld
}
// Set that we no longwer own the lock
l.isHeld = false
// Stop the session renew
if l.sessionRenew != nil {
defer func() {
close(l.sessionRenew)
l.sessionRenew = nil
}()
}
// Get the lock entry, and clear the lock session
lockEnt := l.lockEntry(l.lockSession)
l.lockSession = ""
// Release the lock explicitly
kv := l.c.KV()
_, _, err := kv.Release(lockEnt, nil)
if err != nil {
return fmt.Errorf("failed to release lock: %v", err)
}
return nil
}
// createSession is used to create a new managed session
func (l *Lock) createSession() (string, error) {
session := l.c.Session()
se := &SessionEntry{
Name: l.opts.SessionName,
TTL: l.opts.SessionTTL,
}
id, _, err := session.Create(se, nil)
if err != nil {
return "", err
}
return id, nil
}
// lockEntry returns a formatted KVPair for the lock
func (l *Lock) lockEntry(session string) *KVPair {
return &KVPair{
Key: l.opts.Key,
Value: l.opts.Value,
Session: session,
}
}
// renewSession is a long running routine that maintians a session
// by doing a periodic Session renewal.
func (l *Lock) renewSession(id string, doneCh chan struct{}) {
session := l.c.Session()
ttl, _ := time.ParseDuration(l.opts.SessionTTL)
for {
select {
case <-time.After(ttl / 2):
entry, _, err := session.Renew(id, nil)
if err != nil || entry == nil {
return
}
// Handle the server updating the TTL
ttl, _ = time.ParseDuration(entry.TTL)
case <-doneCh:
// Attempt a session destroy
session.Destroy(id, nil)
return
}
}
}
// monitorLock is a long running routine to monitor a lock ownership
// It closes the stopCh if we lose our leadership.
func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
kv := l.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
pair, meta, err := kv.Get(l.opts.Key, opts)
if err != nil {
close(stopCh)
return
}
if pair != nil && pair.Session == session {
opts.WaitIndex = meta.LastIndex
goto WAIT
}
close(stopCh)
}

184
api/lock_test.go Normal file
View File

@ -0,0 +1,184 @@
package api
import (
"log"
"sync"
"testing"
"time"
)
func TestLock_LockUnlock(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Initial unlock should fail
err = lock.Unlock()
if err != ErrLockNotHeld {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
// Double lock should fail
_, err = lock.Lock(nil)
if err != ErrLockHeld {
t.Fatalf("err: %v", err)
}
// Should be leader
select {
case <-leaderCh:
t.Fatalf("should be leader")
default:
}
// Initial unlock should work
err = lock.Unlock()
if err != nil {
t.Fatalf("err: %v", err)
}
// Double unlock should fail
err = lock.Unlock()
if err != ErrLockNotHeld {
t.Fatalf("err: %v", err)
}
// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}
func TestLock_ForceInvalidate(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
go func() {
// Nuke the session, simulator an operator invalidation
// or a health check failure
session := c.Session()
session.Destroy(lock.lockSession, nil)
}()
// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}
func TestLock_DeleteKey(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
go func() {
// Nuke the key, simulate an operator intervention
kv := c.KV()
kv.Delete("test/lock", nil)
}()
// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}
func TestLock_Contend(t *testing.T) {
c, s := makeClient(t)
defer s.stop()
wg := &sync.WaitGroup{}
acquired := make([]bool, 3)
for idx := range acquired {
wg.Add(1)
go func(idx int) {
defer wg.Done()
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}
// Should work eventually, will contend
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
log.Printf("Contender %d acquired", idx)
// Set acquired and then leave
acquired[idx] = true
}(idx)
}
// Wait for termination
doneCh := make(chan struct{})
go func() {
wg.Wait()
close(doneCh)
}()
// Wait for everybody to get a turn
select {
case <-doneCh:
case <-time.After(3 * DefaultLockRetyTime):
t.Fatalf("timeout")
}
for idx, did := range acquired {
if !did {
t.Fatalf("contender %d never acquired", idx)
}
}
}