mirror of https://github.com/status-im/consul.git
Merge pull request #10103 from hashicorp/backport/1.8.x/10073
[Backport/1.8.x] Backport #10073
This commit is contained in:
commit
86cd4fbe8a
|
@ -574,7 +574,7 @@ jobs:
|
||||||
- checkout
|
- checkout
|
||||||
- add_ssh_keys: # needs a key to push updated static asset commit back to github
|
- add_ssh_keys: # needs a key to push updated static asset commit back to github
|
||||||
fingerprints:
|
fingerprints:
|
||||||
- "3d:6b:98:55:78:4e:52:17:4e:17:ba:f3:bf:0b:96:2a"
|
- "fc:55:84:15:0a:1d:c8:e9:06:d0:e8:9c:7b:a9:b7:31"
|
||||||
- attach_workspace:
|
- attach_workspace:
|
||||||
at: .
|
at: .
|
||||||
- run:
|
- run:
|
||||||
|
@ -583,9 +583,9 @@ jobs:
|
||||||
# check if there are any changes in ui-v2/
|
# check if there are any changes in ui-v2/
|
||||||
# if there are, we commit the ui static asset file
|
# if there are, we commit the ui static asset file
|
||||||
# HEAD^! is shorthand for HEAD^..HEAD (parent of HEAD and HEAD)
|
# HEAD^! is shorthand for HEAD^..HEAD (parent of HEAD and HEAD)
|
||||||
if ! git diff --quiet --exit-code HEAD^! ui-v2/; then
|
if ! git diff --quiet --exit-code HEAD^! ui/; then
|
||||||
git config --local user.email "hashicorp-ci@users.noreply.github.com"
|
git config --local user.email "github-team-consul-core@hashicorp.com"
|
||||||
git config --local user.name "hashicorp-ci"
|
git config --local user.name "hc-github-team-consul-core"
|
||||||
|
|
||||||
short_sha=$(git rev-parse --short HEAD)
|
short_sha=$(git rev-parse --short HEAD)
|
||||||
git add agent/bindata_assetfs.go
|
git add agent/bindata_assetfs.go
|
||||||
|
@ -741,7 +741,7 @@ jobs:
|
||||||
- checkout
|
- checkout
|
||||||
- add_ssh_keys: # needs a key to push cherry-picked commits back to github
|
- add_ssh_keys: # needs a key to push cherry-picked commits back to github
|
||||||
fingerprints:
|
fingerprints:
|
||||||
- "3d:6b:98:55:78:4e:52:17:4e:17:ba:f3:bf:0b:96:2a"
|
- "fc:55:84:15:0a:1d:c8:e9:06:d0:e8:9c:7b:a9:b7:31"
|
||||||
- run: .circleci/scripts/cherry-picker.sh
|
- run: .circleci/scripts/cherry-picker.sh
|
||||||
|
|
||||||
trigger-oss-merge:
|
trigger-oss-merge:
|
||||||
|
|
|
@ -97,8 +97,8 @@ fi
|
||||||
|
|
||||||
# loop through all labels on the PR
|
# loop through all labels on the PR
|
||||||
for label in $labels; do
|
for label in $labels; do
|
||||||
git config --local user.email "hashicorp-ci@users.noreply.github.com"
|
git config --local user.email "github-team-consul-core@hashicorp.com"
|
||||||
git config --local user.name "hashicorp-ci"
|
git config --local user.name "hc-github-team-consul-core"
|
||||||
status "checking label: $label"
|
status "checking label: $label"
|
||||||
# TODO: enable this when replatform is merged into stable-website
|
# TODO: enable this when replatform is merged into stable-website
|
||||||
# if the label matches docs-cherrypick, it will attempt to cherry-pick to stable-website
|
# if the label matches docs-cherrypick, it will attempt to cherry-pick to stable-website
|
||||||
|
|
|
@ -790,10 +790,19 @@ func (s *Server) runLegacyACLReplication(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
|
||||||
|
0,
|
||||||
|
)
|
||||||
lastRemoteIndex = 0
|
lastRemoteIndex = 0
|
||||||
s.updateACLReplicationStatusError()
|
s.updateACLReplicationStatusError()
|
||||||
legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
|
legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
|
||||||
} else {
|
} else {
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"},
|
||||||
|
float32(index),
|
||||||
|
)
|
||||||
lastRemoteIndex = index
|
lastRemoteIndex = index
|
||||||
s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
|
s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
|
||||||
legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
|
legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
|
||||||
|
@ -851,7 +860,7 @@ type replicateFunc func(ctx context.Context, logger hclog.Logger, lastRemoteInde
|
||||||
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
|
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
|
||||||
policyLogger := s.aclReplicationLogger(structs.ACLReplicatePolicies.SingularNoun())
|
policyLogger := s.aclReplicationLogger(structs.ACLReplicatePolicies.SingularNoun())
|
||||||
policyLogger.Info("started ACL Policy replication")
|
policyLogger.Info("started ACL Policy replication")
|
||||||
return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies)
|
return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies, "acl-policies")
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function is only intended to be run as a managed go routine, it will block until
|
// This function is only intended to be run as a managed go routine, it will block until
|
||||||
|
@ -859,7 +868,7 @@ func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
|
||||||
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
|
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
|
||||||
roleLogger := s.aclReplicationLogger(structs.ACLReplicateRoles.SingularNoun())
|
roleLogger := s.aclReplicationLogger(structs.ACLReplicateRoles.SingularNoun())
|
||||||
roleLogger.Info("started ACL Role replication")
|
roleLogger.Info("started ACL Role replication")
|
||||||
return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles)
|
return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles, "acl-roles")
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function is only intended to be run as a managed go routine, it will block until
|
// This function is only intended to be run as a managed go routine, it will block until
|
||||||
|
@ -867,7 +876,7 @@ func (s *Server) runACLRoleReplicator(ctx context.Context) error {
|
||||||
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
|
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
|
||||||
tokenLogger := s.aclReplicationLogger(structs.ACLReplicateTokens.SingularNoun())
|
tokenLogger := s.aclReplicationLogger(structs.ACLReplicateTokens.SingularNoun())
|
||||||
tokenLogger.Info("started ACL Token replication")
|
tokenLogger.Info("started ACL Token replication")
|
||||||
return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens)
|
return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens, "acl-tokens")
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function is only intended to be run as a managed go routine, it will block until
|
// This function is only intended to be run as a managed go routine, it will block until
|
||||||
|
@ -877,6 +886,7 @@ func (s *Server) runACLReplicator(
|
||||||
logger hclog.Logger,
|
logger hclog.Logger,
|
||||||
replicationType structs.ACLReplicationType,
|
replicationType structs.ACLReplicationType,
|
||||||
replicateFunc replicateFunc,
|
replicateFunc replicateFunc,
|
||||||
|
metricName string,
|
||||||
) error {
|
) error {
|
||||||
var failedAttempts uint
|
var failedAttempts uint
|
||||||
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
|
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
|
||||||
|
@ -897,6 +907,9 @@ func (s *Server) runACLReplicator(
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
|
||||||
|
0,
|
||||||
|
)
|
||||||
lastRemoteIndex = 0
|
lastRemoteIndex = 0
|
||||||
s.updateACLReplicationStatusError()
|
s.updateACLReplicationStatusError()
|
||||||
logger.Warn("ACL replication error (will retry if still leader)",
|
logger.Warn("ACL replication error (will retry if still leader)",
|
||||||
|
@ -913,6 +926,12 @@ func (s *Server) runACLReplicator(
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", metricName, "index"},
|
||||||
|
float32(index),
|
||||||
|
)
|
||||||
lastRemoteIndex = index
|
lastRemoteIndex = index
|
||||||
s.updateACLReplicationStatusIndex(replicationType, index)
|
s.updateACLReplicationStatusIndex(replicationType, index)
|
||||||
logger.Debug("ACL replication completed through remote index",
|
logger.Debug("ACL replication completed through remote index",
|
||||||
|
|
|
@ -23,6 +23,7 @@ const (
|
||||||
|
|
||||||
type ReplicatorDelegate interface {
|
type ReplicatorDelegate interface {
|
||||||
Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (index uint64, exit bool, err error)
|
Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (index uint64, exit bool, err error)
|
||||||
|
MetricName() string
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReplicatorConfig struct {
|
type ReplicatorConfig struct {
|
||||||
|
@ -104,6 +105,9 @@ func (r *Replicator) Run(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"},
|
||||||
|
0,
|
||||||
|
)
|
||||||
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
|
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
|
||||||
// the next round of replication
|
// the next round of replication
|
||||||
atomic.StoreUint64(&r.lastRemoteIndex, 0)
|
atomic.StoreUint64(&r.lastRemoteIndex, 0)
|
||||||
|
@ -112,6 +116,12 @@ func (r *Replicator) Run(ctx context.Context) error {
|
||||||
r.logger.Warn("replication error (will retry if still leader)", "error", err)
|
r.logger.Warn("replication error (will retry if still leader)", "error", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"},
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "index"},
|
||||||
|
float32(index),
|
||||||
|
)
|
||||||
atomic.StoreUint64(&r.lastRemoteIndex, index)
|
atomic.StoreUint64(&r.lastRemoteIndex, index)
|
||||||
r.logger.Debug("replication completed through remote index", "index", index)
|
r.logger.Debug("replication completed through remote index", "index", index)
|
||||||
}
|
}
|
||||||
|
@ -134,6 +144,11 @@ type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64, logger hcl
|
||||||
|
|
||||||
type FunctionReplicator struct {
|
type FunctionReplicator struct {
|
||||||
ReplicateFn ReplicatorFunc
|
ReplicateFn ReplicatorFunc
|
||||||
|
Name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *FunctionReplicator) MetricName() string {
|
||||||
|
return r.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *FunctionReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
|
func (r *FunctionReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
|
||||||
|
@ -177,6 +192,10 @@ type IndexReplicator struct {
|
||||||
Logger hclog.Logger
|
Logger hclog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *IndexReplicator) MetricName() string {
|
||||||
|
return r.Delegate.MetricName()
|
||||||
|
}
|
||||||
|
|
||||||
func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, _ hclog.Logger) (uint64, bool, error) {
|
func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, _ hclog.Logger) (uint64, bool, error) {
|
||||||
fetchStart := time.Now()
|
fetchStart := time.Now()
|
||||||
lenRemote, remote, remoteIndex, err := r.Delegate.FetchRemote(lastRemoteIndex)
|
lenRemote, remote, remoteIndex, err := r.Delegate.FetchRemote(lastRemoteIndex)
|
||||||
|
|
|
@ -20,6 +20,7 @@ func TestReplicationRestart(t *testing.T) {
|
||||||
ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
|
ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
|
||||||
return 1, false, nil
|
return 1, false, nil
|
||||||
},
|
},
|
||||||
|
Name: "foo",
|
||||||
},
|
},
|
||||||
|
|
||||||
Rate: 1,
|
Rate: 1,
|
||||||
|
|
|
@ -412,7 +412,7 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
||||||
|
|
||||||
configReplicatorConfig := ReplicatorConfig{
|
configReplicatorConfig := ReplicatorConfig{
|
||||||
Name: logging.ConfigEntry,
|
Name: logging.ConfigEntry,
|
||||||
Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig},
|
Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig, Name: "config-entries"},
|
||||||
Rate: s.config.ConfigReplicationRate,
|
Rate: s.config.ConfigReplicationRate,
|
||||||
Burst: s.config.ConfigReplicationBurst,
|
Burst: s.config.ConfigReplicationBurst,
|
||||||
Logger: s.logger,
|
Logger: s.logger,
|
||||||
|
|
Loading…
Reference in New Issue