peering: emit exported services count metric (#13811)

Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
alex 2022-07-22 12:05:08 -07:00 committed by GitHub
parent 77917b6b5d
commit 927cee692b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 206 additions and 4 deletions

View File

@ -8,6 +8,8 @@ import (
"fmt"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
@ -27,8 +29,72 @@ import (
"github.com/hashicorp/consul/proto/pbpeerstream"
)
var leaderExportedServicesCountKey = []string{"consul", "peering", "exported_services"}
var LeaderPeeringMetrics = []prometheus.GaugeDefinition{
{
Name: leaderExportedServicesCountKey,
Help: "A gauge that tracks how many services are exported for the peering. " +
"The labels are \"peering\" and, for enterprise, \"partition\". " +
"We emit this metric every 9 seconds",
},
}
func (s *Server) startPeeringStreamSync(ctx context.Context) {
s.leaderRoutineManager.Start(ctx, peeringStreamsRoutineName, s.runPeeringSync)
s.leaderRoutineManager.Start(ctx, peeringStreamsMetricsRoutineName, s.runPeeringMetrics)
}
func (s *Server) runPeeringMetrics(ctx context.Context) error {
ticker := time.NewTicker(s.config.MetricsReportingInterval)
defer ticker.Stop()
logger := s.logger.Named(logging.PeeringMetrics)
defaultMetrics := metrics.Default
for {
select {
case <-ctx.Done():
logger.Info("stopping peering metrics")
// "Zero-out" the metric on exit so that when prometheus scrapes this
// metric from a non-leader, it does not get a stale value.
metrics.SetGauge(leaderExportedServicesCountKey, float32(0))
return nil
case <-ticker.C:
if err := s.emitPeeringMetricsOnce(logger, defaultMetrics()); err != nil {
s.logger.Error("error emitting peering stream metrics", "error", err)
}
}
}
}
func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metrics.Metrics) error {
_, peers, err := s.fsm.State().PeeringList(nil, *structs.NodeEnterpriseMetaInPartition(structs.WildcardSpecifier))
if err != nil {
return err
}
for _, peer := range peers {
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if !found {
logger.Trace("did not find status for", "peer_name", peer.Name)
continue
}
esc := status.GetExportedServicesCount()
part := peer.Partition
labels := []metrics.Label{
{Name: "peer_name", Value: peer.Name},
{Name: "peer_id", Value: peer.ID},
}
if part != "" {
labels = append(labels, metrics.Label{Name: "partition", Value: part})
}
metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels)
}
return nil
}
func (s *Server) runPeeringSync(ctx context.Context) error {
@ -51,6 +117,7 @@ func (s *Server) runPeeringSync(ctx context.Context) error {
func (s *Server) stopPeeringStreamSync() {
// will be a no-op when not started
s.leaderRoutineManager.Stop(peeringStreamsRoutineName)
s.leaderRoutineManager.Stop(peeringStreamsMetricsRoutineName)
}
// syncPeeringsAndBlock is a long-running goroutine that is responsible for watching

View File

@ -4,10 +4,12 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@ -615,7 +617,7 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
return lastIdx
}
// TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe
// TODO(peering): once we move away from keeping state in stream tracker only on leaders, move this test to consul/server_test maybe
func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
@ -904,3 +906,133 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
})
}
}
// TODO(peering): once we move away from keeping state in stream tracker only on leaders, move this test to consul/server_test maybe
func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
var (
s2PeerID1 = generateUUID()
s2PeerID2 = generateUUID()
testContextTimeout = 60 * time.Second
lastIdx = uint64(0)
)
// TODO(peering): Configure with TLS
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s1.dc1"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
t.Cleanup(cancel)
conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-s2",
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)
var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))
// Bring up s2 and store s1's token so that it attempts to dial.
_, s2 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s2.dc2"
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
})
testrpc.WaitForLeader(t, s2.RPC, "dc2")
// Simulate exporting services in the tracker
{
// Simulate a peering initiation event by writing a peering with data from a peering token.
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
p := &pbpeering.Peering{
ID: s2PeerID1,
Name: "my-peer-s1",
PeerID: token.PeerID,
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
require.True(t, p.ShouldDial())
lastIdx++
require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p))
p2 := &pbpeering.Peering{
ID: s2PeerID2,
Name: "my-peer-s3",
PeerID: token.PeerID, // doesn't much matter what these values are
PeerCAPems: token.CA,
PeerServerName: token.ServerName,
PeerServerAddresses: token.ServerAddresses,
}
require.True(t, p2.ShouldDial())
lastIdx++
require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p2))
// connect the stream
mst1, err := s2.peeringServer.Tracker.Connected(s2PeerID1)
require.NoError(t, err)
// mimic tracking exported services
mst1.TrackExportedService(structs.ServiceName{Name: "a-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "b-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "c-service"})
// connect the stream
mst2, err := s2.peeringServer.Tracker.Connected(s2PeerID2)
require.NoError(t, err)
// mimic tracking exported services
mst2.TrackExportedService(structs.ServiceName{Name: "d-service"})
mst2.TrackExportedService(structs.ServiceName{Name: "e-service"})
}
// set up a metrics sink
sink := metrics.NewInmemSink(testContextTimeout, testContextTimeout)
cfg := metrics.DefaultConfig("us-west")
cfg.EnableHostname = false
met, err := metrics.New(cfg, sink)
require.NoError(t, err)
errM := s2.emitPeeringMetricsOnce(s2.logger, met)
require.NoError(t, errM)
retry.Run(t, func(r *retry.R) {
intervals := sink.Data()
require.Len(r, intervals, 1)
intv := intervals[0]
// the keys for a Gauge value look like: {serviceName}.{prefix}.{key_name};{label=value};...
keyMetric1 := fmt.Sprintf("us-west.consul.peering.exported_services;peer_name=my-peer-s1;peer_id=%s", s2PeerID1)
metric1, ok := intv.Gauges[keyMetric1]
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric1))
require.Equal(r, float32(3), metric1.Value) // for a, b, c services
keyMetric2 := fmt.Sprintf("us-west.consul.peering.exported_services;peer_name=my-peer-s3;peer_id=%s", s2PeerID2)
metric2, ok := intv.Gauges[keyMetric2]
require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric2))
require.Equal(r, float32(2), metric2.Value) // for d, e services
})
}

View File

@ -127,6 +127,7 @@ const (
virtualIPCheckRoutineName = "virtual IP version check"
peeringStreamsRoutineName = "streaming peering resources"
peeringDeletionRoutineName = "peering deferred deletion"
peeringStreamsMetricsRoutineName = "metrics for streaming peering resources"
)
var (

View File

@ -539,8 +539,8 @@ func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) {
return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
}
func (s *Server) StreamStatus(peer string) (resp Status, found bool) {
return s.Tracker.StreamStatus(peer)
func (s *Server) StreamStatus(peerID string) (resp Status, found bool) {
return s.Tracker.StreamStatus(peerID)
}
// ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.

View File

@ -231,7 +231,8 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
if isServer {
gauges = append(gauges,
consul.AutopilotGauges,
consul.LeaderCertExpirationGauges)
consul.LeaderCertExpirationGauges,
consul.LeaderPeeringMetrics)
}
// Flatten definitions

View File

@ -51,6 +51,7 @@ const (
Snapshot string = "snapshot"
Partition string = "partition"
Peering string = "peering"
PeeringMetrics string = "peering_metrics"
TerminatingGateway string = "terminating_gateway"
TLSUtil string = "tlsutil"
Transaction string = "txn"