2022-03-17 23:02:26 +00:00
|
|
|
package middleware
|
|
|
|
|
|
|
|
import (
|
|
|
|
"reflect"
|
|
|
|
"strconv"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/armon/go-metrics"
|
2022-03-22 00:26:32 +00:00
|
|
|
"github.com/armon/go-metrics/prometheus"
|
2022-03-17 23:02:26 +00:00
|
|
|
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
)
|
|
|
|
|
|
|
|
// RPCTypeInternal identifies the "RPC" request as coming from some internal
|
|
|
|
// operation that runs on the cluster leader. Technically this is not an RPC
|
|
|
|
// request, but these raft.Apply operations have the same impact on blocking
|
|
|
|
// queries, and streaming subscriptions, so need to be tracked by the same metric
|
|
|
|
// and logs.
|
|
|
|
// Really what we are measuring here is a "cluster operation". The term we have
|
|
|
|
// used for this historically is "RPC", so we continue to use that here.
|
|
|
|
const RPCTypeInternal = "internal"
|
2022-03-22 16:31:54 +00:00
|
|
|
const RPCTypeNetRPC = "net/rpc"
|
2022-03-17 23:02:26 +00:00
|
|
|
|
2022-03-22 00:26:32 +00:00
|
|
|
var metricRPCRequest = []string{"rpc", "server", "call"}
|
2022-03-17 23:02:26 +00:00
|
|
|
var requestLogName = "rpc.server.request"
|
|
|
|
|
2022-03-24 21:41:30 +00:00
|
|
|
var NewRPCGauges = []prometheus.GaugeDefinition{
|
2022-03-22 00:26:32 +00:00
|
|
|
{
|
|
|
|
Name: metricRPCRequest,
|
|
|
|
Help: "Increments when a server makes an RPC service call. The labels on the metric have more information",
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2022-03-17 23:02:26 +00:00
|
|
|
type RequestRecorder struct {
|
2022-03-22 16:31:54 +00:00
|
|
|
Logger hclog.Logger
|
|
|
|
recorderFunc func(key []string, start time.Time, labels []metrics.Label)
|
2022-03-17 23:02:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewRequestRecorder(logger hclog.Logger) *RequestRecorder {
|
2022-03-22 16:31:54 +00:00
|
|
|
return &RequestRecorder{Logger: logger, recorderFunc: metrics.MeasureSinceWithLabels}
|
2022-03-17 23:02:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) {
|
|
|
|
elapsed := time.Since(start)
|
|
|
|
|
|
|
|
reqType := requestType(request)
|
|
|
|
|
|
|
|
labels := []metrics.Label{
|
|
|
|
{Name: "method", Value: requestName},
|
|
|
|
{Name: "errored", Value: strconv.FormatBool(respErrored)},
|
|
|
|
{Name: "request_type", Value: reqType},
|
|
|
|
{Name: "rpc_type", Value: rpcType},
|
|
|
|
}
|
|
|
|
|
2022-03-22 16:31:54 +00:00
|
|
|
// TODO(FFMMM): it'd be neat if we could actually pass the elapsed observed above
|
|
|
|
r.recorderFunc(metricRPCRequest, start, labels)
|
2022-03-17 23:02:26 +00:00
|
|
|
|
|
|
|
r.Logger.Debug(requestLogName,
|
|
|
|
"method", requestName,
|
|
|
|
"errored", respErrored,
|
|
|
|
"request_type", reqType,
|
|
|
|
"rpc_type", rpcType,
|
|
|
|
"elapsed", elapsed)
|
|
|
|
}
|
|
|
|
|
|
|
|
func requestType(req interface{}) string {
|
|
|
|
if r, ok := req.(interface{ IsRead() bool }); ok && r.IsRead() {
|
|
|
|
return "read"
|
|
|
|
}
|
|
|
|
return "write"
|
|
|
|
}
|
|
|
|
|
|
|
|
func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor {
|
|
|
|
return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) {
|
|
|
|
reqStart := time.Now()
|
|
|
|
|
|
|
|
err := handler()
|
|
|
|
|
2022-03-22 16:31:54 +00:00
|
|
|
recorder.Record(reqServiceMethod, RPCTypeNetRPC, reqStart, argv.Interface(), err != nil)
|
2022-03-17 23:02:26 +00:00
|
|
|
}
|
|
|
|
}
|