Adds the ability for semaphore monitors to ride out brief periods of 500 errors.

This commit is contained in:
James Phillips 2016-01-05 17:09:03 -08:00
parent 8caa9e4c7e
commit ca08ba3aee
3 changed files with 145 additions and 1 deletions

View File

@ -29,7 +29,8 @@ const (
// DefaultMonitorRetryTime is how long we wait after a failed monitor check
// of a lock (500 response code). This allows the monitor to ride out brief
// periods of unavailability, subject to the MonitorRetries setting in the
// lock options which is by default set to 0, disabling this feature.
// lock options which is by default set to 0, disabling this feature. This
// affects locks and semaphores.
DefaultMonitorRetryTime = 2 * time.Second
// LockFlagValue is a magic flag we set to indicate a key

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"path"
"strings"
"sync"
"time"
)
@ -69,6 +70,8 @@ type SemaphoreOptions struct {
Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
MonitorRetries int // Optional, defaults to 0 which means no retries
MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime
SemaphoreTryOnce bool // Optional, defaults to false which means try forever
}
@ -117,6 +120,9 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
if opts.MonitorRetryTime == 0 {
opts.MonitorRetryTime = DefaultMonitorRetryTime
}
if opts.SemaphoreWaitTime == 0 {
opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime
}
@ -472,8 +478,24 @@ func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) {
kv := s.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
retries := s.opts.MonitorRetries
RETRY:
pairs, meta, err := kv.List(s.opts.Prefix, opts)
if err != nil {
// TODO (slackpad) - Make a real error type here instead of using
// a string check.
const serverError = "Unexpected response code: 500"
// If configured we can try to ride out a brief Consul unavailability
// by doing retries. Note that we have to attempt the retry in a non-
// blocking fashion so that we have a clean place to reset the retry
// counter if service is restored.
if retries > 0 && strings.Contains(err.Error(), serverError) {
time.Sleep(s.opts.MonitorRetryTime)
retries--
opts.WaitIndex = 0
goto RETRY
}
return
}
lockPair := s.findLock(pairs)

View File

@ -2,6 +2,10 @@ package api
import (
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
"strings"
"sync"
"testing"
"time"
@ -312,6 +316,123 @@ func TestSemaphore_Conflict(t *testing.T) {
}
}
func TestSemaphore_MonitorRetry(t *testing.T) {
t.Parallel()
raw, s := makeClient(t)
defer s.Stop()
// Set up a server that always responds with 500 errors.
failer := func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(500)
}
outage := httptest.NewServer(http.HandlerFunc(failer))
defer outage.Close()
// Set up a reverse proxy that will send some requests to the
// 500 server and pass everything else through to the real Consul
// server.
var mutex sync.Mutex
errors := 0
director := func(req *http.Request) {
mutex.Lock()
defer mutex.Unlock()
req.URL.Scheme = "http"
if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/sema/.lock") {
req.URL.Host = outage.URL[7:] // Strip off "http://".
errors--
} else {
req.URL.Host = raw.config.Address
}
}
proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director})
defer proxy.Close()
// Make another client that points at the proxy instead of the real
// Consul server.
config := raw.config
config.Address = proxy.URL[7:] // Strip off "http://".
c, err := NewClient(&config)
if err != nil {
t.Fatalf("err: %v", err)
}
// Set up a lock with retries enabled.
opts := &SemaphoreOptions{
Prefix: "test/sema/.lock",
Limit: 2,
SessionTTL: "60s",
MonitorRetries: 3,
}
sema, err := c.SemaphoreOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
// Make sure the default got set.
if sema.opts.MonitorRetryTime != DefaultMonitorRetryTime {
t.Fatalf("bad: %d", sema.opts.MonitorRetryTime)
}
// Now set a custom time for the test.
opts.MonitorRetryTime = 250 * time.Millisecond
sema, err = c.SemaphoreOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
if sema.opts.MonitorRetryTime != 250*time.Millisecond {
t.Fatalf("bad: %d", sema.opts.MonitorRetryTime)
}
// Should get the lock.
ch, err := sema.Acquire(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if ch == nil {
t.Fatalf("didn't acquire")
}
// Take the semaphore using the raw client to force the monitor to wake
// up and check the lock again. This time we will return errors for some
// of the responses.
mutex.Lock()
errors = 2
mutex.Unlock()
another, err := raw.SemaphoreOpts(opts)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, err := another.Acquire(nil); err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(5 * opts.MonitorRetryTime)
// Should still have the semaphore.
select {
case <-ch:
t.Fatalf("lost the semaphore")
default:
}
// Now return an overwhelming number of errors, using the raw client to
// poke the key and get the monitor to run again.
mutex.Lock()
errors = 10
mutex.Unlock()
if err := another.Release(); err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(5 * opts.MonitorRetryTime)
// Should lose the semaphore.
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("should not have the semaphore")
}
}
func TestSemaphore_OneShot(t *testing.T) {
t.Parallel()
c, s := makeClient(t)