add a metric for received/validated/rejected envelopes

Signed-off-by: Jakub Sokołowski <jakub@status.im>
This commit is contained in:
Jakub Sokołowski 2019-11-16 18:02:13 +01:00 committed by Jakub
parent fad088a1f7
commit 25321b2c03
4 changed files with 36 additions and 15 deletions

View File

@ -3,10 +3,18 @@ package whisperv6
import prom "github.com/prometheus/client_golang/prometheus"
var (
envelopesCounter = prom.NewCounter(prom.CounterOpts{
Name: "whisper_envelopes_total",
Help: "Number of envelopes processed.",
envelopesReceivedCounter = prom.NewCounter(prom.CounterOpts{
Name: "whisper_envelopes_received_total",
Help: "Number of envelopes received.",
})
envelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{
Name: "whisper_envelopes_validated_total",
Help: "Number of envelopes processed successfully.",
})
envelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "whisper_envelopes_rejected_total",
Help: "Number of envelopes rejected.",
}, []string{"reason"})
envelopesCacheFailedCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "whisper_envelopes_cache_failures_total",
Help: "Number of envelopes which failed to be cached.",
@ -18,7 +26,7 @@ var (
envelopesSizeMeter = prom.NewHistogram(prom.HistogramOpts{
Name: "whisper_envelopes_size_bytes",
Help: "Size of processed Whisper envelopes in bytes.",
Buckets: prom.ExponentialBuckets(1024, 4, 10),
Buckets: prom.ExponentialBuckets(256, 4, 10),
})
// rate limiter metrics
rateLimitsProcessed = prom.NewCounter(prom.CounterOpts{
@ -32,7 +40,8 @@ var (
)
func init() {
prom.MustRegister(envelopesCounter)
prom.MustRegister(envelopesReceivedCounter)
prom.MustRegister(envelopesRejectedCounter)
prom.MustRegister(envelopesCacheFailedCounter)
prom.MustRegister(envelopesCachedCounter)
prom.MustRegister(envelopesSizeMeter)

View File

@ -40,7 +40,7 @@ type PeerRateLimiterConfig struct {
WhitelistedPeerIDs []enode.ID
}
var defaultPeerRateLimiterConfig = PeerRateLimiterConfig{
var peerRateLimiterDefaults = PeerRateLimiterConfig{
LimitPerSecIP: 10,
LimitPerSecPeerID: 5,
WhitelistedIPs: nil,
@ -62,7 +62,7 @@ type PeerRateLimiter struct {
func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig) *PeerRateLimiter {
if cfg == nil {
copy := defaultPeerRateLimiterConfig
copy := peerRateLimiterDefaults
cfg = &copy
}

View File

@ -65,7 +65,7 @@ func TestPeerLimiterHandler(t *testing.T) {
peer: p2p.NewPeer(enode.ID{0xaa, 0xbb, 0xcc}, "test-peer", nil),
}
rw1, rw2 := p2p.MsgPipe()
count := 100
var count int64 = 100
go func() {
err := echoMessages(r, p, rw2)
@ -74,7 +74,7 @@ func TestPeerLimiterHandler(t *testing.T) {
done := make(chan struct{})
go func() {
for i := 0; i < count; i++ {
for i := int64(0); i < count; i++ {
msg, err := rw1.ReadMsg()
require.NoError(t, err)
require.EqualValues(t, 101, msg.Code)
@ -82,15 +82,16 @@ func TestPeerLimiterHandler(t *testing.T) {
close(done)
}()
for i := 0; i < count; i += 1 {
for i := int64(0); i < count; i += 1 {
err := rw1.WriteMsg(p2p.Msg{Code: 101})
require.NoError(t, err)
}
<-done
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecIP, h.exceedIPLimit)
require.EqualValues(t, 100-defaultPeerRateLimiterConfig.LimitPerSecPeerID, h.exceedPeerLimit)
require.EqualValues(t, count, h.processed)
require.EqualValues(t, count-peerRateLimiterDefaults.LimitPerSecIP, h.exceedIPLimit)
require.EqualValues(t, count-peerRateLimiterDefaults.LimitPerSecPeerID, h.exceedPeerLimit)
}
func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) {
@ -129,6 +130,7 @@ func TestPeerLimiterHandlerWithWhitelisting(t *testing.T) {
<-done
require.Equal(t, count, h.processed)
require.Equal(t, 0, h.exceedIPLimit)
require.Equal(t, 0, h.exceedPeerLimit)
}
@ -149,9 +151,11 @@ func echoMessages(r *PeerRateLimiter, p *Peer, rw p2p.MsgReadWriter) error {
}
type mockRateLimiterHandler struct {
processed int
exceedPeerLimit int
exceedIPLimit int
}
func (m *mockRateLimiterHandler) ExceedPeerLimit() { m.exceedPeerLimit += 1 }
func (m *mockRateLimiterHandler) ExceedIPLimit() { m.exceedIPLimit += 1 }
func (m *mockRateLimiterHandler) IncProcessed() { m.processed += 1 }
func (m *mockRateLimiterHandler) IncExceedPeerLimit() { m.exceedPeerLimit += 1 }
func (m *mockRateLimiterHandler) IncExceedIPLimit() { m.exceedIPLimit += 1 }

View File

@ -934,12 +934,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// decode the contained envelopes
data, err := ioutil.ReadAll(packet.Payload)
if err != nil {
envelopesRejectedCounter.WithLabelValues("failed_read").Inc()
log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err)
return errors.New("invalid enveloopes")
}
var envelopes []*Envelope
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
envelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid envelopes")
}
@ -962,6 +964,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
Hash: env.Hash(),
Peer: p.peer.ID(),
})
envelopesValidatedCounter.Inc()
if cached {
p.mark(env)
}
@ -976,12 +979,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
case messageResponseCode:
var multiResponse MultiVersionResponse
if err := packet.Decode(&multiResponse); err != nil {
envelopesRejectedCounter.WithLabelValues("failed_read").Inc()
log.Error("failed to decode messages response", "peer", p.peer.ID(), "error", err)
return errors.New("invalid response message")
}
if multiResponse.Version == 1 {
response, err := multiResponse.DecodeResponse1()
if err != nil {
envelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
log.Error("failed to decode messages response into first version of response", "peer", p.peer.ID(), "error", err)
}
whisper.envelopeFeed.Send(EnvelopeEvent{
@ -1008,11 +1013,13 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
i, err := s.Uint()
if err != nil {
envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc()
log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid powRequirementCode message")
}
f := math.Float64frombits(i)
if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc()
log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid value in powRequirementCode message")
}
@ -1026,6 +1033,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if err != nil {
log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
envelopesRejectedCounter.WithLabelValues("invalid_bloom").Inc()
return errors.New("invalid bloom filter exchange message")
}
p.setBloomFilter(bloom)
@ -1206,7 +1214,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
now := uint32(whisper.timeSource().Unix())
sent := envelope.Expiry - envelope.TTL
envelopesCounter.Inc()
envelopesReceivedCounter.Inc()
if sent > now {
if sent-DefaultSyncAllowance > now {
envelopesCacheFailedCounter.WithLabelValues("in_future").Inc()