Implement config entry replication (#5706)

Matt Keeler 2019-04-26 13:38:39 -04:00 committed by GitHub
parent 6c885d383a
commit 5befe0f5d5
12 changed files with 1159 additions and 21 deletions


@@ -321,6 +321,20 @@ type Config struct {
// user events. This function should not block.
UserEventHandler func(serf.UserEvent)
// ConfigReplicationRate is the max number of replication rounds that can
// be run per second. Note that either 1 or 2 RPCs are used during each replication
// round
ConfigReplicationRate int
// ConfigReplicationBurst is how many replication rounds can be bursted after a
// period of idleness
ConfigReplicationBurst int
// ConfigReplicationApplyLimit is the max number of replication-related
// apply operations that we allow during a one second period. This is
// used to limit the amount of Raft bandwidth used for replication.
ConfigReplicationApplyLimit int
// CoordinateUpdatePeriod controls how long a server batches coordinate
// updates before applying them in a Raft transaction. A larger period
// leads to fewer Raft transactions, but also the stored coordinates
@@ -432,26 +446,29 @@ func DefaultConfig() *Config {
}
conf := &Config{
Build: version.Version,
Datacenter: DefaultDC,
NodeName: hostname,
RPCAddr: DefaultRPCAddr,
RaftConfig: raft.DefaultConfig(),
SerfLANConfig: lib.SerfDefaultConfig(),
SerfWANConfig: lib.SerfDefaultConfig(),
SerfFloodInterval: 60 * time.Second,
ReconcileInterval: 60 * time.Second,
ProtocolVersion: ProtocolVersion2Compatible,
ACLPolicyTTL: 30 * time.Second,
ACLTokenTTL: 30 * time.Second,
ACLDefaultPolicy: "allow",
ACLDownPolicy: "extend-cache",
ACLReplicationRate: 1,
ACLReplicationBurst: 5,
ACLReplicationApplyLimit: 100, // ops / sec
ConfigReplicationRate: 1,
ConfigReplicationBurst: 5,
ConfigReplicationApplyLimit: 100, // ops / sec
TombstoneTTL: 15 * time.Minute,
TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second,
// These are tuned to provide a total throughput of 128 updates
// per second. If you update these, you should update the client-

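For context on how rate/burst settings like these behave: the Replicator added later in this commit feeds them into a token bucket from golang.org/x/time/rate. Below is a minimal standalone sketch (not part of this commit) using that package with values mirroring the defaults above (rate 1, burst 5).

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // Mirrors ConfigReplicationRate (rounds/sec) and ConfigReplicationBurst:
    // on average one replication round per second, with bursts of up to five
    // rounds allowed after a period of idleness.
    limiter := rate.NewLimiter(rate.Limit(1), 5)

    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    for round := 0; ; round++ {
        // Wait blocks until a token is available or the context is done.
        if err := limiter.Wait(ctx); err != nil {
            fmt.Println("stopping:", err)
            return
        }
        fmt.Println("replication round", round)
    }
}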

@@ -132,6 +132,43 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe
})
}
// ListAll returns all the known configuration entries
func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.IndexedGenericConfigEntries) error {
if done, err := c.srv.forward("ConfigEntry.ListAll", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now())
// Fetch the ACL token, if any.
rule, err := c.srv.ResolveToken(args.Token)
if err != nil {
return err
}
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ConfigEntries(ws)
if err != nil {
return err
}
// Filter the returned entries by ACL permissions.
filteredEntries := make([]structs.ConfigEntry, 0, len(entries))
for _, entry := range entries {
if rule != nil && !entry.CanRead(rule) {
continue
}
filteredEntries = append(filteredEntries, entry)
}
reply.Entries = filteredEntries
reply.Index = index
return nil
})
}
// Delete deletes a config entry.
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error {
if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done {

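The ACL filtering in ListAll above follows a common shape: keep an entry unless a non-nil rule denies read. A toy standalone version of that shape, where entry and canRead are hypothetical stand-ins for structs.ConfigEntry and the CanRead check against the resolved ACL rule:

package main

import "fmt"

// entry stands in for structs.ConfigEntry; canRead stands in for the
// ACL-based CanRead check. Both are simplifications for illustration.
type entry struct{ kind, name string }

func filterByACL(entries []entry, canRead func(entry) bool) []entry {
    filtered := make([]entry, 0, len(entries))
    for _, e := range entries {
        // A nil rule means ACLs are disabled, so everything is readable.
        if canRead != nil && !canRead(e) {
            continue
        }
        filtered = append(filtered, e)
    }
    return filtered
}

func main() {
    entries := []entry{
        {"service-defaults", "foo"},
        {"service-defaults", "db"},
    }
    onlyFoo := func(e entry) bool { return e.name == "foo" }
    fmt.Println(filterByACL(entries, onlyFoo)) // [{service-defaults foo}]
}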

@@ -274,6 +274,49 @@ func TestConfigEntry_List(t *testing.T) {
require.Equal(expected, out)
}
func TestConfigEntry_ListAll(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
// Create some dummy config entries in the state store to look up.
state := s1.fsm.State()
expected := structs.IndexedGenericConfigEntries{
Entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
},
&structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
},
},
}
require.NoError(state.EnsureConfigEntry(1, expected.Entries[0]))
require.NoError(state.EnsureConfigEntry(2, expected.Entries[1]))
require.NoError(state.EnsureConfigEntry(3, expected.Entries[2]))
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
var out structs.IndexedGenericConfigEntries
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ListAll", &args, &out))
expected.QueryMeta = out.QueryMeta
require.Equal(expected, out)
}
func TestConfigEntry_List_ACLDeny(t *testing.T) {
t.Parallel()
@@ -355,6 +398,86 @@ operator = "read"
require.Equal(structs.ProxyDefaults, proxyConf.Kind)
}
func TestConfigEntry_ListAll_ACLDeny(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// Create the ACL.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTokenTypeClient,
Rules: `
service "foo" {
policy = "read"
}
operator = "read"
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// Create some dummy service/proxy configs to be looked up.
state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
}))
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
}))
require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
}))
// This should filter out the "db" service since we don't have permissions for it.
args := structs.ConfigEntryQuery{
Datacenter: s1.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: id},
}
var out structs.IndexedGenericConfigEntries
err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.ListAll", &args, &out)
require.NoError(err)
require.Len(out.Entries, 2)
svcIndex := 0
proxyIndex := 1
if out.Entries[0].GetKind() == structs.ProxyDefaults {
svcIndex = 1
proxyIndex = 0
}
svcConf, ok := out.Entries[svcIndex].(*structs.ServiceConfigEntry)
require.True(ok)
proxyConf, ok := out.Entries[proxyIndex].(*structs.ProxyConfigEntry)
require.True(ok)
require.Equal("foo", svcConf.Name)
require.Equal(structs.ServiceDefaults, svcConf.Kind)
require.Equal(structs.ProxyConfigGlobal, proxyConf.Name)
require.Equal(structs.ProxyDefaults, proxyConf.Kind)
}
func TestConfigEntry_Delete(t *testing.T) {
t.Parallel()


@@ -0,0 +1,199 @@
package consul
import (
"context"
"fmt"
"sort"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/structs"
)
func cmpConfigLess(first structs.ConfigEntry, second structs.ConfigEntry) bool {
return first.GetKind() < second.GetKind() || (first.GetKind() == second.GetKind() && first.GetName() < second.GetName())
}
func configSort(configs []structs.ConfigEntry) {
sort.Slice(configs, func(i, j int) bool {
return cmpConfigLess(configs[i], configs[j])
})
}
func diffConfigEntries(local []structs.ConfigEntry, remote []structs.ConfigEntry, lastRemoteIndex uint64) ([]structs.ConfigEntry, []structs.ConfigEntry) {
configSort(local)
configSort(remote)
var deletions []structs.ConfigEntry
var updates []structs.ConfigEntry
var localIdx int
var remoteIdx int
for localIdx, remoteIdx = 0, 0; localIdx < len(local) && remoteIdx < len(remote); {
if local[localIdx].GetKind() == remote[remoteIdx].GetKind() && local[localIdx].GetName() == remote[remoteIdx].GetName() {
// config is in both the local and remote state - need to check raft indices
if remote[remoteIdx].GetRaftIndex().ModifyIndex > lastRemoteIndex {
updates = append(updates, remote[remoteIdx])
}
// increment both indices when equal
localIdx += 1
remoteIdx += 1
} else if cmpConfigLess(local[localIdx], remote[remoteIdx]) {
// config no longer in remote state - needs deleting
deletions = append(deletions, local[localIdx])
// increment just the local index
localIdx += 1
} else {
// local state doesn't have this config - needs updating
updates = append(updates, remote[remoteIdx])
// increment just the remote index
remoteIdx += 1
}
}
for ; localIdx < len(local); localIdx += 1 {
deletions = append(deletions, local[localIdx])
}
for ; remoteIdx < len(remote); remoteIdx += 1 {
updates = append(updates, remote[remoteIdx])
}
return deletions, updates
}
func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.ConfigEntry, op structs.ConfigEntryOp) (bool, error) {
ticker := time.NewTicker(time.Second / time.Duration(s.config.ConfigReplicationApplyLimit))
defer ticker.Stop()
for i, entry := range configs {
req := structs.ConfigEntryRequest{
Op: op,
Datacenter: s.config.Datacenter,
Entry: entry,
}
resp, err := s.raftApply(structs.ConfigEntryRequestType, &req)
if err != nil {
return false, fmt.Errorf("Failed to apply config %s: %v", op, err)
}
if respErr, ok := resp.(error); ok {
return false, fmt.Errorf("Failed to apply config %s: %v", op, respErr)
}
if i < len(configs)-1 {
select {
case <-ctx.Done():
return true, nil
case <-ticker.C:
// do nothing - ready for the next batch
}
}
}
return false, nil
}
func (s *Server) fetchConfigEntries(lastRemoteIndex uint64) (*structs.IndexedGenericConfigEntries, error) {
defer metrics.MeasureSince([]string{"leader", "replication", "config-entries", "fetch"}, time.Now())
req := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter,
QueryOptions: structs.QueryOptions{
AllowStale: true,
MinQueryIndex: lastRemoteIndex,
Token: s.tokens.ReplicationToken(),
},
}
var response structs.IndexedGenericConfigEntries
if err := s.RPC("ConfigEntry.ListAll", &req, &response); err != nil {
return nil, err
}
return &response, nil
}
func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
remote, err := s.fetchConfigEntries(lastRemoteIndex)
if err != nil {
return 0, false, fmt.Errorf("failed to retrieve remote config entries: %v", err)
}
s.logger.Printf("[DEBUG] replication: finished fetching config entries: %d", len(remote.Entries))
// Need to check if we should be stopping. This will be common as the fetching process is a blocking
// RPC which could have been hanging around for a long time and during that time leadership could
// have been lost.
select {
case <-ctx.Done():
return 0, true, nil
default:
// do nothing
}
// Measure everything after the remote query, which can block for long
// periods of time. This metric is a good measure of how expensive the
// replication process is.
defer metrics.MeasureSince([]string{"leader", "replication", "config", "apply"}, time.Now())
_, local, err := s.fsm.State().ConfigEntries(nil)
if err != nil {
return 0, false, fmt.Errorf("failed to retrieve local config entries: %v", err)
}
// If the remote index ever goes backwards, it's a good indication that
// the remote side was rebuilt and we should do a full sync since we
// can't make any assumptions about what's going on.
//
// Resetting lastRemoteIndex to 0 will work because we never consider local
// raft indices. Instead we compare the raft modify index in the response object
// with the lastRemoteIndex (only when we already have a config entry of the same kind/name)
// to determine if an update is needed. Resetting lastRemoteIndex to 0 then has the effect
// of making us think all the local state is out of date and any matching entries should
// still be updated.
//
// The lastRemoteIndex is not used when the entry exists either only in the local state or
// only in the remote state. In those situations we need to either delete it or create it.
if remote.QueryMeta.Index < lastRemoteIndex {
s.logger.Printf("[WARN] replication: Config Entry replication remote index moved backwards (%d to %d), forcing a full Config Entry sync", lastRemoteIndex, remote.QueryMeta.Index)
lastRemoteIndex = 0
}
s.logger.Printf("[DEBUG] replication: Config Entry replication - local: %d, remote: %d", len(local), len(remote.Entries))
// Calculate the changes required to bring the state into sync and then
// apply them.
deletions, updates := diffConfigEntries(local, remote.Entries, lastRemoteIndex)
s.logger.Printf("[DEBUG] replication: Config Entry replication - deletions: %d, updates: %d", len(deletions), len(updates))
if len(deletions) > 0 {
s.logger.Printf("[DEBUG] replication: Config Entry replication - performing %d deletions", len(deletions))
exit, err := s.reconcileLocalConfig(ctx, deletions, structs.ConfigEntryDelete)
if exit {
return 0, true, nil
}
if err != nil {
return 0, false, fmt.Errorf("failed to delete local config entries: %v", err)
}
s.logger.Printf("[DEBUG] replication: Config Entry replication - finished deletions")
}
if len(updates) > 0 {
s.logger.Printf("[DEBUG] replication: Config Entry replication - performing %d updates", len(updates))
exit, err := s.reconcileLocalConfig(ctx, updates, structs.ConfigEntryUpsert)
if exit {
return 0, true, nil
}
if err != nil {
return 0, false, fmt.Errorf("failed to update local config entries: %v", err)
}
s.logger.Printf("[DEBUG] replication: Config Entry replication - finished updates")
}
// Return the index we got back from the remote side, since we've synced
// up with the remote state as of that index.
return remote.QueryMeta.Index, false, nil
}

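diffConfigEntries above is a classic two-pointer merge over two sorted lists. A self-contained illustration of the same logic with simplified, assumed types (entry and modifyIdx stand in for structs.ConfigEntry and GetRaftIndex().ModifyIndex):

package main

import (
    "fmt"
    "sort"
)

// entry is an assumed stand-in for structs.ConfigEntry: kind/name identify
// it, modifyIdx plays the role of GetRaftIndex().ModifyIndex.
type entry struct {
    kind, name string
    modifyIdx  uint64
}

func less(a, b entry) bool {
    return a.kind < b.kind || (a.kind == b.kind && a.name < b.name)
}

func diff(local, remote []entry, lastRemoteIndex uint64) (deletions, updates []entry) {
    sort.Slice(local, func(i, j int) bool { return less(local[i], local[j]) })
    sort.Slice(remote, func(i, j int) bool { return less(remote[i], remote[j]) })

    li, ri := 0, 0
    for li < len(local) && ri < len(remote) {
        switch {
        case local[li].kind == remote[ri].kind && local[li].name == remote[ri].name:
            // In both datacenters: re-upsert only if the remote copy was
            // modified after the index we last replicated through.
            if remote[ri].modifyIdx > lastRemoteIndex {
                updates = append(updates, remote[ri])
            }
            li++
            ri++
        case less(local[li], remote[ri]):
            // Only local: the primary no longer has it, so delete it.
            deletions = append(deletions, local[li])
            li++
        default:
            // Only remote: the secondary is missing it, so upsert it.
            updates = append(updates, remote[ri])
            ri++
        }
    }
    // Whatever remains on one side is deleted or upserted wholesale.
    deletions = append(deletions, local[li:]...)
    updates = append(updates, remote[ri:]...)
    return deletions, updates
}

func main() {
    local := []entry{{"service-defaults", "a", 3}, {"service-defaults", "stale", 2}}
    remote := []entry{{"service-defaults", "a", 9}, {"service-defaults", "new", 8}}
    del, upd := diff(local, remote, 5)
    fmt.Println(del) // [{service-defaults stale 2}]
    fmt.Println(upd) // [{service-defaults a 9} {service-defaults new 8}]
}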

@@ -0,0 +1,165 @@
package consul
import (
"fmt"
"os"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)
func TestReplication_ConfigEntries(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.ConfigReplicationRate = 100
c.ConfigReplicationBurst = 100
c.ConfigReplicationApplyLimit = 1000000
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
// Try to join.
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s1.RPC, "dc2")
// Create some new configuration entries
var entries []structs.ConfigEntry
for i := 0; i < 50; i++ {
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("svc-%d", i),
Protocol: "tcp",
},
}
var out struct{}
require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
entries = append(entries, arg.Entry)
}
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"foo": "bar",
"bar": 1,
},
},
}
var out struct{}
require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
entries = append(entries, arg.Entry)
checkSame := func(t *retry.R) error {
_, remote, err := s1.fsm.State().ConfigEntries(nil)
require.NoError(t, err)
_, local, err := s2.fsm.State().ConfigEntries(nil)
require.NoError(t, err)
require.Len(t, local, len(remote))
for i, entry := range remote {
require.Equal(t, entry.GetKind(), local[i].GetKind())
require.Equal(t, entry.GetName(), local[i].GetName())
// kind-specific validations
switch entry.GetKind() {
case structs.ServiceDefaults:
localSvc, ok := local[i].(*structs.ServiceConfigEntry)
require.True(t, ok)
remoteSvc, ok := entry.(*structs.ServiceConfigEntry)
require.True(t, ok)
require.Equal(t, remoteSvc.Protocol, localSvc.Protocol)
require.Equal(t, remoteSvc.Connect, localSvc.Connect)
case structs.ProxyDefaults:
localProxy, ok := local[i].(*structs.ProxyConfigEntry)
require.True(t, ok)
remoteProxy, ok := entry.(*structs.ProxyConfigEntry)
require.True(t, ok)
require.Equal(t, remoteProxy.Config, localProxy.Config)
}
}
return nil
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
// Update those config entries
for i := 0; i < 50; i++ {
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("svc-%d", i),
Protocol: "udp",
},
}
var out struct{}
require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
}
arg = structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryUpsert,
Entry: &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"foo": "baz",
"baz": 2,
},
},
}
require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out))
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
for _, entry := range entries {
arg := structs.ConfigEntryRequest{
Datacenter: "dc1",
Op: structs.ConfigEntryDelete,
Entry: entry,
}
var out struct{}
require.NoError(t, s1.RPC("ConfigEntry.Delete", &arg, &out))
}
// Wait for the replica to converge.
retry.Run(t, func(r *retry.R) {
checkSame(r)
})
}


@@ -269,6 +269,8 @@ func (s *Server) establishLeadership() error {
return err
}
s.startConfigReplication()
s.startEnterpriseLeader()
s.startCARootPruning()
@@ -289,6 +291,8 @@ func (s *Server) revokeLeadership() error {
return err
}
s.stopConfigReplication()
s.stopEnterpriseLeader()
s.stopCARootPruning()
@@ -848,6 +852,20 @@ func (s *Server) stopACLReplication() {
s.aclReplicationEnabled = false
}
func (s *Server) startConfigReplication() {
if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
// replication shouldn't run in the primary DC
return
}
s.configReplicator.Start()
}
func (s *Server) stopConfigReplication() {
// will be a no-op when not started
s.configReplicator.Stop()
}
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config {
state := s.fsm.State()

agent/consul/replication.go (new file, 155 lines)

@@ -0,0 +1,155 @@
package consul
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/hashicorp/consul/lib"
"golang.org/x/time/rate"
)
const (
// replicationDefaultMaxRetryWait is the maximum amount of time to wait
// between failed blocking queries when backing off.
replicationDefaultMaxRetryWait = 120 * time.Second
replicationDefaultRate = 1
)
type ReplicatorConfig struct {
// Name to be used in various logging
Name string
// Function to perform the actual replication
ReplicateFn ReplicatorFunc
// The number of replication rounds per second that are allowed
Rate int
// The number of replication rounds that can be done in a burst
Burst int
// Minimum number of RPC failures to ignore before backing off
MinFailures int
// Maximum wait time between failing RPCs
MaxRetryWait time.Duration
// Where to send our logs
Logger *log.Logger
}
type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64) (index uint64, exit bool, err error)
type Replicator struct {
name string
lock sync.RWMutex
running bool
cancel context.CancelFunc
ctx context.Context
limiter *rate.Limiter
waiter *lib.RetryWaiter
replicate ReplicatorFunc
logger *log.Logger
}
func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
if config == nil {
return nil, fmt.Errorf("Cannot create the Replicator without a config")
}
if config.ReplicateFn == nil {
return nil, fmt.Errorf("Cannot create the Replicator without a ReplicateFn set in the config")
}
if config.Logger == nil {
config.Logger = log.New(os.Stderr, "", log.LstdFlags)
}
ctx, cancel := context.WithCancel(context.Background())
limiter := rate.NewLimiter(rate.Limit(config.Rate), config.Burst)
maxWait := config.MaxRetryWait
if maxWait == 0 {
maxWait = replicationDefaultMaxRetryWait
}
minFailures := config.MinFailures
if minFailures < 0 {
minFailures = 0
}
waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10))
return &Replicator{
name: config.Name,
running: false,
cancel: cancel,
ctx: ctx,
limiter: limiter,
waiter: waiter,
replicate: config.ReplicateFn,
logger: config.Logger,
}, nil
}
func (r *Replicator) Start() {
r.lock.Lock()
defer r.lock.Unlock()
if r.running {
return
}
go r.run()
r.running = true
r.logger.Printf("[INFO] replication: started %s replication", r.name)
}
func (r *Replicator) run() {
var lastRemoteIndex uint64
defer r.logger.Printf("[INFO] replication: stopped %s replication", r.name)
for {
// This ensures we aren't doing too many successful replication rounds. It is
// mostly useful when data within the primary datacenter is changing rapidly, to
// limit the amount of resources that replication into the secondary datacenter consumes.
if err := r.limiter.Wait(r.ctx); err != nil {
return
}
// Perform a single round of replication
index, exit, err := r.replicate(r.ctx, lastRemoteIndex)
if exit {
// the replication function told us to exit
return
}
if err != nil {
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
// the next round of replication
lastRemoteIndex = 0
r.logger.Printf("[WARN] replication: %s replication error (will retry if still leader): %v", r.name, err)
} else {
lastRemoteIndex = index
r.logger.Printf("[DEBUG] replication: %s replication completed through remote index %d", r.name, index)
}
select {
case <-r.ctx.Done():
return
// wait some amount of time to prevent churning through many replication rounds while replication is failing
case <-r.waiter.WaitIfErr(err):
// do nothing
}
}
}
func (r *Replicator) Stop() {
r.lock.Lock()
defer r.lock.Unlock()
if !r.running {
return
}
r.logger.Printf("[DEBUG] replication: stopping %s replication", r.name)
r.cancel()
r.cancel = nil
r.running = false
}

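A hedged sketch of how a Replicator instance is meant to be driven. The ReplicateFn here is a stand-in (the real one is replicateConfig, wired up in server.go below), and the sketch assumes it is compiled inside this consul package where Replicator lives:

package consul

import (
    "context"
    "fmt"
    "time"
)

// exampleReplicator is illustrative only, not part of this commit.
func exampleReplicator() error {
    rounds := 0
    fn := func(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
        rounds++
        if rounds == 1 {
            // An error makes run() reset lastRemoteIndex to 0 and back off.
            return 0, false, fmt.Errorf("transient fetch failure")
        }
        // Returning exit=true stops the replication goroutine.
        return lastRemoteIndex + 1, rounds >= 3, nil
    }

    r, err := NewReplicator(&ReplicatorConfig{
        Name:        "Example",
        ReplicateFn: fn,
        Rate:        10,
        Burst:       10,
        MinFailures: 1, // tolerate the first failure without backing off
    })
    if err != nil {
        return err
    }
    r.Start()
    defer r.Stop()
    time.Sleep(500 * time.Millisecond) // let a few rounds run
    return nil
}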

@@ -138,6 +138,10 @@ type Server struct {
// Consul configuration
config *Config
// configReplicator is used to manage the leader's replication routines for
// centralized config
configReplicator *Replicator
// tokens holds ACL tokens initially from the configuration, but can
// be updated at runtime, so should always be used instead of going to
// the configuration directly.
@@ -347,6 +351,19 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store, tl
return nil, err
}
configReplicatorConfig := ReplicatorConfig{
Name: "Config Entry",
ReplicateFn: s.replicateConfig,
Rate: s.config.ConfigReplicationRate,
Burst: s.config.ConfigReplicationBurst,
Logger: logger,
}
s.configReplicator, err = NewReplicator(&configReplicatorConfig)
if err != nil {
s.Shutdown()
return nil, err
}
// Initialize the stats fetcher that autopilot will use.
s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)


@@ -107,7 +107,7 @@ func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string) (uint64, struc
// ConfigEntries is called to get all config entry objects.
func (s *Store) ConfigEntries(ws memdb.WatchSet) (uint64, []structs.ConfigEntry, error) {
- return s.ConfigEntriesByKind(nil, "")
+ return s.ConfigEntriesByKind(ws, "")
}
// ConfigEntriesByKind is called to get all config entry objects with the given kind.

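The one-line fix above matters because blocking queries only wake up if the read registers its watch channels. A minimal go-memdb sketch of that mechanism (the single-table schema and entry type are hypothetical, not from this commit):

package main

import (
    "fmt"
    "time"

    memdb "github.com/hashicorp/go-memdb"
)

// entry is just enough structure to index by Name.
type entry struct{ Name string }

func main() {
    schema := &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{
            "config": {
                Name: "config",
                Indexes: map[string]*memdb.IndexSchema{
                    "id": {
                        Name:    "id",
                        Unique:  true,
                        Indexer: &memdb.StringFieldIndex{Field: "Name"},
                    },
                },
            },
        },
    }
    db, err := memdb.NewMemDB(schema)
    if err != nil {
        panic(err)
    }

    // Read everything, registering the table's watch channel as we go.
    ws := memdb.NewWatchSet()
    txn := db.Txn(false)
    iter, err := txn.Get("config", "id")
    if err != nil {
        panic(err)
    }
    // Dropping ws (as the nil argument above did) means nobody is ever
    // woken when the table changes.
    ws.Add(iter.WatchCh())

    go func() {
        time.Sleep(100 * time.Millisecond)
        w := db.Txn(true)
        if err := w.Insert("config", &entry{Name: "global"}); err != nil {
            panic(err)
        }
        w.Commit()
    }()

    timedOut := ws.Watch(time.After(time.Second))
    fmt.Println("timed out:", timedOut) // false: the write woke the watcher
}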

@@ -1298,6 +1298,73 @@ func (c *IndexedConfigEntries) UnmarshalBinary(data []byte) error {
return nil
}
type IndexedGenericConfigEntries struct {
Entries []ConfigEntry
QueryMeta
}
func (c *IndexedGenericConfigEntries) MarshalBinary() (data []byte, err error) {
// bs will grow if needed, but allocate enough up front to avoid reallocation
// in the common case.
bs := make([]byte, 128)
enc := codec.NewEncoderBytes(&bs, msgpackHandle)
if err := enc.Encode(len(c.Entries)); err != nil {
return nil, err
}
for _, entry := range c.Entries {
if err := enc.Encode(entry.GetKind()); err != nil {
return nil, err
}
if err := enc.Encode(entry); err != nil {
return nil, err
}
}
if err := enc.Encode(c.QueryMeta); err != nil {
return nil, err
}
return bs, nil
}
func (c *IndexedGenericConfigEntries) UnmarshalBinary(data []byte) error {
// First decode the number of entries.
var numEntries int
dec := codec.NewDecoderBytes(data, msgpackHandle)
if err := dec.Decode(&numEntries); err != nil {
return err
}
// Then decode the slice of ConfigEntries
c.Entries = make([]ConfigEntry, numEntries)
for i := 0; i < numEntries; i++ {
var kind string
if err := dec.Decode(&kind); err != nil {
return err
}
entry, err := MakeConfigEntry(kind, "")
if err != nil {
return err
}
if err := dec.Decode(entry); err != nil {
return err
}
c.Entries[i] = entry
}
if err := dec.Decode(&c.QueryMeta); err != nil {
return err
}
return nil
}
// DirEntry is used to represent a directory entry. This is
// used for values in our Key-Value store.
type DirEntry struct {

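The kind-before-entry framing above exists because Entries is a slice of interface values: the decoder has to learn the concrete type before it can allocate a struct to decode into. A standalone sketch of the same framing with assumed types (the real code uses structs.MakeConfigEntry and the package's msgpackHandle):

package main

import (
    "fmt"

    "github.com/hashicorp/go-msgpack/codec"
)

// serviceEntry/proxyEntry and makeEntry are assumed stand-ins for the real
// config entry structs and structs.MakeConfigEntry.
type serviceEntry struct{ Name, Protocol string }
type proxyEntry struct{ Name string }

var handle = &codec.MsgpackHandle{}

func makeEntry(kind string) (interface{}, error) {
    switch kind {
    case "service-defaults":
        return &serviceEntry{}, nil
    case "proxy-defaults":
        return &proxyEntry{}, nil
    }
    return nil, fmt.Errorf("unknown kind %q", kind)
}

func must(err error) {
    if err != nil {
        panic(err)
    }
}

func main() {
    entries := []interface{}{
        &serviceEntry{Name: "web", Protocol: "http"},
        &proxyEntry{Name: "global"},
    }
    kinds := []string{"service-defaults", "proxy-defaults"}

    // Encode: a count, then (kind, entry) pairs, mirroring MarshalBinary.
    var bs []byte
    enc := codec.NewEncoderBytes(&bs, handle)
    must(enc.Encode(len(entries)))
    for i, e := range entries {
        must(enc.Encode(kinds[i]))
        must(enc.Encode(e))
    }

    // Decode: read each kind first so the right concrete type can be
    // allocated before decoding into it, mirroring UnmarshalBinary.
    dec := codec.NewDecoderBytes(bs, handle)
    var n int
    must(dec.Decode(&n))
    for i := 0; i < n; i++ {
        var kind string
        must(dec.Decode(&kind))
        e, err := makeEntry(kind)
        must(err)
        must(dec.Decode(e))
        fmt.Printf("%d: %#v\n", i, e)
    }
}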
lib/retry.go (new file, 156 lines)

@@ -0,0 +1,156 @@
package lib
import (
"time"
)
const (
defaultMinFailures = 0
defaultMaxWait = 2 * time.Minute
)
// Interface used for offloading jitter calculations from the RetryWaiter
type Jitter interface {
AddJitter(baseTime time.Duration) time.Duration
}
// JitterRandomStagger calculates a random jitter between 0 and a given percentage of the baseTime
type JitterRandomStagger struct {
// int64 because we are going to be doing math against an int64 to represent nanoseconds
percent int64
}
// Creates a new JitterRandomStagger
func NewJitterRandomStagger(percent int) *JitterRandomStagger {
if percent < 0 {
percent = 0
}
return &JitterRandomStagger{
percent: int64(percent),
}
}
// AddJitter implements the Jitter interface
func (j *JitterRandomStagger) AddJitter(baseTime time.Duration) time.Duration {
if j.percent == 0 {
return baseTime
}
// time.Duration is a defined type whose underlying type is int64, which is
// why converting the percentage to a Duration and then dividing works
return baseTime + RandomStagger((baseTime*time.Duration(j.percent))/100)
}
// RetryWaiter will record failed and successful operations and provide
// a channel to wait on before a failed operation can be retried.
type RetryWaiter struct {
minFailures uint
minWait time.Duration
maxWait time.Duration
jitter Jitter
failures uint
}
// Creates a new RetryWaiter
func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *RetryWaiter {
if minFailures < 0 {
minFailures = defaultMinFailures
}
if maxWait <= 0 {
maxWait = defaultMaxWait
}
if minWait <= 0 {
minWait = 0 * time.Nanosecond
}
return &RetryWaiter{
minFailures: uint(minFailures),
minWait: minWait,
maxWait: maxWait,
failures: 0,
jitter: jitter,
}
}
// calculates the necessary wait time before the
// next operation should be allowed.
func (rw *RetryWaiter) calculateWait() time.Duration {
waitTime := rw.minWait
if rw.failures > rw.minFailures {
shift := rw.failures - rw.minFailures - 1
waitTime = rw.maxWait
if shift < 31 {
waitTime = (1 << shift) * time.Second
}
if waitTime > rw.maxWait {
waitTime = rw.maxWait
}
if rw.jitter != nil {
waitTime = rw.jitter.AddJitter(waitTime)
}
}
if waitTime < rw.minWait {
waitTime = rw.minWait
}
return waitTime
}
// calculates the waitTime and returns a chan
// that will become selectable once that amount
// of time has elapsed.
func (rw *RetryWaiter) wait() <-chan struct{} {
waitTime := rw.calculateWait()
ch := make(chan struct{})
if waitTime > 0 {
time.AfterFunc(waitTime, func() { close(ch) })
} else {
// if there should be 0 wait time then we ensure
// that the chan will be immediately selectable
close(ch)
}
return ch
}
// Success marks that an operation was successful, which resets the failure count.
// The returned chan will be selectable once the minimum wait time has elapsed
func (rw *RetryWaiter) Success() <-chan struct{} {
rw.Reset()
return rw.wait()
}
// Failed marks that an operation failed. The returned chan will be selectable
// once the calculated retry wait has elapsed
func (rw *RetryWaiter) Failed() <-chan struct{} {
rw.failures += 1
ch := rw.wait()
return ch
}
// Resets the internal failure counter
func (rw *RetryWaiter) Reset() {
rw.failures = 0
}
// WaitIf is a convenience method to record whether the last
// operation was a success or failure and return a chan that
// will be selectable when the next operation can be done.
func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} {
if failure {
return rw.Failed()
}
return rw.Success()
}
// WaitIfErr is a convenience method to record whether the last
// operation was a success or failure based on whether the err
// is nil and then return a chan that will be selectable when
// the next operation can be done.
func (rw *RetryWaiter) WaitIfErr(err error) <-chan struct{} {
return rw.WaitIf(err != nil)
}

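A sketch of the intended consumption pattern, mirroring how Replicator.run above selects on WaitIfErr: report each outcome and block on the returned channel, so backoff only kicks in after failures. pollWithBackoff is a hypothetical consumer, assumed to live in this lib package:

package lib

import (
    "context"
    "time"
)

func pollWithBackoff(ctx context.Context, op func() error) {
    // minFailures=1: the first failure retries immediately; later failures
    // back off 1s, 2s, 4s, ... capped at 10s, plus up to 10% jitter.
    waiter := NewRetryWaiter(1, 0, 10*time.Second, NewJitterRandomStagger(10))
    for {
        err := op()
        select {
        case <-ctx.Done():
            return
        case <-waiter.WaitIfErr(err): // a nil err resets the failure count
        }
    }
}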
lib/retry_test.go (new file, 184 lines)

@@ -0,0 +1,184 @@
package lib
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestJitterRandomStagger(t *testing.T) {
t.Parallel()
t.Run("0 percent", func(t *testing.T) {
t.Parallel()
jitter := NewJitterRandomStagger(0)
for i := 0; i < 10; i++ {
baseTime := time.Duration(i) * time.Second
require.Equal(t, baseTime, jitter.AddJitter(baseTime))
}
})
t.Run("10 percent", func(t *testing.T) {
t.Parallel()
jitter := NewJitterRandomStagger(10)
for i := 0; i < 10; i++ {
baseTime := 5000 * time.Millisecond
maxTime := 5500 * time.Millisecond
newTime := jitter.AddJitter(baseTime)
require.True(t, newTime > baseTime)
require.True(t, newTime <= maxTime)
}
})
t.Run("100 percent", func(t *testing.T) {
t.Parallel()
jitter := NewJitterRandomStagger(100)
for i := 0; i < 10; i++ {
baseTime := 1234 * time.Millisecond
maxTime := 2468 * time.Millisecond
newTime := jitter.AddJitter(baseTime)
require.True(t, newTime > baseTime)
require.True(t, newTime <= maxTime)
}
})
}
func TestRetryWaiter_calculateWait(t *testing.T) {
t.Parallel()
t.Run("Defaults", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 0, 0, nil)
require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
rw.failures += 1
require.Equal(t, 1*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 2*time.Second, rw.calculateWait())
rw.failures = 31
require.Equal(t, defaultMaxWait, rw.calculateWait())
})
t.Run("Minimum Wait", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 5*time.Second, 0, nil)
require.Equal(t, 5*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 5*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 5*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 5*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 8*time.Second, rw.calculateWait())
})
t.Run("Minimum Failures", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(5, 0, 0, nil)
require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
rw.failures += 5
require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
rw.failures += 1
require.Equal(t, 1*time.Second, rw.calculateWait())
})
t.Run("Maximum Wait", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 0, 5*time.Second, nil)
require.Equal(t, 0*time.Nanosecond, rw.calculateWait())
rw.failures += 1
require.Equal(t, 1*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 2*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 4*time.Second, rw.calculateWait())
rw.failures += 1
require.Equal(t, 5*time.Second, rw.calculateWait())
rw.failures = 31
require.Equal(t, 5*time.Second, rw.calculateWait())
})
}
func TestRetryWaiter_WaitChans(t *testing.T) {
t.Parallel()
t.Run("Minimum Wait - Success", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil)
select {
case <-time.After(200 * time.Millisecond):
case <-rw.Success():
require.Fail(t, "minimum wait not respected")
}
})
t.Run("Minimum Wait - WaitIf", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil)
select {
case <-time.After(200 * time.Millisecond):
case <-rw.WaitIf(false):
require.Fail(t, "minimum wait not respected")
}
})
t.Run("Minimum Wait - WaitIfErr", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil)
select {
case <-time.After(200 * time.Millisecond):
case <-rw.WaitIfErr(nil):
require.Fail(t, "minimum wait not respected")
}
})
t.Run("Maximum Wait - Failed", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil)
select {
case <-time.After(500 * time.Millisecond):
require.Fail(t, "maximum wait not respected")
case <-rw.Failed():
}
})
t.Run("Maximum Wait - WaitIf", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil)
select {
case <-time.After(500 * time.Millisecond):
require.Fail(t, "maximum wait not respected")
case <-rw.WaitIf(true):
}
})
t.Run("Maximum Wait - WaitIfErr", func(t *testing.T) {
t.Parallel()
rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil)
select {
case <-time.After(500 * time.Millisecond):
require.Fail(t, "maximum wait not respected")
case <-rw.WaitIfErr(fmt.Errorf("Fake Error")):
}
})
}