diff --git a/agent/consul/leader_peering.go b/agent/consul/leader_peering.go index 28a8397df9..3288a141a1 100644 --- a/agent/consul/leader_peering.go +++ b/agent/consul/leader_peering.go @@ -8,6 +8,8 @@ import ( "fmt" "time" + "github.com/armon/go-metrics" + "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" @@ -27,8 +29,72 @@ import ( "github.com/hashicorp/consul/proto/pbpeerstream" ) +var leaderExportedServicesCountKey = []string{"consul", "peering", "exported_services"} +var LeaderPeeringMetrics = []prometheus.GaugeDefinition{ + { + Name: leaderExportedServicesCountKey, + Help: "A gauge that tracks how many services are exported for the peering. " + + "The labels are \"peering\" and, for enterprise, \"partition\". " + + "We emit this metric every 9 seconds", + }, +} + func (s *Server) startPeeringStreamSync(ctx context.Context) { s.leaderRoutineManager.Start(ctx, peeringStreamsRoutineName, s.runPeeringSync) + s.leaderRoutineManager.Start(ctx, peeringStreamsMetricsRoutineName, s.runPeeringMetrics) +} + +func (s *Server) runPeeringMetrics(ctx context.Context) error { + ticker := time.NewTicker(s.config.MetricsReportingInterval) + defer ticker.Stop() + + logger := s.logger.Named(logging.PeeringMetrics) + defaultMetrics := metrics.Default + + for { + select { + case <-ctx.Done(): + logger.Info("stopping peering metrics") + + // "Zero-out" the metric on exit so that when prometheus scrapes this + // metric from a non-leader, it does not get a stale value. + metrics.SetGauge(leaderExportedServicesCountKey, float32(0)) + return nil + case <-ticker.C: + if err := s.emitPeeringMetricsOnce(logger, defaultMetrics()); err != nil { + s.logger.Error("error emitting peering stream metrics", "error", err) + } + } + } +} + +func (s *Server) emitPeeringMetricsOnce(logger hclog.Logger, metricsImpl *metrics.Metrics) error { + _, peers, err := s.fsm.State().PeeringList(nil, *structs.NodeEnterpriseMetaInPartition(structs.WildcardSpecifier)) + if err != nil { + return err + } + + for _, peer := range peers { + status, found := s.peerStreamServer.StreamStatus(peer.ID) + if !found { + logger.Trace("did not find status for", "peer_name", peer.Name) + continue + } + + esc := status.GetExportedServicesCount() + part := peer.Partition + labels := []metrics.Label{ + {Name: "peer_name", Value: peer.Name}, + {Name: "peer_id", Value: peer.ID}, + } + if part != "" { + labels = append(labels, metrics.Label{Name: "partition", Value: part}) + } + + metricsImpl.SetGaugeWithLabels(leaderExportedServicesCountKey, float32(esc), labels) + } + + return nil } func (s *Server) runPeeringSync(ctx context.Context) error { @@ -51,6 +117,7 @@ func (s *Server) runPeeringSync(ctx context.Context) error { func (s *Server) stopPeeringStreamSync() { // will be a no-op when not started s.leaderRoutineManager.Stop(peeringStreamsRoutineName) + s.leaderRoutineManager.Stop(peeringStreamsMetricsRoutineName) } // syncPeeringsAndBlock is a long-running goroutine that is responsible for watching diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index c3196a54ec..06cbda43d9 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -4,10 +4,12 @@ import ( "context" "encoding/base64" "encoding/json" + "fmt" "io/ioutil" "testing" "time" + "github.com/armon/go-metrics" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -615,7 +617,7 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId return lastIdx } -// TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe +// TODO(peering): once we move away from keeping state in stream tracker only on leaders, move this test to consul/server_test maybe func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -904,3 +906,133 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) { }) } } + +// TODO(peering): once we move away from keeping state in stream tracker only on leaders, move this test to consul/server_test maybe +func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + var ( + s2PeerID1 = generateUUID() + s2PeerID2 = generateUUID() + testContextTimeout = 60 * time.Second + lastIdx = uint64(0) + ) + + // TODO(peering): Configure with TLS + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Create a peering by generating a token + ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + peeringClient := pbpeering.NewPeeringServiceClient(conn) + + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-s2", + } + resp, err := peeringClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken) + require.NoError(t, err) + + var token structs.PeeringToken + require.NoError(t, json.Unmarshal(tokenJSON, &token)) + + // Bring up s2 and store s1's token so that it attempts to dial. + _, s2 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s2.dc2" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + }) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Simulate exporting services in the tracker + { + // Simulate a peering initiation event by writing a peering with data from a peering token. + // Eventually the leader in dc2 should dial and connect to the leader in dc1. + p := &pbpeering.Peering{ + ID: s2PeerID1, + Name: "my-peer-s1", + PeerID: token.PeerID, + PeerCAPems: token.CA, + PeerServerName: token.ServerName, + PeerServerAddresses: token.ServerAddresses, + } + require.True(t, p.ShouldDial()) + lastIdx++ + require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p)) + + p2 := &pbpeering.Peering{ + ID: s2PeerID2, + Name: "my-peer-s3", + PeerID: token.PeerID, // doesn't much matter what these values are + PeerCAPems: token.CA, + PeerServerName: token.ServerName, + PeerServerAddresses: token.ServerAddresses, + } + require.True(t, p2.ShouldDial()) + lastIdx++ + require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p2)) + + // connect the stream + mst1, err := s2.peeringServer.Tracker.Connected(s2PeerID1) + require.NoError(t, err) + + // mimic tracking exported services + mst1.TrackExportedService(structs.ServiceName{Name: "a-service"}) + mst1.TrackExportedService(structs.ServiceName{Name: "b-service"}) + mst1.TrackExportedService(structs.ServiceName{Name: "c-service"}) + + // connect the stream + mst2, err := s2.peeringServer.Tracker.Connected(s2PeerID2) + require.NoError(t, err) + + // mimic tracking exported services + mst2.TrackExportedService(structs.ServiceName{Name: "d-service"}) + mst2.TrackExportedService(structs.ServiceName{Name: "e-service"}) + } + + // set up a metrics sink + sink := metrics.NewInmemSink(testContextTimeout, testContextTimeout) + cfg := metrics.DefaultConfig("us-west") + cfg.EnableHostname = false + met, err := metrics.New(cfg, sink) + require.NoError(t, err) + + errM := s2.emitPeeringMetricsOnce(s2.logger, met) + require.NoError(t, errM) + + retry.Run(t, func(r *retry.R) { + intervals := sink.Data() + require.Len(r, intervals, 1) + intv := intervals[0] + + // the keys for a Gauge value look like: {serviceName}.{prefix}.{key_name};{label=value};... + keyMetric1 := fmt.Sprintf("us-west.consul.peering.exported_services;peer_name=my-peer-s1;peer_id=%s", s2PeerID1) + metric1, ok := intv.Gauges[keyMetric1] + require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric1)) + + require.Equal(r, float32(3), metric1.Value) // for a, b, c services + + keyMetric2 := fmt.Sprintf("us-west.consul.peering.exported_services;peer_name=my-peer-s3;peer_id=%s", s2PeerID2) + metric2, ok := intv.Gauges[keyMetric2] + require.True(r, ok, fmt.Sprintf("did not find the key %q", keyMetric2)) + + require.Equal(r, float32(2), metric2.Value) // for d, e services + }) +} diff --git a/agent/consul/server.go b/agent/consul/server.go index d4753bb3f3..3c240c5f7d 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -127,6 +127,7 @@ const ( virtualIPCheckRoutineName = "virtual IP version check" peeringStreamsRoutineName = "streaming peering resources" peeringDeletionRoutineName = "peering deferred deletion" + peeringStreamsMetricsRoutineName = "metrics for streaming peering resources" ) var ( diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 3d10cdfa0c..5702f0e135 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -539,8 +539,8 @@ func getTrustDomain(store StateStore, logger hclog.Logger) (string, error) { return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil } -func (s *Server) StreamStatus(peer string) (resp Status, found bool) { - return s.Tracker.StreamStatus(peer) +func (s *Server) StreamStatus(peerID string) (resp Status, found bool) { + return s.Tracker.StreamStatus(peerID) } // ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down. diff --git a/agent/setup.go b/agent/setup.go index 9ac506ab67..c329605181 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -231,7 +231,8 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau if isServer { gauges = append(gauges, consul.AutopilotGauges, - consul.LeaderCertExpirationGauges) + consul.LeaderCertExpirationGauges, + consul.LeaderPeeringMetrics) } // Flatten definitions diff --git a/logging/names.go b/logging/names.go index 5fd904c7fa..7f5e2bf603 100644 --- a/logging/names.go +++ b/logging/names.go @@ -51,6 +51,7 @@ const ( Snapshot string = "snapshot" Partition string = "partition" Peering string = "peering" + PeeringMetrics string = "peering_metrics" TerminatingGateway string = "terminating_gateway" TLSUtil string = "tlsutil" Transaction string = "txn"