Remove EnvelopeTranced and add Whisper metrics as a patch (#1177)
This commit is contained in:
parent
5664a24b10
commit
bf1d1ee0ea
|
@ -1,111 +0,0 @@
|
|||
diff --git i/whisper/whisperv6/doc.go w/whisper/whisperv6/doc.go
|
||||
index 066a9766d..4bbf5546b 100644
|
||||
--- i/whisper/whisperv6/doc.go
|
||||
+++ w/whisper/whisperv6/doc.go
|
||||
@@ -95,3 +95,40 @@ type MailServer interface {
|
||||
Archive(env *Envelope)
|
||||
DeliverMail(whisperPeer *Peer, request *Envelope)
|
||||
}
|
||||
+
|
||||
+type envelopeSource int
|
||||
+
|
||||
+const (
|
||||
+ _ = iota
|
||||
+ // peerSource indicates a source as a regular peer.
|
||||
+ peerSource envelopeSource = iota
|
||||
+ // p2pSource indicates that envelop was received from a trusted peer.
|
||||
+ p2pSource
|
||||
+)
|
||||
+
|
||||
+// EnvelopeMeta keeps metadata of received envelopes.
|
||||
+type EnvelopeMeta struct {
|
||||
+ Hash string
|
||||
+ Topic TopicType
|
||||
+ Size uint32
|
||||
+ Source envelopeSource
|
||||
+ IsNew bool
|
||||
+ Peer string
|
||||
+}
|
||||
+
|
||||
+// SourceString converts source to string.
|
||||
+func (m *EnvelopeMeta) SourceString() string {
|
||||
+ switch m.Source {
|
||||
+ case peerSource:
|
||||
+ return "peer"
|
||||
+ case p2pSource:
|
||||
+ return "p2p"
|
||||
+ default:
|
||||
+ return "unknown"
|
||||
+ }
|
||||
+}
|
||||
+
|
||||
+// EnvelopeTracer tracks received envelopes.
|
||||
+type EnvelopeTracer interface {
|
||||
+ Trace(*EnvelopeMeta)
|
||||
+}
|
||||
diff --git i/whisper/whisperv6/whisper.go w/whisper/whisperv6/whisper.go
|
||||
index 702556079..9aaf5687c 100644
|
||||
--- i/whisper/whisperv6/whisper.go
|
||||
+++ w/whisper/whisperv6/whisper.go
|
||||
@@ -87,7 +87,8 @@ type Whisper struct {
|
||||
statsMu sync.Mutex // guard stats
|
||||
stats Statistics // Statistics of whisper node
|
||||
|
||||
- mailServer MailServer // MailServer interface
|
||||
+ mailServer MailServer // MailServer interface
|
||||
+ envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
|
||||
}
|
||||
|
||||
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
|
||||
@@ -211,6 +212,12 @@ func (whisper *Whisper) RegisterServer(server MailServer) {
|
||||
whisper.mailServer = server
|
||||
}
|
||||
|
||||
+// RegisterEnvelopeTracer registers an EnveloperTracer to collect information
|
||||
+// about received envelopes.
|
||||
+func (whisper *Whisper) RegisterEnvelopeTracer(tracer EnvelopeTracer) {
|
||||
+ whisper.envelopeTracer = tracer
|
||||
+}
|
||||
+
|
||||
// Protocols returns the whisper sub-protocols ran by this particular client.
|
||||
func (whisper *Whisper) Protocols() []p2p.Protocol {
|
||||
return []p2p.Protocol{whisper.protocol}
|
||||
@@ -736,6 +743,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||
|
||||
trouble := false
|
||||
for _, env := range envelopes {
|
||||
+ whisper.traceEnvelope(env, !whisper.isEnvelopeCached(env.Hash()), peerSource, p)
|
||||
cached, err := whisper.add(env, whisper.lightClient)
|
||||
if err != nil {
|
||||
trouble = true
|
||||
@@ -786,6 +794,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
||||
return errors.New("invalid direct message")
|
||||
}
|
||||
whisper.postEvent(&envelope, true)
|
||||
+ whisper.traceEnvelope(&envelope, false, p2pSource, p)
|
||||
}
|
||||
case p2pRequestCode:
|
||||
// Must be processed if mail server is implemented. Otherwise ignore.
|
||||
@@ -883,6 +892,22 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
+// traceEnvelope collects basic metadata about an envelope and sender peer.
|
||||
+func (whisper *Whisper) traceEnvelope(envelope *Envelope, isNew bool, source envelopeSource, peer *Peer) {
|
||||
+ if whisper.envelopeTracer == nil {
|
||||
+ return
|
||||
+ }
|
||||
+
|
||||
+ whisper.envelopeTracer.Trace(&EnvelopeMeta{
|
||||
+ Hash: envelope.Hash().String(),
|
||||
+ Topic: BytesToTopic(envelope.Topic[:]),
|
||||
+ Size: uint32(envelope.size()),
|
||||
+ Source: source,
|
||||
+ IsNew: isNew,
|
||||
+ Peer: peer.peer.Info().ID,
|
||||
+ })
|
||||
+}
|
||||
+
|
||||
// postEvent queues the message for further processing.
|
||||
func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) {
|
||||
if isP2P {
|
|
@ -41,7 +41,7 @@ index 79cc2127..018d8f82 100644
|
|||
+ Peer: peer.peer.ID(), // specifically discover.NodeID because it can be pretty printed
|
||||
+ })
|
||||
}
|
||||
|
||||
|
||||
log.Trace("broadcast", "num. messages", len(bundle))
|
||||
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
|
||||
index 414aa788..3c3c66ad 100644
|
||||
|
@ -55,19 +55,18 @@ index 414aa788..3c3c66ad 100644
|
|||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
@@ -89,6 +90,8 @@ type Whisper struct {
|
||||
|
||||
mailServer MailServer // MailServer interface
|
||||
envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
|
||||
@@ -89,5 +90,7 @@ type Whisper struct {
|
||||
|
||||
mailServer MailServer // MailServer interface
|
||||
+
|
||||
+ envelopeFeed event.Feed
|
||||
}
|
||||
|
||||
|
||||
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
|
||||
@@ -133,6 +136,12 @@ func New(cfg *Config) *Whisper {
|
||||
return whisper
|
||||
}
|
||||
|
||||
|
||||
+// SubscribeEnvelopeEvents subscribes to envelopes feed.
|
||||
+// In order to prevent blocking whisper producers events must be amply buffered.
|
||||
+func (whisper *Whisper) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription {
|
||||
|
|
|
@ -4,7 +4,7 @@ index 2de99f293..e0c3284b6 100644
|
|||
+++ b/whisper/whisperv6/api.go
|
||||
@@ -285,7 +285,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (hexutil.
|
||||
}
|
||||
|
||||
|
||||
var result []byte
|
||||
- env, err := whisperMsg.Wrap(params)
|
||||
+ env, err := whisperMsg.Wrap(params, api.w.GetCurrentTime())
|
||||
|
@ -16,9 +16,9 @@ index 61419de00..3c28263e5 100644
|
|||
--- a/whisper/whisperv6/config.go
|
||||
+++ b/whisper/whisperv6/config.go
|
||||
@@ -16,14 +16,18 @@
|
||||
|
||||
|
||||
package whisperv6
|
||||
|
||||
|
||||
+import "time"
|
||||
+
|
||||
// Config represents the configuration state of a whisper node.
|
||||
|
@ -27,7 +27,7 @@ index 61419de00..3c28263e5 100644
|
|||
MinimumAcceptedPOW float64 `toml:",omitempty"`
|
||||
+ TimeSource func() time.Time
|
||||
}
|
||||
|
||||
|
||||
// DefaultConfig represents (shocker!) the default configuration.
|
||||
var DefaultConfig = Config{
|
||||
MaxMessageSize: DefaultMaxMessageSize,
|
||||
|
@ -39,7 +39,7 @@ index c42d1fa8a..3b65fdba0 100644
|
|||
--- a/whisper/whisperv6/envelope.go
|
||||
+++ b/whisper/whisperv6/envelope.go
|
||||
@@ -62,9 +62,9 @@ func (e *Envelope) rlpWithoutNonce() []byte {
|
||||
|
||||
|
||||
// NewEnvelope wraps a Whisper message with expiration and destination data
|
||||
// included into an envelope for network forwarding.
|
||||
-func NewEnvelope(ttl uint32, topic TopicType, msg *sentMessage) *Envelope {
|
||||
|
@ -59,12 +59,12 @@ index 2d4e86244..a12b445e2 100644
|
|||
mrand "math/rand"
|
||||
"strconv"
|
||||
+ "time"
|
||||
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
@@ -234,7 +235,7 @@ func generateSecureRandomData(length int) ([]byte, error) {
|
||||
}
|
||||
|
||||
|
||||
// Wrap bundles the message into an Envelope to transmit over the network.
|
||||
-func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err error) {
|
||||
+func (msg *sentMessage) Wrap(options *MessageParams, now time.Time) (envelope *Envelope, err error) {
|
||||
|
@ -74,7 +74,7 @@ index 2d4e86244..a12b445e2 100644
|
|||
@@ -254,7 +255,7 @@ func (msg *sentMessage) Wrap(options *MessageParams) (envelope *Envelope, err er
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
- envelope = NewEnvelope(options.TTL, options.Topic, msg)
|
||||
+ envelope = NewEnvelope(options.TTL, options.Topic, msg, now)
|
||||
if err = envelope.Seal(options); err != nil {
|
||||
|
@ -84,14 +84,12 @@ diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
|
|||
index 8d56ece64..697f0ecb8 100644
|
||||
--- a/whisper/whisperv6/whisper.go
|
||||
+++ b/whisper/whisperv6/whisper.go
|
||||
@@ -92,6 +92,8 @@ type Whisper struct {
|
||||
envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
|
||||
|
||||
@@ -93,4 +92,6 @@ type Whisper struct {
|
||||
envelopeFeed event.Feed
|
||||
+
|
||||
+ timeSource func() time.Time // source of time for whisper
|
||||
}
|
||||
|
||||
|
||||
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
|
||||
@@ -110,6 +112,7 @@ func New(cfg *Config) *Whisper {
|
||||
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
|
||||
|
@ -99,12 +97,12 @@ index 8d56ece64..697f0ecb8 100644
|
|||
syncAllowance: DefaultSyncAllowance,
|
||||
+ timeSource: cfg.TimeSource,
|
||||
}
|
||||
|
||||
|
||||
whisper.filters = NewFilters(whisper)
|
||||
@@ -215,6 +218,11 @@ func (whisper *Whisper) APIs() []rpc.API {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+// GetCurrentTime returns current time.
|
||||
+func (whisper *Whisper) GetCurrentTime() time.Time {
|
||||
+ return whisper.timeSource()
|
||||
|
@ -120,7 +118,7 @@ index 8d56ece64..697f0ecb8 100644
|
|||
- now := uint32(time.Now().Unix())
|
||||
+ now := uint32(whisper.timeSource().Unix())
|
||||
sent := envelope.Expiry - envelope.TTL
|
||||
|
||||
|
||||
if sent > now {
|
||||
@@ -988,7 +996,7 @@ func (whisper *Whisper) expire() {
|
||||
whisper.statsMu.Lock()
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
diff --git a/whisper/whisperv6/metrics.go b/whisper/whisperv6/metrics.go
|
||||
new file mode 100644
|
||||
index 00000000..b0e899da
|
||||
--- /dev/null
|
||||
+++ b/whisper/whisperv6/metrics.go
|
||||
@@ -0,0 +1,16 @@
|
||||
+package whisperv6
|
||||
+
|
||||
+import "github.com/ethereum/go-ethereum/metrics"
|
||||
+
|
||||
+var (
|
||||
+ envelopeAddedCounter = metrics.NewRegisteredCounter("whisper/envelopeAdded", nil)
|
||||
+ envelopeNewAddedCounter = metrics.NewRegisteredCounter("whisper/envelopeNewAdded", nil)
|
||||
+ envelopeClearedCounter = metrics.NewRegisteredCounter("whisper/envelopeCleared", nil)
|
||||
+ envelopeErrFromFutureCounter = metrics.NewRegisteredCounter("whisper/envelopeErrFromFuture", nil)
|
||||
+ envelopeErrVeryOldCounter = metrics.NewRegisteredCounter("whisper/envelopeErrVeryOld", nil)
|
||||
+ envelopeErrExpiredCounter = metrics.NewRegisteredCounter("whisper/envelopeErrExpired", nil)
|
||||
+ envelopeErrOversizedCounter = metrics.NewRegisteredCounter("whisper/envelopeErrOversized", nil)
|
||||
+ envelopeErrLowPowCounter = metrics.NewRegisteredCounter("whisper/envelopeErrLowPow", nil)
|
||||
+ envelopeErrNoBloomMatchCounter = metrics.NewRegisteredCounter("whisper/envelopeErrNoBloomMatch", nil)
|
||||
+ envelopeSizeMeter = metrics.NewRegisteredMeter("whisper/envelopeSize", nil)
|
||||
+)
|
||||
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
|
||||
index 482457cb..002aaadf 100644
|
||||
--- a/whisper/whisperv6/whisper.go
|
||||
+++ b/whisper/whisperv6/whisper.go
|
||||
@@ -894,8 +894,11 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||
now := uint32(whisper.timeSource().Unix())
|
||||
sent := envelope.Expiry - envelope.TTL
|
||||
|
||||
+ envelopeAddedCounter.Inc(1)
|
||||
+
|
||||
if sent > now {
|
||||
if sent-DefaultSyncAllowance > now {
|
||||
+ envelopeErrFromFutureCounter.Inc(1)
|
||||
return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
|
||||
}
|
||||
// recalculate PoW, adjusted for the time difference, plus one second for latency
|
||||
@@ -904,13 +907,16 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||
|
||||
if envelope.Expiry < now {
|
||||
if envelope.Expiry+DefaultSyncAllowance*2 < now {
|
||||
+ envelopeErrVeryOldCounter.Inc(1)
|
||||
return false, fmt.Errorf("very old message")
|
||||
}
|
||||
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
|
||||
+ envelopeErrExpiredCounter.Inc(1)
|
||||
return false, nil // drop envelope without error
|
||||
}
|
||||
|
||||
if uint32(envelope.size()) > whisper.MaxMessageSize() {
|
||||
+ envelopeErrOversizedCounter.Inc(1)
|
||||
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
|
||||
}
|
||||
|
||||
@@ -919,6 +925,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||
// in this case the previous value is retrieved by MinPowTolerance()
|
||||
// for a short period of peer synchronization.
|
||||
if envelope.PoW() < whisper.MinPowTolerance() {
|
||||
+ envelopeErrLowPowCounter.Inc(1)
|
||||
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
|
||||
}
|
||||
}
|
||||
@@ -928,6 +935,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||
// in this case the previous value is retrieved by BloomFilterTolerance()
|
||||
// for a short period of peer synchronization.
|
||||
if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
|
||||
+ envelopeErrNoBloomMatchCounter.Inc(1)
|
||||
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
|
||||
envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
|
||||
}
|
||||
@@ -952,6 +960,8 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
||||
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
|
||||
} else {
|
||||
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
|
||||
+ envelopeNewAddedCounter.Inc(1)
|
||||
+ envelopeSizeMeter.Mark(int64(envelope.size()))
|
||||
whisper.statsMu.Lock()
|
||||
whisper.stats.memoryUsed += envelope.size()
|
||||
whisper.statsMu.Unlock()
|
||||
@@ -1053,6 +1063,7 @@ func (whisper *Whisper) expire() {
|
||||
hashSet.Each(func(v interface{}) bool {
|
||||
sz := whisper.envelopes[v.(common.Hash)].size()
|
||||
delete(whisper.envelopes, v.(common.Hash))
|
||||
+ envelopeClearedCounter.Inc(1)
|
||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||
Hash: v.(common.Hash),
|
||||
Event: EventEnvelopeExpired,
|
|
@ -1,25 +0,0 @@
|
|||
// Package whisper collects Whisper envelope metrics using expvar.
|
||||
package whisper
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||
)
|
||||
|
||||
var (
|
||||
envelopeCounter = metrics.NewRegisteredCounter("whisper/Envelope", nil)
|
||||
envelopeNewCounter = metrics.NewRegisteredCounter("whisper/EnvelopeNew", nil)
|
||||
envelopeMeter = metrics.NewRegisteredMeter("whisper/EnvelopeSize", nil)
|
||||
)
|
||||
|
||||
// EnvelopeTracer traces incoming envelopes.
|
||||
type EnvelopeTracer struct{}
|
||||
|
||||
// Trace is called for every incoming envelope.
|
||||
func (t *EnvelopeTracer) Trace(envelope *whisper.EnvelopeMeta) {
|
||||
envelopeCounter.Inc(1)
|
||||
if envelope.IsNew {
|
||||
envelopeNewCounter.Inc(1)
|
||||
}
|
||||
envelopeMeter.Mark(int64(envelope.Size))
|
||||
}
|
|
@ -22,7 +22,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/p2p/nat"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||
"github.com/status-im/status-go/mailserver"
|
||||
shhmetrics "github.com/status-im/status-go/metrics/whisper"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/services/peer"
|
||||
"github.com/status-im/status-go/services/personal"
|
||||
|
@ -253,9 +252,6 @@ func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb
|
|||
|
||||
whisperService := whisper.New(whisperServiceConfig)
|
||||
|
||||
// enable metrics
|
||||
whisperService.RegisterEnvelopeTracer(&shhmetrics.EnvelopeTracer{})
|
||||
|
||||
// enable mail service
|
||||
if config.WhisperConfig.EnableMailServer {
|
||||
if err := registerMailServer(whisperService, config.WhisperConfig); err != nil {
|
||||
|
|
|
@ -89,40 +89,3 @@ type MailServer interface {
|
|||
Archive(env *Envelope)
|
||||
DeliverMail(whisperPeer *Peer, request *Envelope)
|
||||
}
|
||||
|
||||
type envelopeSource int
|
||||
|
||||
const (
|
||||
_ = iota
|
||||
// peerSource indicates a source as a regular peer.
|
||||
peerSource envelopeSource = iota
|
||||
// p2pSource indicates that envelop was received from a trusted peer.
|
||||
p2pSource
|
||||
)
|
||||
|
||||
// EnvelopeMeta keeps metadata of received envelopes.
|
||||
type EnvelopeMeta struct {
|
||||
Hash string
|
||||
Topic TopicType
|
||||
Size uint32
|
||||
Source envelopeSource
|
||||
IsNew bool
|
||||
Peer string
|
||||
}
|
||||
|
||||
// SourceString converts source to string.
|
||||
func (m *EnvelopeMeta) SourceString() string {
|
||||
switch m.Source {
|
||||
case peerSource:
|
||||
return "peer"
|
||||
case p2pSource:
|
||||
return "p2p"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// EnvelopeTracer tracks received envelopes.
|
||||
type EnvelopeTracer interface {
|
||||
Trace(*EnvelopeMeta)
|
||||
}
|
||||
|
|
16
vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/metrics.go
generated
vendored
Normal file
16
vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/metrics.go
generated
vendored
Normal file
|
@ -0,0 +1,16 @@
|
|||
package whisperv6
|
||||
|
||||
import "github.com/ethereum/go-ethereum/metrics"
|
||||
|
||||
var (
|
||||
envelopeAddedCounter = metrics.NewRegisteredCounter("whisper/envelopeAdded", nil)
|
||||
envelopeNewAddedCounter = metrics.NewRegisteredCounter("whisper/envelopeNewAdded", nil)
|
||||
envelopeClearedCounter = metrics.NewRegisteredCounter("whisper/envelopeCleared", nil)
|
||||
envelopeErrFromFutureCounter = metrics.NewRegisteredCounter("whisper/envelopeErrFromFuture", nil)
|
||||
envelopeErrVeryOldCounter = metrics.NewRegisteredCounter("whisper/envelopeErrVeryOld", nil)
|
||||
envelopeErrExpiredCounter = metrics.NewRegisteredCounter("whisper/envelopeErrExpired", nil)
|
||||
envelopeErrOversizedCounter = metrics.NewRegisteredCounter("whisper/envelopeErrOversized", nil)
|
||||
envelopeErrLowPowCounter = metrics.NewRegisteredCounter("whisper/envelopeErrLowPow", nil)
|
||||
envelopeErrNoBloomMatchCounter = metrics.NewRegisteredCounter("whisper/envelopeErrNoBloomMatch", nil)
|
||||
envelopeSizeMeter = metrics.NewRegisteredMeter("whisper/envelopeSize", nil)
|
||||
)
|
|
@ -94,8 +94,7 @@ type Whisper struct {
|
|||
statsMu sync.Mutex // guard stats
|
||||
stats Statistics // Statistics of whisper node
|
||||
|
||||
mailServer MailServer // MailServer interface
|
||||
envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
|
||||
mailServer MailServer // MailServer interface
|
||||
|
||||
envelopeFeed event.Feed
|
||||
|
||||
|
@ -235,12 +234,6 @@ func (whisper *Whisper) RegisterServer(server MailServer) {
|
|||
whisper.mailServer = server
|
||||
}
|
||||
|
||||
// RegisterEnvelopeTracer registers an EnveloperTracer to collect information
|
||||
// about received envelopes.
|
||||
func (whisper *Whisper) RegisterEnvelopeTracer(tracer EnvelopeTracer) {
|
||||
whisper.envelopeTracer = tracer
|
||||
}
|
||||
|
||||
// Protocols returns the whisper sub-protocols ran by this particular client.
|
||||
func (whisper *Whisper) Protocols() []p2p.Protocol {
|
||||
return []p2p.Protocol{whisper.protocol}
|
||||
|
@ -775,7 +768,6 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
|||
|
||||
trouble := false
|
||||
for _, env := range envelopes {
|
||||
whisper.traceEnvelope(env, !whisper.isEnvelopeCached(env.Hash()), peerSource, p)
|
||||
cached, err := whisper.add(env, whisper.lightClient)
|
||||
if err != nil {
|
||||
trouble = true
|
||||
|
@ -826,7 +818,6 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
|||
return errors.New("invalid direct message")
|
||||
}
|
||||
whisper.postEvent(&envelope, true)
|
||||
whisper.traceEnvelope(&envelope, false, p2pSource, p)
|
||||
}
|
||||
case p2pRequestCode:
|
||||
// Must be processed if mail server is implemented. Otherwise ignore.
|
||||
|
@ -903,8 +894,11 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|||
now := uint32(whisper.timeSource().Unix())
|
||||
sent := envelope.Expiry - envelope.TTL
|
||||
|
||||
envelopeAddedCounter.Inc(1)
|
||||
|
||||
if sent > now {
|
||||
if sent-DefaultSyncAllowance > now {
|
||||
envelopeErrFromFutureCounter.Inc(1)
|
||||
return false, fmt.Errorf("envelope created in the future [%x]", envelope.Hash())
|
||||
}
|
||||
// recalculate PoW, adjusted for the time difference, plus one second for latency
|
||||
|
@ -913,13 +907,16 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|||
|
||||
if envelope.Expiry < now {
|
||||
if envelope.Expiry+DefaultSyncAllowance*2 < now {
|
||||
envelopeErrVeryOldCounter.Inc(1)
|
||||
return false, fmt.Errorf("very old message")
|
||||
}
|
||||
log.Debug("expired envelope dropped", "hash", envelope.Hash().Hex())
|
||||
envelopeErrExpiredCounter.Inc(1)
|
||||
return false, nil // drop envelope without error
|
||||
}
|
||||
|
||||
if uint32(envelope.size()) > whisper.MaxMessageSize() {
|
||||
envelopeErrOversizedCounter.Inc(1)
|
||||
return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash())
|
||||
}
|
||||
|
||||
|
@ -928,6 +925,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|||
// in this case the previous value is retrieved by MinPowTolerance()
|
||||
// for a short period of peer synchronization.
|
||||
if envelope.PoW() < whisper.MinPowTolerance() {
|
||||
envelopeErrLowPowCounter.Inc(1)
|
||||
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
|
||||
}
|
||||
}
|
||||
|
@ -937,6 +935,7 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|||
// in this case the previous value is retrieved by BloomFilterTolerance()
|
||||
// for a short period of peer synchronization.
|
||||
if !BloomFilterMatch(whisper.BloomFilterTolerance(), envelope.Bloom()) {
|
||||
envelopeErrNoBloomMatchCounter.Inc(1)
|
||||
return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x",
|
||||
envelope.Hash().Hex(), whisper.BloomFilter(), envelope.Bloom(), envelope.Topic)
|
||||
}
|
||||
|
@ -961,6 +960,8 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|||
log.Trace("whisper envelope already cached", "hash", envelope.Hash().Hex())
|
||||
} else {
|
||||
log.Trace("cached whisper envelope", "hash", envelope.Hash().Hex())
|
||||
envelopeNewAddedCounter.Inc(1)
|
||||
envelopeSizeMeter.Mark(int64(envelope.size()))
|
||||
whisper.statsMu.Lock()
|
||||
whisper.stats.memoryUsed += envelope.size()
|
||||
whisper.statsMu.Unlock()
|
||||
|
@ -976,22 +977,6 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
// traceEnvelope collects basic metadata about an envelope and sender peer.
|
||||
func (whisper *Whisper) traceEnvelope(envelope *Envelope, isNew bool, source envelopeSource, peer *Peer) {
|
||||
if whisper.envelopeTracer == nil {
|
||||
return
|
||||
}
|
||||
|
||||
whisper.envelopeTracer.Trace(&EnvelopeMeta{
|
||||
Hash: envelope.Hash().String(),
|
||||
Topic: BytesToTopic(envelope.Topic[:]),
|
||||
Size: uint32(envelope.size()),
|
||||
Source: source,
|
||||
IsNew: isNew,
|
||||
Peer: peer.peer.Info().ID,
|
||||
})
|
||||
}
|
||||
|
||||
// postEvent queues the message for further processing.
|
||||
func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) {
|
||||
if isP2P {
|
||||
|
@ -1078,6 +1063,7 @@ func (whisper *Whisper) expire() {
|
|||
hashSet.Each(func(v interface{}) bool {
|
||||
sz := whisper.envelopes[v.(common.Hash)].size()
|
||||
delete(whisper.envelopes, v.(common.Hash))
|
||||
envelopeClearedCounter.Inc(1)
|
||||
whisper.envelopeFeed.Send(EnvelopeEvent{
|
||||
Hash: v.(common.Hash),
|
||||
Event: EventEnvelopeExpired,
|
||||
|
|
Loading…
Reference in New Issue