diff --git a/api/lock.go b/api/lock.go index f93241ab35..0882e520d2 100644 --- a/api/lock.go +++ b/api/lock.go @@ -29,7 +29,8 @@ const ( // DefaultMonitorRetryTime is how long we wait after a failed monitor check // of a lock (500 response code). This allows the monitor to ride out brief // periods of unavailability, subject to the MonitorRetries setting in the - // lock options which is by default set to 0, disabling this feature. + // lock options which is by default set to 0, disabling this feature. This + // affects locks and semaphores. DefaultMonitorRetryTime = 2 * time.Second // LockFlagValue is a magic flag we set to indicate a key diff --git a/api/semaphore.go b/api/semaphore.go index cce1375813..ab01773f58 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "path" + "strings" "sync" "time" ) @@ -69,6 +70,8 @@ type SemaphoreOptions struct { Session string // Optional, created if not specified SessionName string // Optional, defaults to DefaultLockSessionName SessionTTL string // Optional, defaults to DefaultLockSessionTTL + MonitorRetries int // Optional, defaults to 0 which means no retries + MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime SemaphoreTryOnce bool // Optional, defaults to false which means try forever } @@ -117,6 +120,9 @@ func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) { return nil, fmt.Errorf("invalid SessionTTL: %v", err) } } + if opts.MonitorRetryTime == 0 { + opts.MonitorRetryTime = DefaultMonitorRetryTime + } if opts.SemaphoreWaitTime == 0 { opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime } @@ -472,8 +478,24 @@ func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) { kv := s.c.KV() opts := &QueryOptions{RequireConsistent: true} WAIT: + retries := s.opts.MonitorRetries +RETRY: pairs, meta, err := kv.List(s.opts.Prefix, opts) if err != 
nil { + // TODO (slackpad) - Make a real error type here instead of using + // a string check. + const serverError = "Unexpected response code: 500" + + // If configured we can try to ride out a brief Consul unavailability + // by doing retries. Note that we have to attempt the retry in a non- + // blocking fashion so that we have a clean place to reset the retry + // counter if service is restored. + if retries > 0 && strings.Contains(err.Error(), serverError) { + time.Sleep(s.opts.MonitorRetryTime) + retries-- + opts.WaitIndex = 0 + goto RETRY + } return } lockPair := s.findLock(pairs) diff --git a/api/semaphore_test.go b/api/semaphore_test.go index 98371ea10d..ab296c4beb 100644 --- a/api/semaphore_test.go +++ b/api/semaphore_test.go @@ -2,6 +2,10 @@ package api import ( "log" + "net/http" + "net/http/httptest" + "net/http/httputil" + "strings" "sync" "testing" "time" @@ -312,6 +316,123 @@ func TestSemaphore_Conflict(t *testing.T) { } } +func TestSemaphore_MonitorRetry(t *testing.T) { + t.Parallel() + raw, s := makeClient(t) + defer s.Stop() + + // Set up a server that always responds with 500 errors. + failer := func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(500) + } + outage := httptest.NewServer(http.HandlerFunc(failer)) + defer outage.Close() + + // Set up a reverse proxy that will send some requests to the + // 500 server and pass everything else through to the real Consul + // server. + var mutex sync.Mutex + errors := 0 + director := func(req *http.Request) { + mutex.Lock() + defer mutex.Unlock() + + req.URL.Scheme = "http" + if errors > 0 && req.Method == "GET" && strings.Contains(req.URL.Path, "/v1/kv/test/sema/.lock") { + req.URL.Host = outage.URL[7:] // Strip off "http://". + errors-- + } else { + req.URL.Host = raw.config.Address + } + } + proxy := httptest.NewServer(&httputil.ReverseProxy{Director: director}) + defer proxy.Close() + + // Make another client that points at the proxy instead of the real + // Consul server. 
+ config := raw.config + config.Address = proxy.URL[7:] // Strip off "http://". + c, err := NewClient(&config) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Set up a semaphore with retries enabled. + opts := &SemaphoreOptions{ + Prefix: "test/sema/.lock", + Limit: 2, + SessionTTL: "60s", + MonitorRetries: 3, + } + sema, err := c.SemaphoreOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Make sure the default got set. + if sema.opts.MonitorRetryTime != DefaultMonitorRetryTime { + t.Fatalf("bad: %d", sema.opts.MonitorRetryTime) + } + + // Now set a custom time for the test. + opts.MonitorRetryTime = 250 * time.Millisecond + sema, err = c.SemaphoreOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + if sema.opts.MonitorRetryTime != 250*time.Millisecond { + t.Fatalf("bad: %d", sema.opts.MonitorRetryTime) + } + + // Should get the semaphore. + ch, err := sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if ch == nil { + t.Fatalf("didn't acquire") + } + + // Take the semaphore using the raw client to force the monitor to wake + // up and check the lock again. This time we will return errors for some + // of the responses. + mutex.Lock() + errors = 2 + mutex.Unlock() + another, err := raw.SemaphoreOpts(opts) + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err := another.Acquire(nil); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(5 * opts.MonitorRetryTime) + + // Should still have the semaphore. + select { + case <-ch: + t.Fatalf("lost the semaphore") + default: + } + + // Now return an overwhelming number of errors, using the raw client to + // poke the key and get the monitor to run again. + mutex.Lock() + errors = 10 + mutex.Unlock() + if err := another.Release(); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(5 * opts.MonitorRetryTime) + + // Should lose the semaphore. 
+ select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("should not have the semaphore") + } +} + func TestSemaphore_OneShot(t *testing.T) { t.Parallel() c, s := makeClient(t)