consul/agent/consul/leader_connect.go
hashicorp-copywrite[bot] 5fb9df1640
[COMPLIANCE] License changes (#18443)
* Adding explicit MPL license for sub-package

This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository.

* Adding explicit MPL license for sub-package

This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository.

* Updating the license from MPL to Business Source License

Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl.

* add missing license headers

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

* Update copyright file headers to BUSL-1.1

---------

Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 09:12:13 -04:00

321 lines
9.1 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul
import (
"context"
"fmt"
"time"
"golang.org/x/time/rate"
"github.com/hashicorp/go-version"
"github.com/hashicorp/consul/agent/consul/gateways"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
)
const (
// loopRateLimit is the maximum rate per second at which we can rerun CA and intention
// replication watches.
loopRateLimit rate.Limit = 0.2
// retryBucketSize is the maximum number of stored rate limit attempts for looped
// blocking query operations.
retryBucketSize = 5
)
var (
// maxRetryBackoff is the maximum number of seconds to wait between failed blocking
// queries when backing off.
maxRetryBackoff = 256
// minVirtualIPVersion is the minimum version for all Consul servers for virtual IP
// assignment to be enabled.
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
// minVirtualIPVersion is the minimum version for all Consul servers for virtual IP
// assignment to be enabled for terminating gateways.
minVirtualIPTerminatingGatewayVersion = version.Must(version.NewVersion("1.11.2"))
// virtualIPVersionCheckInterval is the frequency we check whether all servers meet
// the minimum version to enable virtual IP assignment for services.
virtualIPVersionCheckInterval = time.Minute
)
// startConnectLeader starts multi-dc connect leader routines.
func (s *Server) startConnectLeader(ctx context.Context) error {
if !s.config.ConnectEnabled {
return nil
}
s.caManager.Start(ctx)
s.leaderRoutineManager.Start(ctx, caRootPruningRoutineName, s.runCARootPruning)
s.leaderRoutineManager.Start(ctx, caRootMetricRoutineName, rootCAExpiryMonitor(s).Monitor)
s.leaderRoutineManager.Start(ctx, caSigningMetricRoutineName, signingCAExpiryMonitor(s).Monitor)
s.leaderRoutineManager.Start(ctx, virtualIPCheckRoutineName, s.runVirtualIPVersionCheck)
s.leaderRoutineManager.Start(ctx, configEntryControllersRoutineName, s.runConfigEntryControllers)
return s.startIntentionConfigEntryMigration(ctx)
}
// stopConnectLeader stops connect specific leader functions.
func (s *Server) stopConnectLeader() {
s.caManager.Stop()
s.leaderRoutineManager.Stop(intentionMigrationRoutineName)
s.leaderRoutineManager.Stop(caRootPruningRoutineName)
s.leaderRoutineManager.Stop(caRootMetricRoutineName)
s.leaderRoutineManager.Stop(caSigningMetricRoutineName)
s.leaderRoutineManager.Stop(virtualIPCheckRoutineName)
s.leaderRoutineManager.Stop(configEntryControllersRoutineName)
}
func (s *Server) runConfigEntryControllers(ctx context.Context) error {
updater := &gateways.Updater{
UpdateWithStatus: func(entry structs.ControlledConfigEntry) error {
_, err := s.leaderRaftApply("ConfigEntry.Apply", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsertWithStatusCAS,
Entry: entry,
})
return err
},
Update: func(entry structs.ConfigEntry) error {
_, err := s.leaderRaftApply("ConfigEntry.Apply", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsertCAS,
Entry: entry,
})
return err
},
Delete: func(entry structs.ConfigEntry) error {
_, err := s.leaderRaftApply("ConfigEntry.Delete", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{
Op: structs.ConfigEntryDeleteCAS,
Entry: entry,
})
return err
},
}
logger := s.logger.Named(logging.APIGatewayController)
return gateways.NewAPIGatewayController(s.fsm, s.publisher, updater, logger).Run(ctx)
}
func (s *Server) runCARootPruning(ctx context.Context) error {
ticker := time.NewTicker(caRootPruneInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := s.pruneCARoots(); err != nil {
s.loggers.Named(logging.Connect).Error("error pruning CA roots", "error", err)
}
}
}
}
// pruneCARoots looks for any CARoots that have been rotated out and expired.
func (s *Server) pruneCARoots() error {
if !s.config.ConnectEnabled {
return nil
}
state := s.fsm.State()
idx, roots, err := state.CARoots(nil)
if err != nil {
return err
}
_, caConf, err := state.CAConfig(nil)
if err != nil {
return err
}
common, err := caConf.GetCommonConfig()
if err != nil {
return err
}
var newRoots structs.CARoots
for _, r := range roots {
if !r.Active && !r.RotatedOutAt.IsZero() && time.Now().Sub(r.RotatedOutAt) > common.LeafCertTTL*2 {
s.loggers.Named(logging.Connect).Info("pruning old unused root CA", "id", r.ID)
continue
}
newRoot := *r
newRoots = append(newRoots, &newRoot)
}
// Return early if there's nothing to remove.
if len(newRoots) == len(roots) {
return nil
}
// Commit the new root state.
var args structs.CARequest
args.Op = structs.CAOpSetRoots
args.Index = idx
args.Roots = newRoots
_, err = s.raftApply(structs.ConnectCARequestType, args)
return err
}
func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error {
// Return early if the flag is already set.
done, err := s.setVirtualIPFlags()
if err != nil {
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
}
if done {
s.leaderRoutineManager.Stop(virtualIPCheckRoutineName)
return nil
}
ticker := time.NewTicker(virtualIPVersionCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
done, err := s.setVirtualIPFlags()
if err != nil {
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
continue
}
if done {
return nil
}
}
}
}
func (s *Server) setVirtualIPFlags() (bool, error) {
virtualIPFlag, err := s.setVirtualIPVersionFlag()
if err != nil {
return false, err
}
terminatingGatewayVirtualIPFlag, err := s.setVirtualIPTerminatingGatewayVersionFlag()
if err != nil {
return false, err
}
return virtualIPFlag && terminatingGatewayVirtualIPFlag, nil
}
func (s *Server) setVirtualIPVersionFlag() (bool, error) {
val, err := s.GetSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
if err != nil {
return false, err
}
if val != "" {
return true, nil
}
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
return false, fmt.Errorf("can't allocate Virtual IPs until all servers >= %s",
minVirtualIPVersion.String())
}
if err := s.SetSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil {
return false, nil
}
return true, nil
}
func (s *Server) setVirtualIPTerminatingGatewayVersionFlag() (bool, error) {
val, err := s.GetSystemMetadata(structs.SystemMetadataTermGatewayVirtualIPsEnabled)
if err != nil {
return false, err
}
if val != "" {
return true, nil
}
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPTerminatingGatewayVersion); !ok {
return false, fmt.Errorf("can't allocate Virtual IPs for terminating gateways until all servers >= %s",
minVirtualIPTerminatingGatewayVersion.String())
}
if err := s.SetSystemMetadataKey(structs.SystemMetadataTermGatewayVirtualIPsEnabled, "true"); err != nil {
return false, nil
}
return true, nil
}
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
// upon errors up to a maximum of maxRetryBackoff seconds.
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {
retryLoopBackoffHandleSuccess(ctx, loopFn, errFn, false)
}
func retryLoopBackoffAbortOnSuccess(ctx context.Context, loopFn func() error, errFn func(error)) {
retryLoopBackoffHandleSuccess(ctx, loopFn, errFn, true)
}
func retryLoopBackoffHandleSuccess(ctx context.Context, loopFn func() error, errFn func(error), abortOnSuccess bool) {
var failedAttempts uint
limiter := rate.NewLimiter(loopRateLimit, retryBucketSize)
for {
// Rate limit how often we run the loop
limiter.Wait(ctx)
select {
case <-ctx.Done():
return
default:
}
if (1 << failedAttempts) < maxRetryBackoff {
failedAttempts++
}
retryTime := (1 << failedAttempts) * time.Second
if err := loopFn(); err != nil {
errFn(err)
timer := time.NewTimer(retryTime)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
continue
}
} else if abortOnSuccess {
return
}
// Reset the failed attempts after a successful run.
failedAttempts = 0
}
}
// nextIndexVal computes the next index value to query for, resetting to zero
// if the index went backward.
func nextIndexVal(prevIdx, idx uint64) uint64 {
if prevIdx > idx {
return 0
}
return idx
}
// halfTime returns a duration that is half the time between notBefore and
// notAfter.
func halfTime(notBefore, notAfter time.Time) time.Duration {
interval := notAfter.Sub(notBefore)
return interval / 2
}
// lessThanHalfTimePassed decides if half the time between notBefore and
// notAfter has passed relative to now.
// lessThanHalfTimePassed is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func lessThanHalfTimePassed(now, notBefore, notAfter time.Time) bool {
t := notBefore.Add(halfTime(notBefore, notAfter))
return t.Sub(now) > 0
}