consul/agent/setup.go

542 lines
18 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package agent
import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
wal "github.com/hashicorp/raft-wal"
"github.com/hashicorp/raft-wal/verifier"
"google.golang.org/grpc/grpclog"
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/consul/xdscapacity"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
grpcInt "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/balancer"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
grpcWare "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/hoststats"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
)
// BaseDeps is the set of dependencies assembled by NewBaseDeps, together with
// the cleanup hooks that Close invokes.
//
// TODO: BaseDeps should be renamed in the future once more of Agent.Start
// has been moved out in front of Agent.New, and we can better see the setup
// dependencies.
type BaseDeps struct {
// Deps is embedded, so its fields (for example Logger, Tokens,
// TLSConfigurator, ConnPool and Router) are set directly on BaseDeps by
// NewBaseDeps.
consul.Deps // TODO: un-embed
// RuntimeConfig is the configuration returned by the config loader, after
// NewBaseDeps has filled in the node ID and Prometheus definitions.
RuntimeConfig *config.RuntimeConfig
// MetricsConfig is the value returned by lib.InitTelemetry; Close cancels it.
MetricsConfig *lib.MetricsConfig
// AutoConfig is created by autoconf.New and stopped by Close.
AutoConfig *autoconf.AutoConfig // TODO: use an interface
// Cache is created from the runtime config's cache settings.
Cache *cache.Cache
// LeafCertManager is also assigned to the embedded Deps so that consul
// servers can use it; Close stops it.
LeafCertManager *leafcert.Manager
// ViewStore is the submatview store created by NewBaseDeps.
ViewStore *submatview.Store
// WatchedFiles is copied from the config load result.
WatchedFiles []string
// NetRPC starts out without a backing implementation; calls fail until
// SetNetRPC has been invoked on it.
NetRPC *LazyNetRPC
// deregisterBalancer and deregisterResolver undo the gRPC balancer and
// resolver registrations performed by NewBaseDeps. Close calls them when set.
deregisterBalancer, deregisterResolver func()
// stopHostCollector cancels the context handed to the host stats collector.
// It is nil unless NewBaseDeps started the collector.
stopHostCollector context.CancelFunc
}
// NetRPC is the interface for issuing a single RPC identified by method name.
type NetRPC interface {
	RPC(ctx context.Context, method string, args any, reply any) error
}

// LazyNetRPC is a NetRPC whose backing implementation is supplied after
// construction through SetNetRPC. Until then every call to RPC fails with an
// initialization ordering error.
type LazyNetRPC struct {
	mu  sync.RWMutex
	rpc NetRPC
}

// SetNetRPC installs rpc as the implementation that subsequent RPC calls are
// forwarded to, replacing any previously installed one.
func (r *LazyNetRPC) SetNetRPC(rpc NetRPC) {
	r.mu.Lock()
	r.rpc = rpc
	r.mu.Unlock()
}

// RPC forwards the call to the implementation installed with SetNetRPC and
// returns its result. If none has been installed yet, an error is returned
// and nothing is invoked.
func (r *LazyNetRPC) RPC(ctx context.Context, method string, args any, reply any) error {
	// Take a snapshot under the read lock and release it before delegating,
	// so the lock is never held for the duration of the underlying RPC.
	r.mu.RLock()
	delegate := r.rpc
	r.mu.RUnlock()

	if delegate != nil {
		return delegate.RPC(ctx, method, args, reply)
	}
	return errors.New("rpc: initialization ordering error; net-rpc not ready yet")
}
// ConfigLoader produces a config.LoadResult for the given source. NewBaseDeps
// calls it with a nil source to obtain the initial configuration and also
// hands it to auto-config as its Loader.
type ConfigLoader func(source config.Source) (config.LoadResult, error)
// NewBaseDeps loads the agent configuration with configLoader and builds the
// BaseDeps derived from it: logger, node ID, telemetry, TLS configurator,
// cache, view store, connection pools, leaf certificate manager, gRPC
// resolver and balancer, router and auto-config.
//
// If providedLogger is non-nil it becomes the logger; otherwise one is created
// with logging.Setup from the loaded logging configuration and logOut.
//
// As a side effect a gRPC resolver and balancer are registered; Close
// deregisters them. On failure the partially populated BaseDeps is returned
// together with the error.
func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hclog.InterceptLogger) (BaseDeps, error) {
d := BaseDeps{}
// Load the initial configuration; no extra source is supplied.
result, err := configLoader(nil)
if err != nil {
return d, err
}
d.WatchedFiles = result.WatchedFiles
d.Experiments = result.RuntimeConfig.Experiments
cfg := result.RuntimeConfig
logConf := cfg.Logging
logConf.Name = logging.Agent
// A caller-supplied logger takes precedence over one built from the config.
if providedLogger != nil {
d.Logger = providedLogger
} else {
d.Logger, err = logging.Setup(logConf, logOut)
if err != nil {
return d, err
}
}
// The gRPC logger is installed only once per process; see grpcLogInitOnce.
grpcLogInitOnce.Do(func() {
grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger))
})
// Surface configuration warnings now that a logger exists.
for _, w := range result.Warnings {
d.Logger.Warn(w)
}
cfg.NodeID, err = newNodeIDFromConfig(cfg, d.Logger)
if err != nil {
return d, fmt.Errorf("failed to setup node ID: %w", err)
}
// Prometheus definitions must be in the telemetry config before
// lib.InitTelemetry is called below.
isServer := result.RuntimeConfig.ServerMode
gauges, counters, summaries := getPrometheusDefs(cfg, isServer)
cfg.Telemetry.PrometheusOpts.GaugeDefinitions = gauges
cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters
cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries
var extraSinks []metrics.MetricSink
if cfg.IsCloudEnabled() {
// This value is set late within newNodeIDFromConfig above
cfg.Cloud.NodeID = cfg.NodeID
d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.DataDir)
if err != nil {
return d, err
}
// The HCP sink, when present, receives metrics alongside the configured sinks.
if d.HCP.Sink != nil {
extraSinks = append(extraSinks, d.HCP.Sink)
}
}
d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger, extraSinks...)
if err != nil {
return d, fmt.Errorf("failed to initialize telemetry: %w", err)
}
// Host metrics collection is tied to a cancellable context; Close cancels
// it through stopHostCollector.
if !cfg.Telemetry.Disable && cfg.Telemetry.EnableHostMetrics {
ctx, cancel := context.WithCancel(context.Background())
hoststats.NewCollector(ctx, d.Logger, cfg.DataDir)
d.stopHostCollector = cancel
}
d.TLSConfigurator, err = tlsutil.NewConfigurator(cfg.TLS, d.Logger)
if err != nil {
return d, err
}
d.RuntimeConfig = cfg
d.Tokens = new(token.Store)
cfg.Cache.Logger = d.Logger.Named("cache")
// cache-types are not registered yet, but they won't be used until the components are started.
d.Cache = cache.New(cfg.Cache)
d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore"))
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
// NetRPC has no backing implementation yet; it must be supplied later via
// SetNetRPC before the cert signer below can make calls.
d.NetRPC = &LazyNetRPC{}
// TODO: create leafCertManager in BaseDeps once NetRPC is available without Agent
d.LeafCertManager = leafcert.NewManager(leafcert.Deps{
Logger: d.Logger.Named("leaf-certs"),
CertSigner: leafcert.NewNetRPCCertSigner(d.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(d.Cache, cfg.Datacenter),
Config: leafcert.Config{
TestOverrideCAChangeInitialDelay: cfg.ConnectTestCALeafRootChangeSpread,
},
})
// Set the leaf cert manager in the embedded deps type so it can be used by consul servers.
d.Deps.LeafCertManager = d.LeafCertManager
agentType := "client"
if cfg.ServerMode {
agentType = "server"
}
resolverBuilder := resolver.NewServerResolverBuilder(resolver.Config{
AgentType: agentType,
Datacenter: cfg.Datacenter,
// Set the authority to something sufficiently unique so any usage in
// tests would be self-isolating in the global resolver map, while also
// not incurring a huge penalty for non-test code.
Authority: cfg.Datacenter + "." + string(cfg.NodeID),
})
// Register the resolver and remember how to undo it for Close.
// NOTE(review): the error returns further down do not deregister the
// resolver or balancer themselves — confirm callers clean up on failure.
resolver.Register(resolverBuilder)
d.deregisterResolver = func() {
resolver.Deregister(resolverBuilder.Authority())
}
// The balancer is built with the same authority as the resolver.
balancerBuilder := balancer.NewBuilder(
resolverBuilder.Authority(),
d.Logger.Named("grpc.balancer"),
)
balancerBuilder.Register()
d.deregisterBalancer = balancerBuilder.Deregister
d.GRPCConnPool = grpcInt.NewClientConnPool(grpcInt.ClientConnPoolConfig{
Servers: resolverBuilder,
SrcAddr: d.ConnPool.SrcAddr,
TLSWrapper: grpcInt.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()),
ALPNWrapper: grpcInt.ALPNWrapper(d.TLSConfigurator.OutgoingALPNRPCWrapper()),
UseTLSForDC: d.TLSConfigurator.UseTLS,
DialingFromServer: cfg.ServerMode,
DialingFromDatacenter: cfg.Datacenter,
})
d.LeaderForwarder = resolverBuilder
d.Router = router.NewRouter(
d.Logger,
cfg.Datacenter,
fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter),
grpcInt.NewTracker(resolverBuilder, balancerBuilder),
)
// this needs to happen prior to creating auto-config as some of the dependencies
// must also be passed to auto-config
d, err = initEnterpriseBaseDeps(d, cfg)
if err != nil {
return d, err
}
// Auto-config reuses the same loader plus the dependencies built above.
acConf := autoconf.Config{
DirectRPC: d.ConnPool,
Logger: d.Logger,
Loader: configLoader,
ServerProvider: d.Router,
TLSConfigurator: d.TLSConfigurator,
Cache: d.Cache,
LeafCertManager: d.LeafCertManager,
Tokens: d.Tokens,
EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps, cfg),
}
d.AutoConfig, err = autoconf.New(acConf)
if err != nil {
return d, err
}
d.NewRequestRecorderFunc = middleware.NewRequestRecorder
d.GetNetRPCInterceptorFunc = middleware.GetNetRPCInterceptor
d.EventPublisher = stream.NewEventPublisher(10 * time.Second)
d.XDSStreamLimiter = limiter.NewSessionLimiter()
d.Registry = consul.NewTypeRegistry()
return d, nil
}
// Close releases the state and goroutines owned by bd's members that nothing
// else (e.g. the agent stop channel) is responsible for tearing down.
//
// Auto-config, the leaf certificate manager and telemetry are stopped first,
// then the HCP sink (if any) is shut down, and finally the optional cleanup
// hooks run in order: balancer deregistration, resolver deregistration and
// host collector cancellation.
func (bd BaseDeps) Close() {
	bd.AutoConfig.Stop()
	bd.LeafCertManager.Stop()
	bd.MetricsConfig.Cancel()

	if sink := bd.HCP.Sink; sink != nil {
		sink.Shutdown()
	}

	// Each hook is only set when the corresponding setup step actually ran,
	// so every one of them has to be nil-checked.
	if bd.deregisterBalancer != nil {
		bd.deregisterBalancer()
	}
	if bd.deregisterResolver != nil {
		bd.deregisterResolver()
	}
	if bd.stopHostCollector != nil {
		bd.stopHostCollector()
	}
}
// grpcLogInitOnce ensures the global gRPC logger is installed only once. The
// test suite calls NewBaseDeps in many tests, and re-initializing the logger
// each time causes data races.
var grpcLogInitOnce sync.Once
// newConnPool builds the RPC connection pool from the runtime configuration.
//
// Outgoing connections are bound to the configured RPC bind IP unless that
// address is the "any" address, in which case no source address is set. The
// idle timeout and stream limit depend on whether the agent is a server.
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
	var srcAddr *net.TCPAddr
	if !ipaddr.IsAny(config.RPCBindAddr) {
		srcAddr = &net.TCPAddr{IP: config.RPCBindAddr.IP}
	}

	stdLogger := logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true})

	connPool := &pool.ConnPool{
		Server:           config.ServerMode,
		SrcAddr:          srcAddr,
		Logger:           stdLogger,
		TLSConfigurator:  tls,
		Datacenter:       config.Datacenter,
		RPCHoldTimeout:   config.RPCHoldTimeout,
		MaxQueryTime:     config.MaxQueryTime,
		DefaultQueryTime: config.DefaultQueryTime,
	}
	connPool.SetRPCClientTimeout(config.RPCClientTimeout)

	switch {
	case config.ServerMode:
		connPool.MaxTime = 2 * time.Minute
		connPool.MaxStreams = 64
	default:
		// MaxTime controls how long we keep an idle connection open to a
		// server. 127s is the first prime above 120s (a prime was an
		// arbitrary choice); the intent is to reuse connections for clients
		// driven by once-a-minute cron(8) jobs that also use a 60s jitter
		// window (e.g. in vixie cron, job execution can drift by up to 59s
		// per job, or 119s for a once-a-minute cron job).
		connPool.MaxTime = 127 * time.Second
		connPool.MaxStreams = 32
	}

	return connPool
}
// getPrometheusDefs gathers the Prometheus metric definitions declared across
// the agent's packages and flattens them into one slice per metric type
// (gauges, counters, summaries) for the Consul agent to pass to go-metrics.
//
// Every returned definition has cfg.Telemetry.MetricsPrefix prepended to its
// name. Host gauges are included when host metrics are enabled, server-only
// gauges when isServer is true, and the raft log verifier and WAL definitions
// when isServer is true and cfg.RaftLogStoreConfig selects them.
func getPrometheusDefs(cfg *config.RuntimeConfig, isServer bool) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
	// withPrefix builds a fresh name slice starting with the metrics prefix,
	// so the definitions declared elsewhere are never modified.
	// TODO(kit): Prepending the service to each definition should be handled by go-metrics
	withPrefix := func(name []string) []string {
		return append([]string{cfg.Telemetry.MetricsPrefix}, name...)
	}

	// Both conditions are needed for the gauges and again for the counters.
	verifierEnabled := isServer && cfg.RaftLogStoreConfig.Verification.Enabled
	walEnabled := isServer &&
		(cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendWAL ||
			cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendDefault)

	// ---- Gauges ----

	// TODO: "raft..." metrics come from the raft lib and we should migrate these to a telemetry
	// package within. In the mean time, we're going to define a few here because they're key to monitoring Consul.
	raftGaugeDefs := []prometheus.GaugeDefinition{
		{
			Name: []string{"raft", "fsm", "lastRestoreDuration"},
			Help: "This measures how long the last FSM restore (from disk or leader) took.",
		},
		{
			Name: []string{"raft", "leader", "oldestLogAge"},
			Help: "This measures how old the oldest log in the leader's log store is.",
		},
	}
	serverGaugeDefs := []prometheus.GaugeDefinition{
		{
			Name: []string{"server", "isLeader"},
			Help: "Tracks if the server is a leader.",
		},
	}

	gaugeGroups := [][]prometheus.GaugeDefinition{
		cache.Gauges,
		consul.RPCGauges,
		consul.SessionGauges,
		grpcWare.StatsGauges,
		xds.StatsGauges,
		usagemetrics.Gauges,
		consul.ReplicationGauges,
		CertExpirationGauges,
		Gauges,
		raftGaugeDefs,
		serverGaugeDefs,
	}
	if cfg.Telemetry.EnableHostMetrics {
		gaugeGroups = append(gaugeGroups, hoststats.Gauges)
	}
	// TODO(ffmmm): conditionally add only leader specific metrics to gauges, counters, summaries, etc
	if isServer {
		gaugeGroups = append(gaugeGroups,
			consul.AutopilotGauges,
			consul.LeaderCertExpirationGauges,
			consul.LeaderPeeringMetrics,
			xdscapacity.StatsGauges,
		)
	}
	if verifierEnabled {
		defs := make([]prometheus.GaugeDefinition, 0, len(verifier.MetricDefinitions.Gauges))
		for _, desc := range verifier.MetricDefinitions.Gauges {
			defs = append(defs, prometheus.GaugeDefinition{
				Name: []string{"raft", "logstore", "verifier", desc.Name},
				Help: desc.Desc,
			})
		}
		gaugeGroups = append(gaugeGroups, defs)
	}
	if walEnabled {
		defs := make([]prometheus.GaugeDefinition, 0, len(wal.MetricDefinitions.Gauges))
		for _, desc := range wal.MetricDefinitions.Gauges {
			defs = append(defs, prometheus.GaugeDefinition{
				Name: []string{"raft", "wal", desc.Name},
				Help: desc.Desc,
			})
		}
		gaugeGroups = append(gaugeGroups, defs)
	}

	// Flatten the groups, namespacing every definition on the way.
	// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
	var gaugeDefs []prometheus.GaugeDefinition
	for _, group := range gaugeGroups {
		for _, def := range group {
			def.Name = withPrefix(def.Name)
			gaugeDefs = append(gaugeDefs, def)
		}
	}

	// ---- Counters ----

	// TODO(kit): "raft..." metrics come from the raft lib and we should migrate these to a telemetry
	// package within. In the mean time, we're going to define a few here because they're key to monitoring Consul.
	raftCounterDefs := []prometheus.CounterDefinition{
		{
			Name: []string{"raft", "apply"},
			Help: "This counts the number of Raft transactions occurring over the interval.",
		},
		{
			Name: []string{"raft", "state", "candidate"},
			Help: "This increments whenever a Consul server starts an election.",
		},
		{
			Name: []string{"raft", "state", "leader"},
			Help: "This increments whenever a Consul server becomes a leader.",
		},
	}

	counterGroups := [][]prometheus.CounterDefinition{
		CatalogCounters,
		cache.Counters,
		consul.ACLCounters,
		consul.CatalogCounters,
		consul.ClientCounters,
		consul.RPCCounters,
		grpcWare.StatsCounters,
		local.StateCounters,
		xds.StatsCounters,
		raftCounterDefs,
		rate.Counters,
	}
	// The raft counters above are registered whether or not this agent is a
	// server, unlike the server-only gauges. That should probably be fixed,
	// but it is left as is to avoid changing behavior. The verifier and WAL
	// counters below are only added on servers.
	if verifierEnabled {
		defs := make([]prometheus.CounterDefinition, 0, len(verifier.MetricDefinitions.Counters))
		for _, desc := range verifier.MetricDefinitions.Counters {
			defs = append(defs, prometheus.CounterDefinition{
				Name: []string{"raft", "logstore", "verifier", desc.Name},
				Help: desc.Desc,
			})
		}
		counterGroups = append(counterGroups, defs)
	}
	if walEnabled {
		defs := make([]prometheus.CounterDefinition, 0, len(wal.MetricDefinitions.Counters))
		for _, desc := range wal.MetricDefinitions.Counters {
			defs = append(defs, prometheus.CounterDefinition{
				Name: []string{"raft", "wal", desc.Name},
				Help: desc.Desc,
			})
		}
		counterGroups = append(counterGroups, defs)
	}

	// Flatten the groups, namespacing every definition on the way.
	// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
	var counterDefs []prometheus.CounterDefinition
	for _, group := range counterGroups {
		for _, def := range group {
			def.Name = withPrefix(def.Name)
			counterDefs = append(counterDefs, def)
		}
	}

	// ---- Summaries ----

	// TODO(kit): "raft..." metrics come from the raft lib and we should migrate these to a telemetry
	// package within. In the mean time, we're going to define a few here because they're key to monitoring Consul.
	raftSummaryDefs := []prometheus.SummaryDefinition{
		{
			Name: []string{"raft", "commitTime"},
			Help: "This measures the time it takes to commit a new entry to the Raft log on the leader.",
		},
		{
			Name: []string{"raft", "leader", "lastContact"},
			Help: "Measures the time since the leader was last able to contact the follower nodes when checking its leader lease.",
		},
		{
			Name: []string{"raft", "snapshot", "persist"},
			Help: "Measures the time it takes raft to write a new snapshot to disk.",
		},
		{
			Name: []string{"raft", "rpc", "installSnapshot"},
			Help: "Measures the time it takes the raft leader to install a snapshot on a follower that is catching up after being down or has just joined the cluster.",
		},
	}

	summaryGroups := [][]prometheus.SummaryDefinition{
		HTTPSummaries,
		consul.ACLSummaries,
		consul.ACLEndpointSummaries,
		consul.CatalogSummaries,
		consul.FederationStateSummaries,
		consul.IntentionSummaries,
		consul.KVSummaries,
		consul.LeaderSummaries,
		consul.PreparedQuerySummaries,
		consul.RPCSummaries,
		consul.SegmentCESummaries,
		consul.SessionSummaries,
		consul.SessionEndpointSummaries,
		consul.TxnSummaries,
		fsm.CommandsSummaries,
		fsm.SnapshotSummaries,
		raftSummaryDefs,
		xds.StatsSummaries,
	}

	// Flatten the groups, namespacing every definition on the way.
	// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
	var summaryDefs []prometheus.SummaryDefinition
	for _, group := range summaryGroups {
		for _, def := range group {
			def.Name = withPrefix(def.Name)
			summaryDefs = append(summaryDefs, def)
		}
	}

	return gaugeDefs, counterDefs, summaryDefs
}