mirror of
https://github.com/status-im/consul.git
synced 2025-01-24 12:40:17 +00:00
Merge pull request #600 from hashicorp/f-api-semaphore
Adding Semaphore support to API
This commit is contained in:
commit
f2e1594814
@ -24,6 +24,7 @@ var consulConfig = `{
|
||||
},
|
||||
"data_dir": "%s",
|
||||
"bootstrap": true,
|
||||
"log_level": "debug",
|
||||
"server": true
|
||||
}`
|
||||
|
||||
@ -73,6 +74,8 @@ func newTestServer(t *testing.T) *testServer {
|
||||
|
||||
// Start the server
|
||||
cmd := exec.Command("consul", "agent", "-config-file", configFile.Name())
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
434
api/semaphore.go
Normal file
434
api/semaphore.go
Normal file
@ -0,0 +1,434 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultSemaphoreSessionName is the Session Name we assign if none is provided
|
||||
DefaultSemaphoreSessionName = "Consul API Semaphore"
|
||||
|
||||
// DefaultSemaphoreSessionTTL is the default session TTL if no Session is provided
|
||||
// when creating a new Semaphore. This is used because we do not have another
|
||||
// other check to depend upon.
|
||||
DefaultSemaphoreSessionTTL = "15s"
|
||||
|
||||
// DefaultSemaphoreWaitTime is how long we block for at a time to check if semaphore
|
||||
// acquisition is possible. This affects the minimum time it takes to cancel
|
||||
// a Semaphore acquisition.
|
||||
DefaultSemaphoreWaitTime = 15 * time.Second
|
||||
|
||||
// DefaultSemaphoreRetryTime is how long we wait after a failed lock acquisition
|
||||
// before attempting to do the lock again. This is so that once a lock-delay
|
||||
// is in affect, we do not hot loop retrying the acquisition.
|
||||
DefaultSemaphoreRetryTime = 5 * time.Second
|
||||
|
||||
// DefaultSemaphoreKey is the key used within the prefix to
|
||||
// use for coordination between all the contenders.
|
||||
DefaultSemaphoreKey = ".lock"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrSemaphoreHeld is returned if we attempt to double lock
|
||||
ErrSemaphoreHeld = fmt.Errorf("Semaphore already held")
|
||||
|
||||
// ErrSemaphoreNotHeld is returned if we attempt to unlock a lock
|
||||
// that we do not hold.
|
||||
ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held")
|
||||
)
|
||||
|
||||
// Semaphore is used to implement a distributed semaphore
|
||||
// using the Consul KV primitives.
|
||||
type Semaphore struct {
|
||||
c *Client
|
||||
opts *SemaphoreOptions
|
||||
|
||||
isHeld bool
|
||||
sessionRenew chan struct{}
|
||||
lockSession string
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
// SemaphoreOptions is used to parameterize the Semaphore
|
||||
type SemaphoreOptions struct {
|
||||
Prefix string // Must be set and have write permissions
|
||||
Limit int // Must be set, and be positive
|
||||
Value []byte // Optional, value to associate with the contender entry
|
||||
Session string // OPtional, created if not specified
|
||||
SessionName string // Optional, defaults to DefaultLockSessionName
|
||||
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
|
||||
}
|
||||
|
||||
// semaphoreLock is written under the DefaultSemaphoreKey and
|
||||
// is used to coordinate between all the contenders.
|
||||
type semaphoreLock struct {
|
||||
// Limit is the integer limit of holders. This is used to
|
||||
// verify that all the holders agree on the value.
|
||||
Limit int
|
||||
|
||||
// Holders is a list of all the semaphore holders.
|
||||
// It maps the session ID to true. It is used as a set effectively.
|
||||
Holders map[string]bool
|
||||
}
|
||||
|
||||
// SemaphorePrefix is used to created a Semaphore which will operate
|
||||
// at the given KV prefix and uses the given limit for the semaphore.
|
||||
// The prefix must have write privileges, and the limit must be agreed
|
||||
// upon by all contenders.
|
||||
func (c *Client) SemaphorePrefix(prefix string, limit int) (*Semaphore, error) {
|
||||
opts := &SemaphoreOptions{
|
||||
Prefix: prefix,
|
||||
Limit: limit,
|
||||
}
|
||||
return c.SemaphoreOpts(opts)
|
||||
}
|
||||
|
||||
// SemaphoreOpts is used to create a Semaphore with the given options.
|
||||
// The prefix must have write privileges, and the limit must be agreed
|
||||
// upon by all contenders. If a Session is not provided, one will be created.
|
||||
func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
|
||||
if opts.Prefix == "" {
|
||||
return nil, fmt.Errorf("missing prefix")
|
||||
}
|
||||
if opts.Limit <= 0 {
|
||||
return nil, fmt.Errorf("semaphore limit must be positive")
|
||||
}
|
||||
if opts.SessionName == "" {
|
||||
opts.SessionName = DefaultSemaphoreSessionName
|
||||
}
|
||||
if opts.SessionTTL == "" {
|
||||
opts.SessionTTL = DefaultSemaphoreSessionTTL
|
||||
} else {
|
||||
if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
|
||||
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
|
||||
}
|
||||
}
|
||||
s := &Semaphore{
|
||||
c: c,
|
||||
opts: opts,
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Acquire attempts to reserve a slot in the semaphore, blocking until
|
||||
// success, interrupted via the stopCh or an error is encounted.
|
||||
// Providing a non-nil stopCh can be used to abort the attempt.
|
||||
// On success, a channel is returned that represents our slot.
|
||||
// This channel could be closed at any time due to session invalidation,
|
||||
// communication errors, operator intervention, etc. It is NOT safe to
|
||||
// assume that the slot is held until Release() unless the Session is specifically
|
||||
// created without any associated health checks. By default Consul sessions
|
||||
// prefer liveness over safety and an application must be able to handle
|
||||
// the session being lost.
|
||||
func (s *Semaphore) Acquire(stopCh chan struct{}) (chan struct{}, error) {
|
||||
// Hold the lock as we try to acquire
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// Check if we already hold the semaphore
|
||||
if s.isHeld {
|
||||
return nil, ErrSemaphoreHeld
|
||||
}
|
||||
|
||||
// Check if we need to create a session first
|
||||
s.lockSession = s.opts.Session
|
||||
if s.lockSession == "" {
|
||||
if sess, err := s.createSession(); err != nil {
|
||||
return nil, fmt.Errorf("failed to create session: %v", err)
|
||||
} else {
|
||||
s.sessionRenew = make(chan struct{})
|
||||
s.lockSession = sess
|
||||
go s.renewSession(sess, s.sessionRenew)
|
||||
|
||||
// If we fail to acquire the lock, cleanup the session
|
||||
defer func() {
|
||||
if !s.isHeld {
|
||||
close(s.sessionRenew)
|
||||
s.sessionRenew = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Create the contender entry
|
||||
kv := s.c.KV()
|
||||
made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), nil)
|
||||
if err != nil || !made {
|
||||
return nil, fmt.Errorf("failed to make contender entry: %v", err)
|
||||
}
|
||||
|
||||
// Setup the query options
|
||||
qOpts := &QueryOptions{
|
||||
WaitTime: DefaultSemaphoreWaitTime,
|
||||
}
|
||||
|
||||
WAIT:
|
||||
// Check if we should quit
|
||||
select {
|
||||
case <-stopCh:
|
||||
return nil, nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Read the prefix
|
||||
pairs, meta, err := kv.List(s.opts.Prefix, qOpts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read prefix: %v", err)
|
||||
}
|
||||
|
||||
// Decode the lock
|
||||
lockPair := s.findLock(pairs)
|
||||
lock, err := s.decodeLock(lockPair)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Verify we agree with the limit
|
||||
if lock.Limit != s.opts.Limit {
|
||||
return nil, fmt.Errorf("semaphore limit conflict (lock: %d, local: %d)",
|
||||
lock.Limit, s.opts.Limit)
|
||||
}
|
||||
|
||||
// Prune the dead holders
|
||||
s.pruneDeadHolders(lock, pairs)
|
||||
|
||||
// Check if the lock is held
|
||||
if len(lock.Holders) >= lock.Limit {
|
||||
qOpts.WaitIndex = meta.LastIndex
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
// Create a new lock with us as a holder
|
||||
lock.Holders[s.lockSession] = true
|
||||
newLock, err := s.encodeLock(lock, lockPair.ModifyIndex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attempt the acquisition
|
||||
didSet, _, err := kv.CAS(newLock, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to update lock: %v", err)
|
||||
}
|
||||
if !didSet {
|
||||
// Update failed, could have been a race with another contender,
|
||||
// retry the operation
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
// Watch to ensure we maintain ownership of the slot
|
||||
lockCh := make(chan struct{})
|
||||
go s.monitorLock(s.lockSession, lockCh)
|
||||
|
||||
// Set that we own the lock
|
||||
s.isHeld = true
|
||||
|
||||
// Acquired! All done
|
||||
return lockCh, nil
|
||||
}
|
||||
|
||||
// Release is used to voluntarily give up our semaphore slot. It is
|
||||
// an error to call this if the semaphore has not been acquired.
|
||||
func (s *Semaphore) Release() error {
|
||||
// Hold the lock as we try to release
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// Ensure the lock is actually held
|
||||
if !s.isHeld {
|
||||
return ErrSemaphoreNotHeld
|
||||
}
|
||||
|
||||
// Set that we no longer own the lock
|
||||
s.isHeld = false
|
||||
|
||||
// Stop the session renew
|
||||
if s.sessionRenew != nil {
|
||||
defer func() {
|
||||
close(s.sessionRenew)
|
||||
s.sessionRenew = nil
|
||||
}()
|
||||
}
|
||||
|
||||
// Get and clear the lock session
|
||||
lockSession := s.lockSession
|
||||
s.lockSession = ""
|
||||
|
||||
// Remove ourselves as a lock holder
|
||||
kv := s.c.KV()
|
||||
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
|
||||
READ:
|
||||
pair, _, err := kv.Get(key, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pair == nil {
|
||||
pair = &KVPair{}
|
||||
}
|
||||
lock, err := s.decodeLock(pair)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a new lock without us as a holder
|
||||
if _, ok := lock.Holders[lockSession]; ok {
|
||||
delete(lock.Holders, lockSession)
|
||||
newLock, err := s.encodeLock(lock, pair.ModifyIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Swap the locks
|
||||
didSet, _, err := kv.CAS(newLock, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update lock: %v", err)
|
||||
}
|
||||
if !didSet {
|
||||
goto READ
|
||||
}
|
||||
}
|
||||
|
||||
// Destroy the contender entry
|
||||
contenderKey := path.Join(s.opts.Prefix, lockSession)
|
||||
if _, err := kv.Delete(contenderKey, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createSession is used to create a new managed session
|
||||
func (s *Semaphore) createSession() (string, error) {
|
||||
session := s.c.Session()
|
||||
se := &SessionEntry{
|
||||
Name: s.opts.SessionName,
|
||||
TTL: s.opts.SessionTTL,
|
||||
Behavior: SessionBehaviorDelete,
|
||||
}
|
||||
id, _, err := session.Create(se, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// renewSession is a long running routine that maintians a session
|
||||
// by doing a periodic Session renewal.
|
||||
func (s *Semaphore) renewSession(id string, doneCh chan struct{}) {
|
||||
session := s.c.Session()
|
||||
ttl, _ := time.ParseDuration(s.opts.SessionTTL)
|
||||
for {
|
||||
select {
|
||||
case <-time.After(ttl / 2):
|
||||
entry, _, err := session.Renew(id, nil)
|
||||
if err != nil || entry == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Handle the server updating the TTL
|
||||
ttl, _ = time.ParseDuration(entry.TTL)
|
||||
|
||||
case <-doneCh:
|
||||
// Attempt a session destroy
|
||||
session.Destroy(id, nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// contenderEntry returns a formatted KVPair for the contender
|
||||
func (s *Semaphore) contenderEntry(session string) *KVPair {
|
||||
return &KVPair{
|
||||
Key: path.Join(s.opts.Prefix, session),
|
||||
Value: s.opts.Value,
|
||||
Session: session,
|
||||
}
|
||||
}
|
||||
|
||||
// findLock is used to find the KV Pair which is used for coordination
|
||||
func (s *Semaphore) findLock(pairs KVPairs) *KVPair {
|
||||
key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
|
||||
for _, pair := range pairs {
|
||||
if pair.Key == key {
|
||||
return pair
|
||||
}
|
||||
}
|
||||
return &KVPair{}
|
||||
}
|
||||
|
||||
// decodeLock is used to decode a semaphoreLock from an
|
||||
// entry in Consul
|
||||
func (s *Semaphore) decodeLock(pair *KVPair) (*semaphoreLock, error) {
|
||||
// Handle if there is no lock
|
||||
if pair == nil || pair.Value == nil {
|
||||
return &semaphoreLock{
|
||||
Limit: s.opts.Limit,
|
||||
Holders: make(map[string]bool),
|
||||
}, nil
|
||||
}
|
||||
|
||||
l := &semaphoreLock{}
|
||||
if err := json.Unmarshal(pair.Value, l); err != nil {
|
||||
return nil, fmt.Errorf("lock decoding failed: %v", err)
|
||||
}
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// encodeLock is used to encode a semaphoreLock into a KVPair
|
||||
// that can be PUT
|
||||
func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, error) {
|
||||
enc, err := json.Marshal(l)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("lock encoding failed: %v", err)
|
||||
}
|
||||
pair := &KVPair{
|
||||
Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey),
|
||||
Value: enc,
|
||||
ModifyIndex: oldIndex,
|
||||
}
|
||||
return pair, nil
|
||||
}
|
||||
|
||||
// pruneDeadHolders is used to remove all the dead lock holders
|
||||
func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) {
|
||||
// Gather all the live holders
|
||||
alive := make(map[string]struct{}, len(pairs))
|
||||
for _, pair := range pairs {
|
||||
if pair.Session != "" {
|
||||
alive[pair.Session] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove any holders that are dead
|
||||
for holder := range lock.Holders {
|
||||
if _, ok := alive[holder]; !ok {
|
||||
delete(lock.Holders, holder)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// monitorLock is a long running routine to monitor a semaphore ownership
|
||||
// It closes the stopCh if we lose our slot.
|
||||
func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) {
|
||||
defer close(stopCh)
|
||||
kv := s.c.KV()
|
||||
opts := &QueryOptions{RequireConsistent: true}
|
||||
WAIT:
|
||||
pairs, meta, err := kv.List(s.opts.Prefix, opts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
lockPair := s.findLock(pairs)
|
||||
lock, err := s.decodeLock(lockPair)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.pruneDeadHolders(lock, pairs)
|
||||
if _, ok := lock.Holders[session]; ok {
|
||||
opts.WaitIndex = meta.LastIndex
|
||||
goto WAIT
|
||||
}
|
||||
}
|
214
api/semaphore_test.go
Normal file
214
api/semaphore_test.go
Normal file
@ -0,0 +1,214 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSemaphore_AcquireRelease(t *testing.T) {
|
||||
c, s := makeClient(t)
|
||||
defer s.stop()
|
||||
|
||||
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Initial release should fail
|
||||
err = sema.Release()
|
||||
if err != ErrSemaphoreNotHeld {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should work
|
||||
lockCh, err := sema.Acquire(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if lockCh == nil {
|
||||
t.Fatalf("not hold")
|
||||
}
|
||||
|
||||
// Double lock should fail
|
||||
_, err = sema.Acquire(nil)
|
||||
if err != ErrSemaphoreHeld {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should be held
|
||||
select {
|
||||
case <-lockCh:
|
||||
t.Fatalf("should be held")
|
||||
default:
|
||||
}
|
||||
|
||||
// Initial release should work
|
||||
err = sema.Release()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Double unlock should fail
|
||||
err = sema.Release()
|
||||
if err != ErrSemaphoreNotHeld {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should lose resource
|
||||
select {
|
||||
case <-lockCh:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("should not be held")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSemaphore_ForceInvalidate(t *testing.T) {
|
||||
c, s := makeClient(t)
|
||||
defer s.stop()
|
||||
|
||||
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should work
|
||||
lockCh, err := sema.Acquire(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if lockCh == nil {
|
||||
t.Fatalf("not acquired")
|
||||
}
|
||||
defer sema.Release()
|
||||
|
||||
go func() {
|
||||
// Nuke the session, simulator an operator invalidation
|
||||
// or a health check failure
|
||||
session := c.Session()
|
||||
session.Destroy(sema.lockSession, nil)
|
||||
}()
|
||||
|
||||
// Should loose slot
|
||||
select {
|
||||
case <-lockCh:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("should not be locked")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSemaphore_DeleteKey(t *testing.T) {
|
||||
c, s := makeClient(t)
|
||||
defer s.stop()
|
||||
|
||||
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should work
|
||||
lockCh, err := sema.Acquire(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if lockCh == nil {
|
||||
t.Fatalf("not locked")
|
||||
}
|
||||
defer sema.Release()
|
||||
|
||||
go func() {
|
||||
// Nuke the key, simulate an operator intervention
|
||||
kv := c.KV()
|
||||
kv.DeleteTree("test/semaphore", nil)
|
||||
}()
|
||||
|
||||
// Should loose leadership
|
||||
select {
|
||||
case <-lockCh:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("should not be locked")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSemaphore_Contend(t *testing.T) {
|
||||
c, s := makeClient(t)
|
||||
defer s.stop()
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
acquired := make([]bool, 4)
|
||||
for idx := range acquired {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Should work eventually, will contend
|
||||
lockCh, err := sema.Acquire(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if lockCh == nil {
|
||||
t.Fatalf("not locked")
|
||||
}
|
||||
defer sema.Release()
|
||||
log.Printf("Contender %d acquired", idx)
|
||||
|
||||
// Set acquired and then leave
|
||||
acquired[idx] = true
|
||||
}(idx)
|
||||
}
|
||||
|
||||
// Wait for termination
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
// Wait for everybody to get a turn
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(3 * DefaultLockRetryTime):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
|
||||
for idx, did := range acquired {
|
||||
if !did {
|
||||
t.Fatalf("contender %d never acquired", idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSemaphore_BadLimit(t *testing.T) {
|
||||
c, s := makeClient(t)
|
||||
defer s.stop()
|
||||
|
||||
sema, err := c.SemaphorePrefix("test/semaphore", 0)
|
||||
if err == nil {
|
||||
t.Fatalf("should error")
|
||||
}
|
||||
|
||||
sema, err = c.SemaphorePrefix("test/semaphore", 1)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
_, err = sema.Acquire(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
sema2, err := c.SemaphorePrefix("test/semaphore", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
_, err = sema2.Acquire(nil)
|
||||
if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
@ -157,6 +157,8 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
||||
} else {
|
||||
reply.Index = index
|
||||
}
|
||||
reply.Entries = nil
|
||||
|
||||
} else {
|
||||
// Determine the maximum affected index
|
||||
var maxIndex uint64
|
||||
|
Loading…
x
Reference in New Issue
Block a user