mirror of https://github.com/status-im/consul.git
lib/retry: extract a new package from lib
This commit is contained in:
parent
52451cf846
commit
ca26dfb4a2
|
@ -7,13 +7,14 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
"github.com/hashicorp/consul/logging"
|
"github.com/hashicorp/consul/logging"
|
||||||
"github.com/hashicorp/consul/proto/pbautoconf"
|
"github.com/hashicorp/consul/proto/pbautoconf"
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutoConfig is all the state necessary for being able to parse a configuration
|
// AutoConfig is all the state necessary for being able to parse a configuration
|
||||||
|
@ -24,7 +25,7 @@ type AutoConfig struct {
|
||||||
acConfig Config
|
acConfig Config
|
||||||
logger hclog.Logger
|
logger hclog.Logger
|
||||||
cache Cache
|
cache Cache
|
||||||
waiter *lib.RetryWaiter
|
waiter *retry.Waiter
|
||||||
config *config.RuntimeConfig
|
config *config.RuntimeConfig
|
||||||
autoConfigResponse *pbautoconf.AutoConfigResponse
|
autoConfigResponse *pbautoconf.AutoConfigResponse
|
||||||
autoConfigSource config.Source
|
autoConfigSource config.Source
|
||||||
|
@ -84,7 +85,7 @@ func New(config Config) (*AutoConfig, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Waiter == nil {
|
if config.Waiter == nil {
|
||||||
config.Waiter = lib.NewRetryWaiter(1, 0, 10*time.Minute, lib.NewJitterRandomStagger(25))
|
config.Waiter = retry.NewRetryWaiter(1, 0, 10*time.Minute, retry.NewJitterRandomStagger(25))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &AutoConfig{
|
return &AutoConfig{
|
||||||
|
|
|
@ -11,6 +11,9 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
|
@ -18,13 +21,11 @@ import (
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
"github.com/hashicorp/consul/proto/pbautoconf"
|
"github.com/hashicorp/consul/proto/pbautoconf"
|
||||||
"github.com/hashicorp/consul/proto/pbconfig"
|
"github.com/hashicorp/consul/proto/pbconfig"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
testretry "github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type configLoader struct {
|
type configLoader struct {
|
||||||
|
@ -412,7 +413,7 @@ func TestInitialConfiguration_retries(t *testing.T) {
|
||||||
mcfg.Config.Loader = loader.Load
|
mcfg.Config.Loader = loader.Load
|
||||||
|
|
||||||
// reduce the retry wait times to make this test run faster
|
// reduce the retry wait times to make this test run faster
|
||||||
mcfg.Config.Waiter = lib.NewRetryWaiter(2, 0, 1*time.Millisecond, nil)
|
mcfg.Config.Waiter = retry.NewWaiter(2, 0, 1*time.Millisecond, nil)
|
||||||
|
|
||||||
indexedRoots, cert, extraCerts := mcfg.setupInitialTLS(t, "autoconf", "dc1", "secret")
|
indexedRoots, cert, extraCerts := mcfg.setupInitialTLS(t, "autoconf", "dc1", "secret")
|
||||||
|
|
||||||
|
@ -927,7 +928,7 @@ func TestRootsUpdate(t *testing.T) {
|
||||||
// however there is no deterministic way to know once its been written outside of maybe a filesystem
|
// however there is no deterministic way to know once its been written outside of maybe a filesystem
|
||||||
// event notifier. That seems a little heavy handed just for this and especially to do in any sort
|
// event notifier. That seems a little heavy handed just for this and especially to do in any sort
|
||||||
// of cross platform way.
|
// of cross platform way.
|
||||||
retry.Run(t, func(r *retry.R) {
|
testretry.Run(t, func(r *testretry.R) {
|
||||||
resp, err := testAC.ac.readPersistedAutoConfig()
|
resp, err := testAC.ac.readPersistedAutoConfig()
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
require.Equal(r, secondRoots.ActiveRootID, resp.CARoots.GetActiveRootID())
|
require.Equal(r, secondRoots.ActiveRootID, resp.CARoots.GetActiveRootID())
|
||||||
|
@ -972,7 +973,7 @@ func TestCertUpdate(t *testing.T) {
|
||||||
// persisting these to disk happens after all the things we would wait for in assertCertUpdated
|
// persisting these to disk happens after all the things we would wait for in assertCertUpdated
|
||||||
// will have fired. There is no deterministic way to know once its been written so we wrap
|
// will have fired. There is no deterministic way to know once its been written so we wrap
|
||||||
// this in a retry.
|
// this in a retry.
|
||||||
retry.Run(t, func(r *retry.R) {
|
testretry.Run(t, func(r *testretry.R) {
|
||||||
resp, err := testAC.ac.readPersistedAutoConfig()
|
resp, err := testAC.ac.readPersistedAutoConfig()
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
|
|
||||||
|
@ -1099,7 +1100,7 @@ func TestFallback(t *testing.T) {
|
||||||
|
|
||||||
// persisting these to disk happens after the RPC we waited on above will have fired
|
// persisting these to disk happens after the RPC we waited on above will have fired
|
||||||
// There is no deterministic way to know once its been written so we wrap this in a retry.
|
// There is no deterministic way to know once its been written so we wrap this in a retry.
|
||||||
retry.Run(t, func(r *retry.R) {
|
testretry.Run(t, func(r *testretry.R) {
|
||||||
resp, err := testAC.ac.readPersistedAutoConfig()
|
resp, err := testAC.ac.readPersistedAutoConfig()
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
|
|
||||||
|
|
|
@ -11,16 +11,17 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/connect"
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAutoEncrypt_generateCSR(t *testing.T) {
|
func TestAutoEncrypt_generateCSR(t *testing.T) {
|
||||||
|
@ -247,7 +248,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) {
|
||||||
resp.VerifyServerHostname = true
|
resp.VerifyServerHostname = true
|
||||||
})
|
})
|
||||||
|
|
||||||
mcfg.Config.Waiter = lib.NewRetryWaiter(2, 0, 1*time.Millisecond, nil)
|
mcfg.Config.Waiter = retry.NewRetryWaiter(2, 0, 1*time.Millisecond, nil)
|
||||||
|
|
||||||
ac := AutoConfig{
|
ac := AutoConfig{
|
||||||
config: &config.RuntimeConfig{
|
config: &config.RuntimeConfig{
|
||||||
|
|
|
@ -5,12 +5,13 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/metadata"
|
"github.com/hashicorp/consul/agent/metadata"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// DirectRPC is the interface that needs to be satisifed for AutoConfig to be able to perform
|
// DirectRPC is the interface that needs to be satisifed for AutoConfig to be able to perform
|
||||||
|
@ -77,7 +78,7 @@ type Config struct {
|
||||||
// jitter of 25% of the wait time. Setting this is mainly useful for
|
// jitter of 25% of the wait time. Setting this is mainly useful for
|
||||||
// testing purposes to allow testing out the retrying functionality without
|
// testing purposes to allow testing out the retrying functionality without
|
||||||
// having the test take minutes/hours to complete.
|
// having the test take minutes/hours to complete.
|
||||||
Waiter *lib.RetryWaiter
|
Waiter *retry.Waiter
|
||||||
|
|
||||||
// Loader merges source with the existing FileSources and returns the complete
|
// Loader merges source with the existing FileSources and returns the complete
|
||||||
// RuntimeConfig.
|
// RuntimeConfig.
|
||||||
|
|
|
@ -7,10 +7,11 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
metrics "github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/lib"
|
|
||||||
"github.com/hashicorp/consul/logging"
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
|
"github.com/hashicorp/consul/logging"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -46,7 +47,7 @@ type ReplicatorConfig struct {
|
||||||
|
|
||||||
type Replicator struct {
|
type Replicator struct {
|
||||||
limiter *rate.Limiter
|
limiter *rate.Limiter
|
||||||
waiter *lib.RetryWaiter
|
waiter *retry.Waiter
|
||||||
delegate ReplicatorDelegate
|
delegate ReplicatorDelegate
|
||||||
logger hclog.Logger
|
logger hclog.Logger
|
||||||
lastRemoteIndex uint64
|
lastRemoteIndex uint64
|
||||||
|
@ -75,7 +76,7 @@ func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
|
||||||
if minFailures < 0 {
|
if minFailures < 0 {
|
||||||
minFailures = 0
|
minFailures = 0
|
||||||
}
|
}
|
||||||
waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10))
|
waiter := retry.NewRetryWaiter(minFailures, 0*time.Second, maxWait, retry.NewJitterRandomStagger(10))
|
||||||
return &Replicator{
|
return &Replicator{
|
||||||
limiter: limiter,
|
limiter: limiter,
|
||||||
waiter: waiter,
|
waiter: waiter,
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
package lib
|
package retry
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -39,12 +41,12 @@ func (j *JitterRandomStagger) AddJitter(baseTime time.Duration) time.Duration {
|
||||||
|
|
||||||
// time.Duration is actually a type alias for int64 which is why casting
|
// time.Duration is actually a type alias for int64 which is why casting
|
||||||
// to the duration type and then dividing works
|
// to the duration type and then dividing works
|
||||||
return baseTime + RandomStagger((baseTime*time.Duration(j.percent))/100)
|
return baseTime + lib.RandomStagger((baseTime*time.Duration(j.percent))/100)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetryWaiter will record failed and successful operations and provide
|
// RetryWaiter will record failed and successful operations and provide
|
||||||
// a channel to wait on before a failed operation can be retried.
|
// a channel to wait on before a failed operation can be retried.
|
||||||
type RetryWaiter struct {
|
type Waiter struct {
|
||||||
minFailures uint
|
minFailures uint
|
||||||
minWait time.Duration
|
minWait time.Duration
|
||||||
maxWait time.Duration
|
maxWait time.Duration
|
||||||
|
@ -53,7 +55,7 @@ type RetryWaiter struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new RetryWaiter
|
// Creates a new RetryWaiter
|
||||||
func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *RetryWaiter {
|
func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *Waiter {
|
||||||
if minFailures < 0 {
|
if minFailures < 0 {
|
||||||
minFailures = defaultMinFailures
|
minFailures = defaultMinFailures
|
||||||
}
|
}
|
||||||
|
@ -66,7 +68,7 @@ func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitt
|
||||||
minWait = 0 * time.Nanosecond
|
minWait = 0 * time.Nanosecond
|
||||||
}
|
}
|
||||||
|
|
||||||
return &RetryWaiter{
|
return &Waiter{
|
||||||
minFailures: uint(minFailures),
|
minFailures: uint(minFailures),
|
||||||
minWait: minWait,
|
minWait: minWait,
|
||||||
maxWait: maxWait,
|
maxWait: maxWait,
|
||||||
|
@ -77,7 +79,7 @@ func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitt
|
||||||
|
|
||||||
// calculates the necessary wait time before the
|
// calculates the necessary wait time before the
|
||||||
// next operation should be allowed.
|
// next operation should be allowed.
|
||||||
func (rw *RetryWaiter) calculateWait() time.Duration {
|
func (rw *Waiter) calculateWait() time.Duration {
|
||||||
waitTime := rw.minWait
|
waitTime := rw.minWait
|
||||||
if rw.failures > rw.minFailures {
|
if rw.failures > rw.minFailures {
|
||||||
shift := rw.failures - rw.minFailures - 1
|
shift := rw.failures - rw.minFailures - 1
|
||||||
|
@ -104,7 +106,7 @@ func (rw *RetryWaiter) calculateWait() time.Duration {
|
||||||
// calculates the waitTime and returns a chan
|
// calculates the waitTime and returns a chan
|
||||||
// that will become selectable once that amount
|
// that will become selectable once that amount
|
||||||
// of time has elapsed.
|
// of time has elapsed.
|
||||||
func (rw *RetryWaiter) wait() <-chan struct{} {
|
func (rw *Waiter) wait() <-chan struct{} {
|
||||||
waitTime := rw.calculateWait()
|
waitTime := rw.calculateWait()
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
if waitTime > 0 {
|
if waitTime > 0 {
|
||||||
|
@ -119,28 +121,33 @@ func (rw *RetryWaiter) wait() <-chan struct{} {
|
||||||
|
|
||||||
// Marks that an operation is successful which resets the failure count.
|
// Marks that an operation is successful which resets the failure count.
|
||||||
// The chan that is returned will be immediately selectable
|
// The chan that is returned will be immediately selectable
|
||||||
func (rw *RetryWaiter) Success() <-chan struct{} {
|
func (rw *Waiter) Success() <-chan struct{} {
|
||||||
rw.Reset()
|
rw.Reset()
|
||||||
return rw.wait()
|
return rw.wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Marks that an operation failed. The chan returned will be selectable
|
// Marks that an operation failed. The chan returned will be selectable
|
||||||
// once the calculated retry wait amount of time has elapsed
|
// once the calculated retry wait amount of time has elapsed
|
||||||
func (rw *RetryWaiter) Failed() <-chan struct{} {
|
func (rw *Waiter) Failed() <-chan struct{} {
|
||||||
rw.failures += 1
|
rw.failures += 1
|
||||||
ch := rw.wait()
|
ch := rw.wait()
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resets the internal failure counter
|
// Resets the internal failure counter.
|
||||||
func (rw *RetryWaiter) Reset() {
|
func (rw *Waiter) Reset() {
|
||||||
rw.failures = 0
|
rw.failures = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Failures returns the current number of consecutive failures recorded.
|
||||||
|
func (rw *Waiter) Failures() int {
|
||||||
|
return int(rw.failures)
|
||||||
|
}
|
||||||
|
|
||||||
// WaitIf is a convenice method to record whether the last
|
// WaitIf is a convenice method to record whether the last
|
||||||
// operation was a success or failure and return a chan that
|
// operation was a success or failure and return a chan that
|
||||||
// will be selectablw when the next operation can be done.
|
// will be selectablw when the next operation can be done.
|
||||||
func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} {
|
func (rw *Waiter) WaitIf(failure bool) <-chan struct{} {
|
||||||
if failure {
|
if failure {
|
||||||
return rw.Failed()
|
return rw.Failed()
|
||||||
}
|
}
|
||||||
|
@ -151,6 +158,6 @@ func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} {
|
||||||
// operation was a success or failure based on whether the err
|
// operation was a success or failure based on whether the err
|
||||||
// is nil and then return a chan that will be selectable when
|
// is nil and then return a chan that will be selectable when
|
||||||
// the next operation can be done.
|
// the next operation can be done.
|
||||||
func (rw *RetryWaiter) WaitIfErr(err error) <-chan struct{} {
|
func (rw *Waiter) WaitIfErr(err error) <-chan struct{} {
|
||||||
return rw.WaitIf(err != nil)
|
return rw.WaitIf(err != nil)
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package lib
|
package retry
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
Loading…
Reference in New Issue