upgrade whisper: 1.6.1 > 1.6.2
This adds new Prometheus metrics Signed-off-by: Jakub Sokołowski <jakub@status.im>
This commit is contained in:
parent
26b6d7c36a
commit
f4d4c66d5b
2
go.mod
2
go.mod
|
@ -40,7 +40,7 @@ require (
|
||||||
github.com/status-im/status-go/extkeys v1.0.0
|
github.com/status-im/status-go/extkeys v1.0.0
|
||||||
github.com/status-im/status-go/protocol v0.5.2
|
github.com/status-im/status-go/protocol v0.5.2
|
||||||
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
|
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
|
||||||
github.com/status-im/whisper v1.6.1
|
github.com/status-im/whisper v1.6.2
|
||||||
github.com/stretchr/testify v1.4.0
|
github.com/stretchr/testify v1.4.0
|
||||||
github.com/syndtr/goleveldb v1.0.0
|
github.com/syndtr/goleveldb v1.0.0
|
||||||
go.uber.org/zap v1.13.0
|
go.uber.org/zap v1.13.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -586,8 +586,8 @@ github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcw
|
||||||
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 h1:oa0KU5jJRNtXaM/P465MhvSFo/HM2O8qi2DDuPcd7ro=
|
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501 h1:oa0KU5jJRNtXaM/P465MhvSFo/HM2O8qi2DDuPcd7ro=
|
||||||
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501/go.mod h1:RYo/itke1oU5k/6sj9DNM3QAwtE5rZSgg5JnkOv83hk=
|
github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501/go.mod h1:RYo/itke1oU5k/6sj9DNM3QAwtE5rZSgg5JnkOv83hk=
|
||||||
github.com/status-im/whisper v1.5.2/go.mod h1:emrOxzJme0k66QtbbQ2bdd3P8RCdLZ8sTD7SkwH1s2s=
|
github.com/status-im/whisper v1.5.2/go.mod h1:emrOxzJme0k66QtbbQ2bdd3P8RCdLZ8sTD7SkwH1s2s=
|
||||||
github.com/status-im/whisper v1.6.1 h1:C/T1HQHZfUI2jbccf3yIe8yfkl435I3BILIKeNASJDc=
|
github.com/status-im/whisper v1.6.2 h1:68WS0R2PzfM1VFKq/LW/hxijYHrqStfSlEoYOH8KZE4=
|
||||||
github.com/status-im/whisper v1.6.1/go.mod h1:lygchT4p9Y1/hR451OhNNqfinvy9EYEDxtXU2T/U30Q=
|
github.com/status-im/whisper v1.6.2/go.mod h1:uacoZIYbpf7iVk+YgFIuym6R0Qv2asjn6GoZEZ3dBLI=
|
||||||
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
|
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
|
||||||
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw=
|
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw=
|
||||||
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=
|
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=
|
||||||
|
|
|
@ -38,6 +38,7 @@ const (
|
||||||
// EnvelopeEvent used for envelopes events.
|
// EnvelopeEvent used for envelopes events.
|
||||||
type EnvelopeEvent struct {
|
type EnvelopeEvent struct {
|
||||||
Event EventType
|
Event EventType
|
||||||
|
Topic TopicType
|
||||||
Hash common.Hash
|
Hash common.Hash
|
||||||
Batch common.Hash
|
Batch common.Hash
|
||||||
Peer enode.ID
|
Peer enode.ID
|
||||||
|
|
|
@ -1,21 +1,50 @@
|
||||||
package whisperv6
|
package whisperv6
|
||||||
|
|
||||||
import "github.com/ethereum/go-ethereum/metrics"
|
import prom "github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
envelopeAddedCounter = metrics.NewRegisteredCounter("whisper/envelopeAdded", nil)
|
envelopesReceivedCounter = prom.NewCounter(prom.CounterOpts{
|
||||||
envelopeNewAddedCounter = metrics.NewRegisteredCounter("whisper/envelopeNewAdded", nil)
|
Name: "whisper_envelopes_received_total",
|
||||||
envelopeClearedCounter = metrics.NewRegisteredCounter("whisper/envelopeCleared", nil)
|
Help: "Number of envelopes received.",
|
||||||
envelopeErrFromFutureCounter = metrics.NewRegisteredCounter("whisper/envelopeErrFromFuture", nil)
|
})
|
||||||
envelopeErrVeryOldCounter = metrics.NewRegisteredCounter("whisper/envelopeErrVeryOld", nil)
|
envelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{
|
||||||
envelopeErrExpiredCounter = metrics.NewRegisteredCounter("whisper/envelopeErrExpired", nil)
|
Name: "whisper_envelopes_validated_total",
|
||||||
envelopeErrOversizedCounter = metrics.NewRegisteredCounter("whisper/envelopeErrOversized", nil)
|
Help: "Number of envelopes processed successfully.",
|
||||||
envelopeErrLowPowCounter = metrics.NewRegisteredCounter("whisper/envelopeErrLowPow", nil)
|
})
|
||||||
envelopeErrNoBloomMatchCounter = metrics.NewRegisteredCounter("whisper/envelopeErrNoBloomMatch", nil)
|
envelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{
|
||||||
envelopeSizeMeter = metrics.NewRegisteredMeter("whisper/envelopeSize", nil)
|
Name: "whisper_envelopes_rejected_total",
|
||||||
|
Help: "Number of envelopes rejected.",
|
||||||
|
}, []string{"reason"})
|
||||||
|
envelopesCacheFailedCounter = prom.NewCounterVec(prom.CounterOpts{
|
||||||
|
Name: "whisper_envelopes_cache_failures_total",
|
||||||
|
Help: "Number of envelopes which failed to be cached.",
|
||||||
|
}, []string{"type"})
|
||||||
|
envelopesCachedCounter = prom.NewCounterVec(prom.CounterOpts{
|
||||||
|
Name: "whisper_envelopes_cached_total",
|
||||||
|
Help: "Number of envelopes cached.",
|
||||||
|
}, []string{"cache"})
|
||||||
|
envelopesSizeMeter = prom.NewHistogram(prom.HistogramOpts{
|
||||||
|
Name: "whisper_envelopes_size_bytes",
|
||||||
|
Help: "Size of processed Whisper envelopes in bytes.",
|
||||||
|
Buckets: prom.ExponentialBuckets(256, 4, 10),
|
||||||
|
})
|
||||||
// rate limiter metrics
|
// rate limiter metrics
|
||||||
rateLimiterProcessed = metrics.NewRegisteredCounter("whisper/rateLimiterProcessed", nil)
|
rateLimitsProcessed = prom.NewCounter(prom.CounterOpts{
|
||||||
rateLimiterIPExceeded = metrics.NewRegisteredCounter("whisper/rateLimiterIPExceeded", nil)
|
Name: "whisper_rate_limits_processed_total",
|
||||||
rateLimiterPeerExceeded = metrics.NewRegisteredCounter("whisper/rateLimiterPeerExceeded", nil)
|
Help: "Number of packets whisper rate limiter processed.",
|
||||||
|
})
|
||||||
|
rateLimitsExceeded = prom.NewCounterVec(prom.CounterOpts{
|
||||||
|
Name: "whisper_rate_limits_exceeded_total",
|
||||||
|
Help: "Number of times the whisper rate limits were exceeded",
|
||||||
|
}, []string{"type"})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
prom.MustRegister(envelopesReceivedCounter)
|
||||||
|
prom.MustRegister(envelopesRejectedCounter)
|
||||||
|
prom.MustRegister(envelopesCacheFailedCounter)
|
||||||
|
prom.MustRegister(envelopesCachedCounter)
|
||||||
|
prom.MustRegister(envelopesSizeMeter)
|
||||||
|
prom.MustRegister(rateLimitsProcessed)
|
||||||
|
prom.MustRegister(rateLimitsExceeded)
|
||||||
|
}
|
||||||
|
|
|
@ -14,14 +14,24 @@ import (
|
||||||
type runLoop func(p *Peer, rw p2p.MsgReadWriter) error
|
type runLoop func(p *Peer, rw p2p.MsgReadWriter) error
|
||||||
|
|
||||||
type RateLimiterHandler interface {
|
type RateLimiterHandler interface {
|
||||||
ExceedPeerLimit()
|
IncProcessed()
|
||||||
ExceedIPLimit()
|
IncExceedPeerLimit()
|
||||||
|
IncExceedIPLimit()
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricsRateLimiterHandler struct{}
|
type MetricsRateLimiterHandler struct{}
|
||||||
|
|
||||||
func (MetricsRateLimiterHandler) ExceedPeerLimit() { rateLimiterPeerExceeded.Inc(1) }
|
func (MetricsRateLimiterHandler) IncProcessed() {
|
||||||
func (MetricsRateLimiterHandler) ExceedIPLimit() { rateLimiterIPExceeded.Inc(1) }
|
rateLimitsProcessed.Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (MetricsRateLimiterHandler) IncExceedPeerLimit() {
|
||||||
|
rateLimitsExceeded.WithLabelValues("max_peers").Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (MetricsRateLimiterHandler) IncExceedIPLimit() {
|
||||||
|
rateLimitsExceeded.WithLabelValues("max_ips").Inc()
|
||||||
|
}
|
||||||
|
|
||||||
type PeerRateLimiterConfig struct {
|
type PeerRateLimiterConfig struct {
|
||||||
LimitPerSecIP int64
|
LimitPerSecIP int64
|
||||||
|
@ -30,7 +40,7 @@ type PeerRateLimiterConfig struct {
|
||||||
WhitelistedPeerIDs []enode.ID
|
WhitelistedPeerIDs []enode.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultPeerRateLimiterConfig = PeerRateLimiterConfig{
|
var peerRateLimiterDefaults = PeerRateLimiterConfig{
|
||||||
LimitPerSecIP: 10,
|
LimitPerSecIP: 10,
|
||||||
LimitPerSecPeerID: 5,
|
LimitPerSecPeerID: 5,
|
||||||
WhitelistedIPs: nil,
|
WhitelistedIPs: nil,
|
||||||
|
@ -52,7 +62,7 @@ type PeerRateLimiter struct {
|
||||||
|
|
||||||
func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig) *PeerRateLimiter {
|
func NewPeerRateLimiter(handler RateLimiterHandler, cfg *PeerRateLimiterConfig) *PeerRateLimiter {
|
||||||
if cfg == nil {
|
if cfg == nil {
|
||||||
copy := defaultPeerRateLimiterConfig
|
copy := peerRateLimiterDefaults
|
||||||
cfg = ©
|
cfg = ©
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,14 +92,14 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rateLimiterProcessed.Inc(1)
|
r.handler.IncProcessed()
|
||||||
|
|
||||||
var ip string
|
var ip string
|
||||||
if p != nil && p.peer != nil {
|
if p != nil && p.peer != nil {
|
||||||
ip = p.peer.Node().IP().String()
|
ip = p.peer.Node().IP().String()
|
||||||
}
|
}
|
||||||
if halted := r.throttleIP(ip); halted {
|
if halted := r.throttleIP(ip); halted {
|
||||||
r.handler.ExceedIPLimit()
|
r.handler.IncExceedIPLimit()
|
||||||
}
|
}
|
||||||
|
|
||||||
var peerID []byte
|
var peerID []byte
|
||||||
|
@ -97,7 +107,7 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
|
||||||
peerID = p.ID()
|
peerID = p.ID()
|
||||||
}
|
}
|
||||||
if halted := r.throttlePeer(peerID); halted {
|
if halted := r.throttlePeer(peerID); halted {
|
||||||
r.handler.ExceedPeerLimit()
|
r.handler.IncExceedPeerLimit()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := in.WriteMsg(packet); err != nil {
|
if err := in.WriteMsg(packet); err != nil {
|
||||||
|
|
|
@ -441,6 +441,7 @@ func (whisper *Whisper) RequestHistoricMessagesWithTimeout(peerID []byte, envelo
|
||||||
}
|
}
|
||||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||||
Peer: p.peer.ID(),
|
Peer: p.peer.ID(),
|
||||||
|
Topic: envelope.Topic,
|
||||||
Hash: envelope.Hash(),
|
Hash: envelope.Hash(),
|
||||||
Event: EventMailServerRequestSent,
|
Event: EventMailServerRequestSent,
|
||||||
})
|
})
|
||||||
|
@ -933,12 +934,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||||
// decode the contained envelopes
|
// decode the contained envelopes
|
||||||
data, err := ioutil.ReadAll(packet.Payload)
|
data, err := ioutil.ReadAll(packet.Payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
envelopesRejectedCounter.WithLabelValues("failed_read").Inc()
|
||||||
log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err)
|
log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err)
|
||||||
return errors.New("invalid enveloopes")
|
return errors.New("invalid enveloopes")
|
||||||
}
|
}
|
||||||
|
|
||||||
var envelopes []*Envelope
|
var envelopes []*Envelope
|
||||||
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
|
if err := rlp.DecodeBytes(data, &envelopes); err != nil {
|
||||||
|
envelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
|
||||||
log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||||
return errors.New("invalid envelopes")
|
return errors.New("invalid envelopes")
|
||||||
}
|
}
|
||||||
|
@ -957,9 +960,11 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||||
|
|
||||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||||
Event: EventEnvelopeReceived,
|
Event: EventEnvelopeReceived,
|
||||||
|
Topic: env.Topic,
|
||||||
Hash: env.Hash(),
|
Hash: env.Hash(),
|
||||||
Peer: p.peer.ID(),
|
Peer: p.peer.ID(),
|
||||||
})
|
})
|
||||||
|
envelopesValidatedCounter.Inc()
|
||||||
if cached {
|
if cached {
|
||||||
p.mark(env)
|
p.mark(env)
|
||||||
}
|
}
|
||||||
|
@ -974,12 +979,14 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||||
case messageResponseCode:
|
case messageResponseCode:
|
||||||
var multiResponse MultiVersionResponse
|
var multiResponse MultiVersionResponse
|
||||||
if err := packet.Decode(&multiResponse); err != nil {
|
if err := packet.Decode(&multiResponse); err != nil {
|
||||||
|
envelopesRejectedCounter.WithLabelValues("failed_read").Inc()
|
||||||
log.Error("failed to decode messages response", "peer", p.peer.ID(), "error", err)
|
log.Error("failed to decode messages response", "peer", p.peer.ID(), "error", err)
|
||||||
return errors.New("invalid response message")
|
return errors.New("invalid response message")
|
||||||
}
|
}
|
||||||
if multiResponse.Version == 1 {
|
if multiResponse.Version == 1 {
|
||||||
response, err := multiResponse.DecodeResponse1()
|
response, err := multiResponse.DecodeResponse1()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
envelopesRejectedCounter.WithLabelValues("invalid_data").Inc()
|
||||||
log.Error("failed to decode messages response into first version of response", "peer", p.peer.ID(), "error", err)
|
log.Error("failed to decode messages response into first version of response", "peer", p.peer.ID(), "error", err)
|
||||||
}
|
}
|
||||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||||
|
@ -1006,11 +1013,13 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||||
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
|
s := rlp.NewStream(packet.Payload, uint64(packet.Size))
|
||||||
i, err := s.Uint()
|
i, err := s.Uint()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc()
|
||||||
log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||||
return errors.New("invalid powRequirementCode message")
|
return errors.New("invalid powRequirementCode message")
|
||||||
}
|
}
|
||||||
f := math.Float64frombits(i)
|
f := math.Float64frombits(i)
|
||||||
if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
|
if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 {
|
||||||
|
envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc()
|
||||||
log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||||
return errors.New("invalid value in powRequirementCode message")
|
return errors.New("invalid value in powRequirementCode message")
|
||||||
}
|
}
|
||||||
|
@ -1024,6 +1033,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
||||||
|
envelopesRejectedCounter.WithLabelValues("invalid_bloom").Inc()
|
||||||
return errors.New("invalid bloom filter exchange message")
|
return errors.New("invalid bloom filter exchange message")
|
||||||
}
|
}
|
||||||
p.setBloomFilter(bloom)
|
p.setBloomFilter(bloom)
|
||||||
|
@ -1204,10 +1214,10 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||||
now := uint32(whisper.timeSource().Unix())
|
now := uint32(whisper.timeSource().Unix())
|
||||||
sent := envelope.Expiry - envelope.TTL
|
sent := envelope.Expiry - envelope.TTL
|
||||||
|
|
||||||
envelopeAddedCounter.Inc(1)
|
envelopesReceivedCounter.Inc()
|
||||||
if sent > now {
|
if sent > now {
|
||||||
if sent-DefaultSyncAllowance > now {
|
if sent-DefaultSyncAllowance > now {
|
||||||
envelopeErrFromFutureCounter.Inc(1)
|
envelopesCacheFailedCounter.WithLabelValues("in_future").Inc()
|
||||||
log.Warn("envelope created in the future", "hash", envelope.Hash())
|
log.Warn("envelope created in the future", "hash", envelope.Hash())
|
||||||
return false, TimeSyncError(errors.New("envelope from future"))
|
return false, TimeSyncError(errors.New("envelope from future"))
|
||||||
}
|
}
|
||||||
|
@ -1217,17 +1227,17 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||||
|
|
||||||
if envelope.Expiry < now {
|
if envelope.Expiry < now {
|
||||||
if envelope.Expiry+DefaultSyncAllowance*2 < now {
|
if envelope.Expiry+DefaultSyncAllowance*2 < now {
|
||||||
envelopeErrVeryOldCounter.Inc(1)
|
envelopesCacheFailedCounter.WithLabelValues("very_old").Inc()
|
||||||
log.Warn("very old envelope", "hash", envelope.Hash())
|
log.Warn("very old envelope", "hash", envelope.Hash())
|
||||||
return false, TimeSyncError(errors.New("very old envelope"))
|
return false, TimeSyncError(errors.New("very old envelope"))
|
||||||
}
|
}
|
||||||
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
|
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
|
||||||
envelopeErrExpiredCounter.Inc(1)
|
envelopesCacheFailedCounter.WithLabelValues("expired").Inc()
|
||||||
return false, nil // drop envelope without error
|
return false, nil // drop envelope without error
|
||||||
}
|
}
|
||||||
|
|
||||||
if uint32(envelope.size()) > whisper.MaxMessageSize() {
|
if uint32(envelope.size()) > whisper.MaxMessageSize() {
|
||||||
envelopeErrOversizedCounter.Inc(1)
|
envelopesCacheFailedCounter.WithLabelValues("oversized").Inc()
|
||||||
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
|
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1236,7 +1246,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||||
// in this case the previous value is retrieved by MinPowTolerance()
|
// in this case the previous value is retrieved by MinPowTolerance()
|
||||||
// for a short period of peer synchronization.
|
// for a short period of peer synchronization.
|
||||||
if envelope.PoW() < whisper.MinPowTolerance() {
|
if envelope.PoW() < whisper.MinPowTolerance() {
|
||||||
envelopeErrLowPowCounter.Inc(1)
|
envelopesCacheFailedCounter.WithLabelValues("low_pow").Inc()
|
||||||
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
|
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1246,7 +1256,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||||
// in this case the previous value is retrieved by BloomFilterTolerance()
|
// in this case the previous value is retrieved by BloomFilterTolerance()
|
||||||
// for a short period of peer synchronization.
|
// for a short period of peer synchronization.
|
||||||
if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
|
if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
|
||||||
envelopeErrNoBloomMatchCounter.Inc(1)
|
envelopesCacheFailedCounter.WithLabelValues("no_bloom_match").Inc()
|
||||||
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
|
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
|
||||||
envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
|
envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
|
||||||
}
|
}
|
||||||
|
@ -1269,10 +1279,11 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||||
|
|
||||||
if alreadyCached {
|
if alreadyCached {
|
||||||
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
|
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
|
||||||
|
envelopesCachedCounter.WithLabelValues("hit").Inc()
|
||||||
} else {
|
} else {
|
||||||
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
|
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
|
||||||
envelopeNewAddedCounter.Inc(1)
|
envelopesCachedCounter.WithLabelValues("miss").Inc()
|
||||||
envelopeSizeMeter.Mark(int64(envelope.size()))
|
envelopesSizeMeter.Observe(float64(envelope.size()))
|
||||||
whisper.statsMu.Lock()
|
whisper.statsMu.Lock()
|
||||||
whisper.stats.memoryUsed += envelope.size()
|
whisper.stats.memoryUsed += envelope.size()
|
||||||
whisper.statsMu.Unlock()
|
whisper.statsMu.Unlock()
|
||||||
|
@ -1280,6 +1291,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||||
if whisper.mailServer != nil {
|
if whisper.mailServer != nil {
|
||||||
whisper.mailServer.Archive(envelope)
|
whisper.mailServer.Archive(envelope)
|
||||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||||
|
Topic: envelope.Topic,
|
||||||
Hash: envelope.Hash(),
|
Hash: envelope.Hash(),
|
||||||
Event: EventMailServerEnvelopeArchived,
|
Event: EventMailServerEnvelopeArchived,
|
||||||
})
|
})
|
||||||
|
@ -1329,6 +1341,7 @@ func (whisper *Whisper) processQueue() {
|
||||||
case e := <-whisper.messageQueue:
|
case e := <-whisper.messageQueue:
|
||||||
whisper.filters.NotifyWatchers(e, false)
|
whisper.filters.NotifyWatchers(e, false)
|
||||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||||
|
Topic: e.Topic,
|
||||||
Hash: e.Hash(),
|
Hash: e.Hash(),
|
||||||
Event: EventEnvelopeAvailable,
|
Event: EventEnvelopeAvailable,
|
||||||
})
|
})
|
||||||
|
@ -1346,6 +1359,7 @@ func (whisper *Whisper) processP2P() {
|
||||||
case *Envelope:
|
case *Envelope:
|
||||||
whisper.filters.NotifyWatchers(event, true)
|
whisper.filters.NotifyWatchers(event, true)
|
||||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||||
|
Topic: event.Topic,
|
||||||
Hash: event.Hash(),
|
Hash: event.Hash(),
|
||||||
Event: EventEnvelopeAvailable,
|
Event: EventEnvelopeAvailable,
|
||||||
})
|
})
|
||||||
|
@ -1390,7 +1404,7 @@ func (whisper *Whisper) expire() {
|
||||||
hashSet.Each(func(v interface{}) bool {
|
hashSet.Each(func(v interface{}) bool {
|
||||||
sz := whisper.envelopes[v.(common.Hash)].size()
|
sz := whisper.envelopes[v.(common.Hash)].size()
|
||||||
delete(whisper.envelopes, v.(common.Hash))
|
delete(whisper.envelopes, v.(common.Hash))
|
||||||
envelopeClearedCounter.Inc(1)
|
envelopesCachedCounter.WithLabelValues("clear").Inc()
|
||||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||||
Hash: v.(common.Hash),
|
Hash: v.(common.Hash),
|
||||||
Event: EventEnvelopeExpired,
|
Event: EventEnvelopeExpired,
|
||||||
|
|
|
@ -398,7 +398,7 @@ github.com/status-im/status-go/protocol/v1
|
||||||
github.com/status-im/status-go/protocol/zaputil
|
github.com/status-im/status-go/protocol/zaputil
|
||||||
# github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
|
# github.com/status-im/tcp-shaker v0.0.0-20191114194237-215893130501
|
||||||
github.com/status-im/tcp-shaker
|
github.com/status-im/tcp-shaker
|
||||||
# github.com/status-im/whisper v1.6.1
|
# github.com/status-im/whisper v1.6.2
|
||||||
github.com/status-im/whisper/whisperv6
|
github.com/status-im/whisper/whisperv6
|
||||||
# github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570
|
# github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570
|
||||||
github.com/steakknife/bloomfilter
|
github.com/steakknife/bloomfilter
|
||||||
|
|
Loading…
Reference in New Issue