From 5befe0f5d5a53014ec363574d588d566b5786693 Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Fri, 26 Apr 2019 13:38:39 -0400 Subject: [PATCH] Implement config entry replication (#5706) --- agent/consul/config.go | 57 ++++--- agent/consul/config_endpoint.go | 37 +++++ agent/consul/config_endpoint_test.go | 123 +++++++++++++++ agent/consul/config_replication.go | 199 ++++++++++++++++++++++++ agent/consul/config_replication_test.go | 165 ++++++++++++++++++++ agent/consul/leader.go | 18 +++ agent/consul/replication.go | 155 ++++++++++++++++++ agent/consul/server.go | 17 ++ agent/consul/state/config_entry.go | 2 +- agent/structs/structs.go | 67 ++++++++ lib/retry.go | 156 +++++++++++++++++++ lib/retry_test.go | 184 ++++++++++++++++++++++ 12 files changed, 1159 insertions(+), 21 deletions(-) create mode 100644 agent/consul/config_replication.go create mode 100644 agent/consul/config_replication_test.go create mode 100644 agent/consul/replication.go create mode 100644 lib/retry.go create mode 100644 lib/retry_test.go diff --git a/agent/consul/config.go b/agent/consul/config.go index 23d1e20797..ff2b008377 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -321,6 +321,20 @@ type Config struct { // user events. This function should not block. UserEventHandler func(serf.UserEvent) + // ConfigReplicationRate is the max number of replication rounds that can + // be run per second. Note that either 1 or 2 RPCs are used during each replication + // round + ConfigReplicationRate int + + // ConfigReplicationBurst is how many replication rounds can be bursted after a + // period of idleness + ConfigReplicationBurst int + + // ConfigReplicationApply limit is the max number of replication-related + // apply operations that we allow during a one second period. This is + // used to limit the amount of Raft bandwidth used for replication. + ConfigReplicationApplyLimit int + // CoordinateUpdatePeriod controls how long a server batches coordinate // updates before applying them in a Raft transaction. 
A larger period // leads to fewer Raft transactions, but also the stored coordinates @@ -432,26 +446,29 @@ func DefaultConfig() *Config { } conf := &Config{ - Build: version.Version, - Datacenter: DefaultDC, - NodeName: hostname, - RPCAddr: DefaultRPCAddr, - RaftConfig: raft.DefaultConfig(), - SerfLANConfig: lib.SerfDefaultConfig(), - SerfWANConfig: lib.SerfDefaultConfig(), - SerfFloodInterval: 60 * time.Second, - ReconcileInterval: 60 * time.Second, - ProtocolVersion: ProtocolVersion2Compatible, - ACLPolicyTTL: 30 * time.Second, - ACLTokenTTL: 30 * time.Second, - ACLDefaultPolicy: "allow", - ACLDownPolicy: "extend-cache", - ACLReplicationRate: 1, - ACLReplicationBurst: 5, - ACLReplicationApplyLimit: 100, // ops / sec - TombstoneTTL: 15 * time.Minute, - TombstoneTTLGranularity: 30 * time.Second, - SessionTTLMin: 10 * time.Second, + Build: version.Version, + Datacenter: DefaultDC, + NodeName: hostname, + RPCAddr: DefaultRPCAddr, + RaftConfig: raft.DefaultConfig(), + SerfLANConfig: lib.SerfDefaultConfig(), + SerfWANConfig: lib.SerfDefaultConfig(), + SerfFloodInterval: 60 * time.Second, + ReconcileInterval: 60 * time.Second, + ProtocolVersion: ProtocolVersion2Compatible, + ACLPolicyTTL: 30 * time.Second, + ACLTokenTTL: 30 * time.Second, + ACLDefaultPolicy: "allow", + ACLDownPolicy: "extend-cache", + ACLReplicationRate: 1, + ACLReplicationBurst: 5, + ACLReplicationApplyLimit: 100, // ops / sec + ConfigReplicationRate: 1, + ConfigReplicationBurst: 5, + ConfigReplicationApplyLimit: 100, // ops / sec + TombstoneTTL: 15 * time.Minute, + TombstoneTTLGranularity: 30 * time.Second, + SessionTTLMin: 10 * time.Second, // These are tuned to provide a total throughput of 128 updates // per second. If you update these, you should update the client- diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 720e0df034..31f67354c0 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -132,6 +132,43 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe }) } +// ListAll returns all the known configuration entries +func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.IndexedGenericConfigEntries) error { + if done, err := c.srv.forward("ConfigEntry.ListAll", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now()) + + // Fetch the ACL token, if any. + rule, err := c.srv.ResolveToken(args.Token) + if err != nil { + return err + } + + return c.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, entries, err := state.ConfigEntries(ws) + if err != nil { + return err + } + + // Filter the entries returned by ACL permissions. + filteredEntries := make([]structs.ConfigEntry, 0, len(entries)) + for _, entry := range entries { + if rule != nil && !entry.CanRead(rule) { + continue + } + filteredEntries = append(filteredEntries, entry) + } + + reply.Entries = filteredEntries + reply.Index = index + return nil + }) +} + // Delete deletes a config entry. 
func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) error { if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done { diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 12c2a43a75..5bf31eab42 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -274,6 +274,49 @@ func TestConfigEntry_List(t *testing.T) { require.Equal(expected, out) } +func TestConfigEntry_ListAll(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Create some dummy services in the state store to look up. + state := s1.fsm.State() + expected := structs.IndexedGenericConfigEntries{ + Entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + }, + }, + } + require.NoError(state.EnsureConfigEntry(1, expected.Entries[0])) + require.NoError(state.EnsureConfigEntry(2, expected.Entries[1])) + require.NoError(state.EnsureConfigEntry(3, expected.Entries[2])) + + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var out structs.IndexedGenericConfigEntries + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ListAll", &args, &out)) + + expected.QueryMeta = out.QueryMeta + require.Equal(expected, out) +} + func TestConfigEntry_List_ACLDeny(t *testing.T) { t.Parallel() @@ -355,6 +398,86 @@ operator = "read" require.Equal(structs.ProxyDefaults, proxyConf.Kind) } +func TestConfigEntry_ListAll_ACLDeny(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + codec := rpcClient(t, s1) + defer codec.Close() + + // Create the ACL. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTokenTypeClient, + Rules: ` +service "foo" { + policy = "read" +} +operator = "read" +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // Create some dummy service/proxy configs to be looked up. + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + })) + require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + })) + require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "db", + })) + + // This should filter out the "db" service since we don't have permissions for it. 
+ args := structs.ConfigEntryQuery{ + Datacenter: s1.config.Datacenter, + QueryOptions: structs.QueryOptions{Token: id}, + } + var out structs.IndexedGenericConfigEntries + err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.ListAll", &args, &out) + require.NoError(err) + require.Len(out.Entries, 2) + svcIndex := 0 + proxyIndex := 1 + if out.Entries[0].GetKind() == structs.ProxyDefaults { + svcIndex = 1 + proxyIndex = 0 + } + + svcConf, ok := out.Entries[svcIndex].(*structs.ServiceConfigEntry) + require.True(ok) + proxyConf, ok := out.Entries[proxyIndex].(*structs.ProxyConfigEntry) + require.True(ok) + + require.Equal("foo", svcConf.Name) + require.Equal(structs.ServiceDefaults, svcConf.Kind) + require.Equal(structs.ProxyConfigGlobal, proxyConf.Name) + require.Equal(structs.ProxyDefaults, proxyConf.Kind) +} + func TestConfigEntry_Delete(t *testing.T) { t.Parallel() diff --git a/agent/consul/config_replication.go b/agent/consul/config_replication.go new file mode 100644 index 0000000000..64ebd1e66e --- /dev/null +++ b/agent/consul/config_replication.go @@ -0,0 +1,199 @@ +package consul + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/structs" +) + +func cmpConfigLess(first structs.ConfigEntry, second structs.ConfigEntry) bool { + return first.GetKind() < second.GetKind() || (first.GetKind() == second.GetKind() && first.GetName() < second.GetName()) +} + +func configSort(configs []structs.ConfigEntry) { + sort.Slice(configs, func(i, j int) bool { + return cmpConfigLess(configs[i], configs[j]) + }) +} + +func diffConfigEntries(local []structs.ConfigEntry, remote []structs.ConfigEntry, lastRemoteIndex uint64) ([]structs.ConfigEntry, []structs.ConfigEntry) { + configSort(local) + configSort(remote) + + var deletions []structs.ConfigEntry + var updates []structs.ConfigEntry + var localIdx int + var remoteIdx int + for localIdx, remoteIdx = 0, 0; localIdx < len(local) && remoteIdx < len(remote); { + if local[localIdx].GetKind() == remote[remoteIdx].GetKind() && local[localIdx].GetName() == remote[remoteIdx].GetName() { + // config is in both the local and remote state - need to check raft indices + if remote[remoteIdx].GetRaftIndex().ModifyIndex > lastRemoteIndex { + updates = append(updates, remote[remoteIdx]) + } + // increment both indices when equal + localIdx += 1 + remoteIdx += 1 + } else if cmpConfigLess(local[localIdx], remote[remoteIdx]) { + // config no longer in remoted state - needs deleting + deletions = append(deletions, local[localIdx]) + + // increment just the local index + localIdx += 1 + } else { + // local state doesn't have this config - needs updating + updates = append(updates, remote[remoteIdx]) + + // increment just the remote index + remoteIdx += 1 + } + } + + for ; localIdx < len(local); localIdx += 1 { + deletions = append(deletions, local[localIdx]) + } + + for ; remoteIdx < len(remote); remoteIdx += 1 { + updates = append(updates, remote[remoteIdx]) + } + + return deletions, updates +} + +func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.ConfigEntry, op structs.ConfigEntryOp) (bool, error) { + ticker := time.NewTicker(time.Second / time.Duration(s.config.ConfigReplicationApplyLimit)) + defer ticker.Stop() + + for i, entry := range configs { + req := structs.ConfigEntryRequest{ + Op: op, + Datacenter: s.config.Datacenter, + Entry: entry, + } + + resp, err := s.raftApply(structs.ConfigEntryRequestType, &req) + if err != nil { + return false, fmt.Errorf("Failed to apply 
config %s: %v", op, err) + } + if respErr, ok := resp.(error); ok && err != nil { + return false, fmt.Errorf("Failed to apply config %s: %v", op, respErr) + } + + if i < len(configs)-1 { + select { + case <-ctx.Done(): + return true, nil + case <-ticker.C: + // do nothing - ready for the next batch + } + } + } + + return false, nil +} + +func (s *Server) fetchConfigEntries(lastRemoteIndex uint64) (*structs.IndexedGenericConfigEntries, error) { + defer metrics.MeasureSince([]string{"leader", "replication", "config-entries", "fetch"}, time.Now()) + + req := structs.DCSpecificRequest{ + Datacenter: s.config.PrimaryDatacenter, + QueryOptions: structs.QueryOptions{ + AllowStale: true, + MinQueryIndex: lastRemoteIndex, + Token: s.tokens.ReplicationToken(), + }, + } + + var response structs.IndexedGenericConfigEntries + if err := s.RPC("ConfigEntry.ListAll", &req, &response); err != nil { + return nil, err + } + + return &response, nil +} + +func (s *Server) replicateConfig(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) { + remote, err := s.fetchConfigEntries(lastRemoteIndex) + if err != nil { + return 0, false, fmt.Errorf("failed to retrieve remote config entries: %v", err) + } + + s.logger.Printf("[DEBUG] replication: finished fetching config entries: %d", len(remote.Entries)) + + // Need to check if we should be stopping. This will be common as the fetching process is a blocking + // RPC which could have been hanging around for a long time and during that time leadership could + // have been lost. + select { + case <-ctx.Done(): + return 0, true, nil + default: + // do nothing + } + + // Measure everything after the remote query, which can block for long + // periods of time. This metric is a good measure of how expensive the + // replication process is. + defer metrics.MeasureSince([]string{"leader", "replication", "config", "apply"}, time.Now()) + + _, local, err := s.fsm.State().ConfigEntries(nil) + if err != nil { + return 0, false, fmt.Errorf("failed to retrieve local config entries: %v", err) + } + + // If the remote index ever goes backwards, it's a good indication that + // the remote side was rebuilt and we should do a full sync since we + // can't make any assumptions about what's going on. + // + // Resetting lastRemoteIndex to 0 will work because we never consider local + // raft indices. Instead we compare the raft modify index in the response object + // with the lastRemoteIndex (only when we already have a config entry of the same kind/name) + // to determine if an update is needed. Resetting lastRemoteIndex to 0 then has the affect + // of making us think all the local state is out of date and any matching entries should + // still be updated. + // + // The lastRemoteIndex is not used when the entry exists either only in the local state or + // only in the remote state. In those situations we need to either delete it or create it. + if remote.QueryMeta.Index < lastRemoteIndex { + s.logger.Printf("[WARN] replication: Config Entry replication remote index moved backwards (%d to %d), forcing a full Config Entry sync", lastRemoteIndex, remote.QueryMeta.Index) + lastRemoteIndex = 0 + } + + s.logger.Printf("[DEBUG] replication: Config Entry replication - local: %d, remote: %d", len(local), len(remote.Entries)) + // Calculate the changes required to bring the state into sync and then + // apply them. 
+ deletions, updates := diffConfigEntries(local, remote.Entries, lastRemoteIndex) + + s.logger.Printf("[DEBUG] replication: Config Entry replication - deletions: %d, updates: %d", len(deletions), len(updates)) + + if len(deletions) > 0 { + s.logger.Printf("[DEBUG] replication: Config Entry replication - performing %d deletions", len(deletions)) + + exit, err := s.reconcileLocalConfig(ctx, deletions, structs.ConfigEntryDelete) + if exit { + return 0, true, nil + } + if err != nil { + return 0, false, fmt.Errorf("failed to delete local config entries: %v", err) + } + s.logger.Printf("[DEBUG] replication: Config Entry replication - finished deletions") + } + + if len(updates) > 0 { + s.logger.Printf("[DEBUG] replication: Config Entry replication - performing %d updates", len(updates)) + exit, err := s.reconcileLocalConfig(ctx, updates, structs.ConfigEntryUpsert) + if exit { + return 0, true, nil + } + if err != nil { + return 0, false, fmt.Errorf("failed to update local config entries: %v", err) + } + s.logger.Printf("[DEBUG] replication: Config Entry replication - finished updates") + } + + // Return the index we got back from the remote side, since we've synced + // up with the remote state as of that index. + return remote.QueryMeta.Index, false, nil +} diff --git a/agent/consul/config_replication_test.go b/agent/consul/config_replication_test.go new file mode 100644 index 0000000000..d53892b9b3 --- /dev/null +++ b/agent/consul/config_replication_test.go @@ -0,0 +1,165 @@ +package consul + +import ( + "fmt" + "os" + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" +) + +func TestReplication_ConfigEntries(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + client := rpcClient(t, s1) + defer client.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.ConfigReplicationRate = 100 + c.ConfigReplicationBurst = 100 + c.ConfigReplicationApplyLimit = 1000000 + }) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Try to join. 
+ joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s1.RPC, "dc2") + + // Create some new configuration entries + var entries []structs.ConfigEntry + for i := 0; i < 50; i++ { + arg := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryUpsert, + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf("svc-%d", i), + Protocol: "tcp", + }, + } + + var out struct{} + require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out)) + entries = append(entries, arg.Entry) + } + + arg := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryUpsert, + Entry: &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "foo": "bar", + "bar": 1, + }, + }, + } + + var out struct{} + require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out)) + entries = append(entries, arg.Entry) + + checkSame := func(t *retry.R) error { + _, remote, err := s1.fsm.State().ConfigEntries(nil) + require.NoError(t, err) + _, local, err := s2.fsm.State().ConfigEntries(nil) + require.NoError(t, err) + + require.Len(t, local, len(remote)) + for i, entry := range remote { + require.Equal(t, entry.GetKind(), local[i].GetKind()) + require.Equal(t, entry.GetName(), local[i].GetName()) + + // more validations + switch entry.GetKind() { + case structs.ServiceDefaults: + localSvc, ok := local[i].(*structs.ServiceConfigEntry) + require.True(t, ok) + remoteSvc, ok := entry.(*structs.ServiceConfigEntry) + require.True(t, ok) + + require.Equal(t, remoteSvc.Protocol, localSvc.Protocol) + require.Equal(t, remoteSvc.Connect, localSvc.Connect) + case structs.ProxyDefaults: + localProxy, ok := local[i].(*structs.ProxyConfigEntry) + require.True(t, ok) + remoteProxy, ok := entry.(*structs.ProxyConfigEntry) + require.True(t, ok) + + require.Equal(t, remoteProxy.Config, localProxy.Config) + } + } + return nil + } + + // Wait for the replica to converge. + retry.Run(t, func(r *retry.R) { + checkSame(r) + }) + + // Update those policies + for i := 0; i < 50; i++ { + arg := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryUpsert, + Entry: &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: fmt.Sprintf("svc-%d", i), + Protocol: "udp", + }, + } + + var out struct{} + require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out)) + } + + arg = structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryUpsert, + Entry: &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "foo": "baz", + "baz": 2, + }, + }, + } + + require.NoError(t, s1.RPC("ConfigEntry.Apply", &arg, &out)) + + // Wait for the replica to converge. + retry.Run(t, func(r *retry.R) { + checkSame(r) + }) + + for _, entry := range entries { + arg := structs.ConfigEntryRequest{ + Datacenter: "dc1", + Op: structs.ConfigEntryDelete, + Entry: entry, + } + + var out struct{} + require.NoError(t, s1.RPC("ConfigEntry.Delete", &arg, &out)) + } + + // Wait for the replica to converge. 
+ retry.Run(t, func(r *retry.R) { + checkSame(r) + }) +} diff --git a/agent/consul/leader.go b/agent/consul/leader.go index a44a0116f6..5abf12d607 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -269,6 +269,8 @@ func (s *Server) establishLeadership() error { return err } + s.startConfigReplication() + s.startEnterpriseLeader() s.startCARootPruning() @@ -289,6 +291,8 @@ func (s *Server) revokeLeadership() error { return err } + s.stopConfigReplication() + s.stopEnterpriseLeader() s.stopCARootPruning() @@ -848,6 +852,20 @@ func (s *Server) stopACLReplication() { s.aclReplicationEnabled = false } +func (s *Server) startConfigReplication() { + if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter { + // replication shouldn't run in the primary DC + return + } + + s.configReplicator.Start() +} + +func (s *Server) stopConfigReplication() { + // will be a no-op when not started + s.configReplicator.Stop() +} + // getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary func (s *Server) getOrCreateAutopilotConfig() *autopilot.Config { state := s.fsm.State() diff --git a/agent/consul/replication.go b/agent/consul/replication.go new file mode 100644 index 0000000000..789df61e65 --- /dev/null +++ b/agent/consul/replication.go @@ -0,0 +1,155 @@ +package consul + +import ( + "context" + "fmt" + "log" + "os" + "sync" + "time" + + "github.com/hashicorp/consul/lib" + "golang.org/x/time/rate" +) + +const ( + // replicationMaxRetryWait is the maximum number of seconds to wait between + // failed blocking queries when backing off. + replicationDefaultMaxRetryWait = 120 * time.Second + + replicationDefaultRate = 1 +) + +type ReplicatorConfig struct { + // Name to be used in various logging + Name string + // Function to perform the actual replication + ReplicateFn ReplicatorFunc + // The number of replication rounds per second that are allowed + Rate int + // The number of replication rounds that can be done in a burst + Burst int + // Minimum number of RPC failures to ignore before backing off + MinFailures int + // Maximum wait time between failing RPCs + MaxRetryWait time.Duration + // Where to send our logs + Logger *log.Logger +} + +type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64) (index uint64, exit bool, err error) + +type Replicator struct { + name string + lock sync.RWMutex + running bool + cancel context.CancelFunc + ctx context.Context + limiter *rate.Limiter + waiter *lib.RetryWaiter + replicate ReplicatorFunc + logger *log.Logger +} + +func NewReplicator(config *ReplicatorConfig) (*Replicator, error) { + if config == nil { + return nil, fmt.Errorf("Cannot create the Replicator without a config") + } + if config.ReplicateFn == nil { + return nil, fmt.Errorf("Cannot create the Replicator without a ReplicateFn set in the config") + } + if config.Logger == nil { + config.Logger = log.New(os.Stderr, "", log.LstdFlags) + } + ctx, cancel := context.WithCancel(context.Background()) + limiter := rate.NewLimiter(rate.Limit(config.Rate), config.Burst) + + maxWait := config.MaxRetryWait + if maxWait == 0 { + maxWait = replicationDefaultMaxRetryWait + } + + minFailures := config.MinFailures + if minFailures < 0 { + minFailures = 0 + } + waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10)) + return &Replicator{ + name: config.Name, + running: false, + cancel: cancel, + ctx: ctx, + limiter: limiter, + waiter: waiter, + replicate: config.ReplicateFn, + 
logger: config.Logger, + }, nil +} + +func (r *Replicator) Start() { + r.lock.Lock() + defer r.lock.Unlock() + + if r.running { + return + } + + go r.run() + + r.running = true + r.logger.Printf("[INFO] replication: started %s replication", r.name) +} + +func (r *Replicator) run() { + var lastRemoteIndex uint64 + + defer r.logger.Printf("[INFO] replication: stopped %s replication", r.name) + + for { + // This ensures we aren't doing too many successful replication rounds - mostly useful when + // the data within the primary datacenter is changing rapidly but we try to limit the amount + // of resources replication into the secondary datacenter should take + if err := r.limiter.Wait(r.ctx); err != nil { + return + } + + // Perform a single round of replication + index, exit, err := r.replicate(r.ctx, lastRemoteIndex) + if exit { + // the replication function told us to exit + return + } + + if err != nil { + // reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during + // the next round of replication + lastRemoteIndex = 0 + r.logger.Printf("[WARN] replication: %s replication error (will retry if still leader): %v", r.name, err) + } else { + lastRemoteIndex = index + r.logger.Printf("[DEBUG] replication: %s replication completed through remote index %d", r.name, index) + } + + select { + case <-r.ctx.Done(): + return + // wait some amount of time to prevent churning through many replication rounds while replication is failing + case <-r.waiter.WaitIfErr(err): + // do nothing + } + } +} + +func (r *Replicator) Stop() { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.running { + return + } + + r.logger.Printf("[DEBUG] replication: stopping %s replication", r.name) + r.cancel() + r.cancel = nil + r.running = false +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 330321f08a..b1596a82ec 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -138,6 +138,10 @@ type Server struct { // Consul configuration config *Config + // configReplicator is used to manage the leaders replication routines for + // centralized config + configReplicator *Replicator + // tokens holds ACL tokens initially from the configuration, but can // be updated at runtime, so should always be used instead of going to // the configuration directly. @@ -347,6 +351,19 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store, tl return nil, err } + configReplicatorConfig := ReplicatorConfig{ + Name: "Config Entry", + ReplicateFn: s.replicateConfig, + Rate: s.config.ConfigReplicationRate, + Burst: s.config.ConfigReplicationBurst, + Logger: logger, + } + s.configReplicator, err = NewReplicator(&configReplicatorConfig) + if err != nil { + s.Shutdown() + return nil, err + } + // Initialize the stats fetcher that autopilot will use. s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter) diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 82eb805091..ad90b18183 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -107,7 +107,7 @@ func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string) (uint64, struc // ConfigEntries is called to get all config entry objects. func (s *Store) ConfigEntries(ws memdb.WatchSet) (uint64, []structs.ConfigEntry, error) { - return s.ConfigEntriesByKind(nil, "") + return s.ConfigEntriesByKind(ws, "") } // ConfigEntriesByKind is called to get all config entry objects with the given kind. 
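For illustration, here is a minimal sketch of how the generic Replicator added above is meant to be driven, written as if it lived in the agent/consul package. The replicateWidgets and newWidgetReplicator names and their bodies are hypothetical placeholders; in this change the actual wiring happens in NewServerLogger (constructing the config entry Replicator) and in establishLeadership/revokeLeadership (via startConfigReplication/stopConfigReplication).

package consul

import (
    "context"
    "log"
    "os"
)

// replicateWidgets is a hypothetical ReplicatorFunc. Each invocation performs
// a single replication round and returns the remote index it synced through,
// whether the calling loop should exit, and any error.
func replicateWidgets(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
    // A real implementation would issue a blocking query against the primary
    // datacenter with MinQueryIndex set to lastRemoteIndex, diff the result
    // against local state, and raft-apply the resulting deletions and updates.
    return lastRemoteIndex, false, nil
}

func newWidgetReplicator() (*Replicator, error) {
    // The leader starts the returned Replicator on leadership gain (and only
    // outside the primary datacenter) and stops it on leadership loss; Stop is
    // a no-op if Start was never called.
    return NewReplicator(&ReplicatorConfig{
        Name:        "Widget",         // used in the replication log lines
        ReplicateFn: replicateWidgets, // performs one round per call
        Rate:        1,                // replication rounds per second (mirrors the default)
        Burst:       5,                // rounds allowed in a burst after idleness (mirrors the default)
        Logger:      log.New(os.Stderr, "", log.LstdFlags),
    })
}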
diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 94d5b564bf..c1acf7a5b4 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1298,6 +1298,73 @@ func (c *IndexedConfigEntries) UnmarshalBinary(data []byte) error { return nil } +type IndexedGenericConfigEntries struct { + Entries []ConfigEntry + QueryMeta +} + +func (c *IndexedGenericConfigEntries) MarshalBinary() (data []byte, err error) { + // bs will grow if needed but allocate enough to avoid reallocation in common + // case. + bs := make([]byte, 128) + enc := codec.NewEncoderBytes(&bs, msgpackHandle) + + if err := enc.Encode(len(c.Entries)); err != nil { + return nil, err + } + + for _, entry := range c.Entries { + if err := enc.Encode(entry.GetKind()); err != nil { + return nil, err + } + if err := enc.Encode(entry); err != nil { + return nil, err + } + } + + if err := enc.Encode(c.QueryMeta); err != nil { + return nil, err + } + + return bs, nil +} + +func (c *IndexedGenericConfigEntries) UnmarshalBinary(data []byte) error { + // First decode the number of entries. + var numEntries int + dec := codec.NewDecoderBytes(data, msgpackHandle) + if err := dec.Decode(&numEntries); err != nil { + return err + } + + // Then decode the slice of ConfigEntries + c.Entries = make([]ConfigEntry, numEntries) + for i := 0; i < numEntries; i++ { + var kind string + if err := dec.Decode(&kind); err != nil { + return err + } + + entry, err := MakeConfigEntry(kind, "") + if err != nil { + return err + } + + if err := dec.Decode(entry); err != nil { + return err + } + + c.Entries[i] = entry + } + + if err := dec.Decode(&c.QueryMeta); err != nil { + return err + } + + return nil + +} + // DirEntry is used to represent a directory entry. This is // used for values in our Key-Value store. type DirEntry struct { diff --git a/lib/retry.go b/lib/retry.go new file mode 100644 index 0000000000..59cb91c753 --- /dev/null +++ b/lib/retry.go @@ -0,0 +1,156 @@ +package lib + +import ( + "time" +) + +const ( + defaultMinFailures = 0 + defaultMaxWait = 2 * time.Minute +) + +// Interface used for offloading jitter calculations from the RetryWaiter +type Jitter interface { + AddJitter(baseTime time.Duration) time.Duration +} + +// Calculates a random jitter between 0 and up to a specific percentage of the baseTime +type JitterRandomStagger struct { + // int64 because we are going to be doing math against an int64 to represent nanoseconds + percent int64 +} + +// Creates a new JitterRandomStagger +func NewJitterRandomStagger(percent int) *JitterRandomStagger { + if percent < 0 { + percent = 0 + } + + return &JitterRandomStagger{ + percent: int64(percent), + } +} + +// Implments the Jitter interface +func (j *JitterRandomStagger) AddJitter(baseTime time.Duration) time.Duration { + if j.percent == 0 { + return baseTime + } + + // time.Duration is actually a type alias for int64 which is why casting + // to the duration type and then dividing works + return baseTime + RandomStagger((baseTime*time.Duration(j.percent))/100) +} + +// RetryWaiter will record failed and successful operations and provide +// a channel to wait on before a failed operation can be retried. 
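+ // Once the number of recorded failures exceeds minFailures, the wait doubles
+ // with each additional failure (1s, 2s, 4s, ...), is clamped between minWait
+ // and maxWait, and has any configured jitter added on top.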
+type RetryWaiter struct { + minFailures uint + minWait time.Duration + maxWait time.Duration + jitter Jitter + failures uint +} + +// Creates a new RetryWaiter +func NewRetryWaiter(minFailures int, minWait, maxWait time.Duration, jitter Jitter) *RetryWaiter { + if minFailures < 0 { + minFailures = defaultMinFailures + } + + if maxWait <= 0 { + maxWait = defaultMaxWait + } + + if minWait <= 0 { + minWait = 0 * time.Nanosecond + } + + return &RetryWaiter{ + minFailures: uint(minFailures), + minWait: minWait, + maxWait: maxWait, + failures: 0, + jitter: jitter, + } +} + +// calculates the necessary wait time before the +// next operation should be allowed. +func (rw *RetryWaiter) calculateWait() time.Duration { + waitTime := rw.minWait + if rw.failures > rw.minFailures { + shift := rw.failures - rw.minFailures - 1 + waitTime = rw.maxWait + if shift < 31 { + waitTime = (1 << shift) * time.Second + } + if waitTime > rw.maxWait { + waitTime = rw.maxWait + } + + if rw.jitter != nil { + waitTime = rw.jitter.AddJitter(waitTime) + } + } + + if waitTime < rw.minWait { + waitTime = rw.minWait + } + + return waitTime +} + +// calculates the waitTime and returns a chan +// that will become selectable once that amount +// of time has elapsed. +func (rw *RetryWaiter) wait() <-chan struct{} { + waitTime := rw.calculateWait() + ch := make(chan struct{}) + if waitTime > 0 { + time.AfterFunc(waitTime, func() { close(ch) }) + } else { + // if there should be 0 wait time then we ensure + // that the chan will be immediately selectable + close(ch) + } + return ch +} + +// Marks that an operation is successful which resets the failure count. +// The chan that is returned will be immediately selectable +func (rw *RetryWaiter) Success() <-chan struct{} { + rw.Reset() + return rw.wait() +} + +// Marks that an operation failed. The chan returned will be selectable +// once the calculated retry wait amount of time has elapsed +func (rw *RetryWaiter) Failed() <-chan struct{} { + rw.failures += 1 + ch := rw.wait() + return ch +} + +// Resets the internal failure counter +func (rw *RetryWaiter) Reset() { + rw.failures = 0 +} + +// WaitIf is a convenice method to record whether the last +// operation was a success or failure and return a chan that +// will be selectablw when the next operation can be done. +func (rw *RetryWaiter) WaitIf(failure bool) <-chan struct{} { + if failure { + return rw.Failed() + } + return rw.Success() +} + +// WaitIfErr is a convenience method to record whether the last +// operation was a success or failure based on whether the err +// is nil and then return a chan that will be selectable when +// the next operation can be done. 
+func (rw *RetryWaiter) WaitIfErr(err error) <-chan struct{} { + return rw.WaitIf(err != nil) +} diff --git a/lib/retry_test.go b/lib/retry_test.go new file mode 100644 index 0000000000..325b5b9526 --- /dev/null +++ b/lib/retry_test.go @@ -0,0 +1,184 @@ +package lib + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestJitterRandomStagger(t *testing.T) { + t.Parallel() + + t.Run("0 percent", func(t *testing.T) { + t.Parallel() + jitter := NewJitterRandomStagger(0) + for i := 0; i < 10; i++ { + baseTime := time.Duration(i) * time.Second + require.Equal(t, baseTime, jitter.AddJitter(baseTime)) + } + }) + + t.Run("10 percent", func(t *testing.T) { + t.Parallel() + jitter := NewJitterRandomStagger(10) + for i := 0; i < 10; i++ { + baseTime := 5000 * time.Millisecond + maxTime := 5500 * time.Millisecond + newTime := jitter.AddJitter(baseTime) + require.True(t, newTime > baseTime) + require.True(t, newTime <= maxTime) + } + }) + + t.Run("100 percent", func(t *testing.T) { + t.Parallel() + jitter := NewJitterRandomStagger(100) + for i := 0; i < 10; i++ { + baseTime := 1234 * time.Millisecond + maxTime := 2468 * time.Millisecond + newTime := jitter.AddJitter(baseTime) + require.True(t, newTime > baseTime) + require.True(t, newTime <= maxTime) + } + }) +} + +func TestRetryWaiter_calculateWait(t *testing.T) { + t.Parallel() + + t.Run("Defaults", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 0, 0, nil) + + require.Equal(t, 0*time.Nanosecond, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 1*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 2*time.Second, rw.calculateWait()) + rw.failures = 31 + require.Equal(t, defaultMaxWait, rw.calculateWait()) + }) + + t.Run("Minimum Wait", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 5*time.Second, 0, nil) + + require.Equal(t, 5*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 5*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 5*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 5*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 8*time.Second, rw.calculateWait()) + }) + + t.Run("Minimum Failures", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(5, 0, 0, nil) + require.Equal(t, 0*time.Nanosecond, rw.calculateWait()) + rw.failures += 5 + require.Equal(t, 0*time.Nanosecond, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 1*time.Second, rw.calculateWait()) + }) + + t.Run("Maximum Wait", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 0, 5*time.Second, nil) + require.Equal(t, 0*time.Nanosecond, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 1*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 2*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 4*time.Second, rw.calculateWait()) + rw.failures += 1 + require.Equal(t, 5*time.Second, rw.calculateWait()) + rw.failures = 31 + require.Equal(t, 5*time.Second, rw.calculateWait()) + }) +} + +func TestRetryWaiter_WaitChans(t *testing.T) { + t.Parallel() + + t.Run("Minimum Wait - Success", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil) + + select { + case <-time.After(200 * time.Millisecond): + case <-rw.Success(): + require.Fail(t, "minimum wait not respected") + } + }) + + t.Run("Minimum Wait - WaitIf", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil) + 
+ select { + case <-time.After(200 * time.Millisecond): + case <-rw.WaitIf(false): + require.Fail(t, "minimum wait not respected") + } + }) + + t.Run("Minimum Wait - WaitIfErr", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 250*time.Millisecond, 0, nil) + + select { + case <-time.After(200 * time.Millisecond): + case <-rw.WaitIfErr(nil): + require.Fail(t, "minimum wait not respected") + } + }) + + t.Run("Maximum Wait - Failed", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil) + + select { + case <-time.After(500 * time.Millisecond): + require.Fail(t, "maximum wait not respected") + case <-rw.Failed(): + } + }) + + t.Run("Maximum Wait - WaitIf", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil) + + select { + case <-time.After(500 * time.Millisecond): + require.Fail(t, "maximum wait not respected") + case <-rw.WaitIf(true): + } + }) + + t.Run("Maximum Wait - WaitIfErr", func(t *testing.T) { + t.Parallel() + + rw := NewRetryWaiter(0, 0, 250*time.Millisecond, nil) + + select { + case <-time.After(500 * time.Millisecond): + require.Fail(t, "maximum wait not respected") + case <-rw.WaitIfErr(fmt.Errorf("Fake Error")): + } + }) +}
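Beyond the replication loop, the RetryWaiter is a small reusable primitive. The following usage sketch mirrors how Replicator.run consumes it above, assuming only what lib/retry.go exports; doOneRound and retryLoop are hypothetical names for illustration.

package main

import (
    "context"
    "errors"
    "time"

    "github.com/hashicorp/consul/lib"
)

// doOneRound stands in for an operation that can fail and should be retried
// with backoff; it is purely illustrative.
func doOneRound(ctx context.Context) error {
    return errors.New("simulated failure")
}

func retryLoop(ctx context.Context) {
    // Ignore the first failure (no wait), then back off exponentially starting
    // at 1s, capped at 2 minutes, with up to 10% random jitter added.
    waiter := lib.NewRetryWaiter(1, 0, 2*time.Minute, lib.NewJitterRandomStagger(10))

    for {
        err := doOneRound(ctx)

        select {
        case <-ctx.Done():
            return
        // Selectable immediately after a success (or an ignored failure), and
        // only after the computed backoff once failures accumulate.
        case <-waiter.WaitIfErr(err):
        }
    }
}

func main() {
    // Bound the demo so the always-failing loop exits after a few backoff rounds.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    retryLoop(ctx)
}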