diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 5a573b25a8..dc88f280db 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -806,10 +806,19 @@ func (s *Server) runLegacyACLReplication(ctx context.Context) error { } if err != nil { + metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"}, + 0, + ) lastRemoteIndex = 0 s.updateACLReplicationStatusError() legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err) } else { + metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"}, + 1, + ) + metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"}, + float32(index), + ) lastRemoteIndex = index s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index) legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index) @@ -867,7 +876,7 @@ type replicateFunc func(ctx context.Context, logger hclog.Logger, lastRemoteInde func (s *Server) runACLPolicyReplicator(ctx context.Context) error { policyLogger := s.aclReplicationLogger(structs.ACLReplicatePolicies.SingularNoun()) policyLogger.Info("started ACL Policy replication") - return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies) + return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies, "acl-policies") } // This function is only intended to be run as a managed go routine, it will block until @@ -875,7 +884,7 @@ func (s *Server) runACLPolicyReplicator(ctx context.Context) error { func (s *Server) runACLRoleReplicator(ctx context.Context) error { roleLogger := s.aclReplicationLogger(structs.ACLReplicateRoles.SingularNoun()) roleLogger.Info("started ACL Role replication") - return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles) + return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles, "acl-roles") } // This function is only intended to be run as a managed go routine, it will block until @@ -883,7 +892,7 @@ func (s *Server) runACLRoleReplicator(ctx context.Context) error { func (s *Server) runACLTokenReplicator(ctx context.Context) error { tokenLogger := s.aclReplicationLogger(structs.ACLReplicateTokens.SingularNoun()) tokenLogger.Info("started ACL Token replication") - return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens) + return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens, "acl-tokens") } // This function is only intended to be run as a managed go routine, it will block until @@ -893,6 +902,7 @@ func (s *Server) runACLReplicator( logger hclog.Logger, replicationType structs.ACLReplicationType, replicateFunc replicateFunc, + metricName string, ) error { var failedAttempts uint limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst) @@ -913,6 +923,9 @@ func (s *Server) runACLReplicator( } if err != nil { + metrics.SetGauge([]string{"leader", "replication", metricName, "status"}, + 0, + ) lastRemoteIndex = 0 s.updateACLReplicationStatusError() logger.Warn("ACL replication error (will retry if still leader)", @@ -929,6 +942,12 @@ func (s *Server) runACLReplicator( // do nothing } } else { + metrics.SetGauge([]string{"leader", "replication", metricName, "status"}, + 1, + ) + metrics.SetGauge([]string{"leader", "replication", metricName, "index"}, + float32(index), + ) lastRemoteIndex = index s.updateACLReplicationStatusIndex(replicationType, index) logger.Debug("ACL replication completed through remote index", diff --git a/agent/consul/replication.go b/agent/consul/replication.go index eb292db59f..9ad3065cf0 100644 --- a/agent/consul/replication.go +++ b/agent/consul/replication.go @@ -22,6 +22,7 @@ const ( type ReplicatorDelegate interface { Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (index uint64, exit bool, err error) + MetricName() string } type ReplicatorConfig struct { @@ -100,6 +101,9 @@ func (r *Replicator) Run(ctx context.Context) error { return nil } if err != nil { + metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"}, + 0, + ) // reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during // the next round of replication atomic.StoreUint64(&r.lastRemoteIndex, 0) @@ -114,6 +118,13 @@ func (r *Replicator) Run(ctx context.Context) error { continue } + metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"}, + 1, + ) + metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "index"}, + float32(index), + ) + atomic.StoreUint64(&r.lastRemoteIndex, index) r.logger.Debug("replication completed through remote index", "index", index) r.waiter.Reset() @@ -128,6 +139,11 @@ type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64, logger hcl type FunctionReplicator struct { ReplicateFn ReplicatorFunc + Name string +} + +func (r *FunctionReplicator) MetricName() string { + return r.Name } func (r *FunctionReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) { @@ -171,6 +187,10 @@ type IndexReplicator struct { Logger hclog.Logger } +func (r *IndexReplicator) MetricName() string { + return r.Delegate.MetricName() +} + func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, _ hclog.Logger) (uint64, bool, error) { fetchStart := time.Now() lenRemote, remote, remoteIndex, err := r.Delegate.FetchRemote(lastRemoteIndex) diff --git a/agent/consul/replication_test.go b/agent/consul/replication_test.go index b70583c23d..468f0d617c 100644 --- a/agent/consul/replication_test.go +++ b/agent/consul/replication_test.go @@ -20,6 +20,7 @@ func TestReplicationRestart(t *testing.T) { ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) { return 1, false, nil }, + Name: "foo", }, Rate: 1, diff --git a/agent/consul/server.go b/agent/consul/server.go index 7c694b3f18..27e993822a 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -400,7 +400,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { configReplicatorConfig := ReplicatorConfig{ Name: logging.ConfigEntry, - Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig}, + Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig, Name: "config-entries"}, Rate: s.config.ConfigReplicationRate, Burst: s.config.ConfigReplicationBurst, Logger: s.logger,