Rework the metrics and add metrics for dropped envelopes due to full queue

This commit is contained in:
kdeme 2020-01-23 15:28:24 +01:00 committed by zah
parent 84015d0d1d
commit 3ee5651b7c
3 changed files with 42 additions and 39 deletions

View File

@ -47,8 +47,6 @@ export
logScope: logScope:
topics = "waku" topics = "waku"
declarePublicCounter valid_envelopes,
"Received & posted valid envelopes"
declarePublicCounter dropped_low_pow_envelopes, declarePublicCounter dropped_low_pow_envelopes,
"Dropped envelopes because of too low PoW" "Dropped envelopes because of too low PoW"
declarePublicCounter dropped_too_large_envelopes, declarePublicCounter dropped_too_large_envelopes,
@ -57,10 +55,8 @@ declarePublicCounter dropped_bloom_filter_mismatch_envelopes,
"Dropped envelopes because not matching with bloom filter" "Dropped envelopes because not matching with bloom filter"
declarePublicCounter dropped_topic_mismatch_envelopes, declarePublicCounter dropped_topic_mismatch_envelopes,
"Dropped envelopes because of not matching topics" "Dropped envelopes because of not matching topics"
declarePublicCounter dropped_benign_duplicate_envelopes, declarePublicCounter dropped_duplicate_envelopes,
"Dropped benign duplicate envelopes" "Dropped duplicate envelopes"
declarePublicCounter dropped_malicious_duplicate_envelopes,
"Dropped malicious duplicate envelopes"
const const
defaultQueueCapacity = 2048 defaultQueueCapacity = 2048
@ -272,8 +268,11 @@ p2pProtocol Waku(version = wakuVersion,
# broadcasting this message. This too is seen here as a duplicate message # broadcasting this message. This too is seen here as a duplicate message
# (see above comment). If we want to seperate these cases (e.g. when peer # (see above comment). If we want to seperate these cases (e.g. when peer
# rating), then we have to add a "peer.state.send" HashSet. # rating), then we have to add a "peer.state.send" HashSet.
# Note: it could also be a race between the arrival of a message send by
# this node to a peer and that same message arriving from that peer (after
# it was received from another peer) here.
if peer.state.received.containsOrIncl(msg.hash): if peer.state.received.containsOrIncl(msg.hash):
dropped_malicious_duplicate_envelopes.inc() dropped_duplicate_envelopes.inc()
trace "Peer sending duplicate messages", peer, hash = $msg.hash trace "Peer sending duplicate messages", peer, hash = $msg.hash
# await peer.disconnect(SubprotocolReason) # await peer.disconnect(SubprotocolReason)
continue continue
@ -281,11 +280,8 @@ p2pProtocol Waku(version = wakuVersion,
# This can still be a duplicate message, but from another peer than # This can still be a duplicate message, but from another peer than
# the peer who send the message. # the peer who send the message.
if peer.networkState.queue[].add(msg): if peer.networkState.queue[].add(msg):
valid_envelopes.inc()
# notify filters of this message # notify filters of this message
peer.networkState.filters.notify(msg) peer.networkState.filters.notify(msg)
else:
dropped_benign_duplicate_envelopes.inc()
proc powRequirement(peer: Peer, value: uint64) = proc powRequirement(peer: Peer, value: uint64) =
if not peer.state.initialized: if not peer.state.initialized:
@ -444,7 +440,6 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
trace "Adding message to queue", hash = $msg.hash trace "Adding message to queue", hash = $msg.hash
if wakuNet.queue[].add(msg): if wakuNet.queue[].add(msg):
valid_envelopes.inc()
# Also notify our own filters of the message we are sending, # Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp # e.g. msg from local Dapp to Dapp
wakuNet.filters.notify(msg) wakuNet.filters.notify(msg)

View File

@ -17,10 +17,18 @@ import
logScope: logScope:
topics = "whisper_types" topics = "whisper_types"
declarePublicCounter valid_envelopes,
"Received & posted valid envelopes"
declarePublicCounter dropped_benign_duplicate_envelopes,
"Dropped benign duplicate envelopes"
declarePublicCounter dropped_expired_envelopes, declarePublicCounter dropped_expired_envelopes,
"Dropped envelopes because expired" "Dropped envelopes because expired"
declarePublicCounter dropped_from_future_envelopes, declarePublicCounter dropped_from_future_envelopes,
"Dropped envelopes because of future timestamp" "Dropped envelopes because of future timestamp"
declarePublicCounter dropped_full_queue_new_envelopes,
"New valid envelopes dropped because of full queue"
declarePublicCounter dropped_full_queue_old_envelopes,
"Old valid envelopes dropped because of full queue"
const const
flagsLen = 1 ## payload flags field length, bytes flagsLen = 1 ## payload flags field length, bytes
@ -549,25 +557,30 @@ proc add*(self: var Queue, msg: Message): bool =
## * expired messages ## * expired messages
## * lowest proof-of-work message - this may be `msg` itself! ## * lowest proof-of-work message - this may be `msg` itself!
if self.items.len >= self.capacity: # check for duplicate before pruning
self.prune() # Only prune if needed if self.itemHashes.contains(msg.hash):
dropped_benign_duplicate_envelopes.inc()
if self.items.len >= self.capacity:
# Still no room - go by proof-of-work quantity
let last = self.items[^1]
if last.pow > msg.pow or
(last.pow == msg.pow and last.env.expiry > msg.env.expiry):
# The new message has less pow or will expire earlier - drop it
return false
self.items.del(self.items.len() - 1)
self.itemHashes.excl(last.hash)
# check for duplicate
if self.itemHashes.containsOrIncl(msg.hash):
return false return false
else: else:
valid_envelopes.inc()
if self.items.len >= self.capacity:
self.prune() # Only prune if needed
if self.items.len >= self.capacity:
# Still no room - go by proof-of-work quantity
let last = self.items[^1]
if last.pow > msg.pow or
(last.pow == msg.pow and last.env.expiry > msg.env.expiry):
# The new message has less pow or will expire earlier - drop it
dropped_full_queue_new_envelopes.inc()
return false
self.items.del(self.items.len() - 1)
self.itemHashes.excl(last.hash)
dropped_full_queue_old_envelopes.inc()
self.itemHashes.incl(msg.hash)
self.items.insert(msg, self.items.lowerBound(msg, cmpPow)) self.items.insert(msg, self.items.lowerBound(msg, cmpPow))
return true return true

View File

@ -42,18 +42,14 @@ export
logScope: logScope:
topics = "whisper" topics = "whisper"
declarePublicCounter valid_envelopes,
"Received & posted valid envelopes"
declarePublicCounter dropped_low_pow_envelopes, declarePublicCounter dropped_low_pow_envelopes,
"Dropped envelopes because of too low PoW" "Dropped envelopes because of too low PoW"
declarePublicCounter dropped_too_large_envelopes, declarePublicCounter dropped_too_large_envelopes,
"Dropped envelopes because larger than maximum allowed size" "Dropped envelopes because larger than maximum allowed size"
declarePublicCounter dropped_bloom_filter_mismatch_envelopes, declarePublicCounter dropped_bloom_filter_mismatch_envelopes,
"Dropped envelopes because not matching with bloom filter" "Dropped envelopes because not matching with bloom filter"
declarePublicCounter dropped_benign_duplicate_envelopes, declarePublicCounter dropped_duplicate_envelopes,
"Dropped benign duplicate envelopes" "Dropped duplicate envelopes"
declarePublicCounter dropped_malicious_duplicate_envelopes,
"Dropped malicious duplicate envelopes"
const const
defaultQueueCapacity = 2048 defaultQueueCapacity = 2048
@ -202,8 +198,11 @@ p2pProtocol Whisper(version = whisperVersion,
# broadcasting this message. This too is seen here as a duplicate message # broadcasting this message. This too is seen here as a duplicate message
# (see above comment). If we want to seperate these cases (e.g. when peer # (see above comment). If we want to seperate these cases (e.g. when peer
# rating), then we have to add a "peer.state.send" HashSet. # rating), then we have to add a "peer.state.send" HashSet.
# Note: it could also be a race between the arrival of a message send by
# this node to a peer and that same message arriving from that peer (after
# it was received from another peer) here.
if peer.state.received.containsOrIncl(msg.hash): if peer.state.received.containsOrIncl(msg.hash):
dropped_malicious_duplicate_envelopes.inc() dropped_duplicate_envelopes.inc()
trace "Peer sending duplicate messages", peer, hash = $msg.hash trace "Peer sending duplicate messages", peer, hash = $msg.hash
# await peer.disconnect(SubprotocolReason) # await peer.disconnect(SubprotocolReason)
continue continue
@ -211,11 +210,8 @@ p2pProtocol Whisper(version = whisperVersion,
# This can still be a duplicate message, but from another peer than # This can still be a duplicate message, but from another peer than
# the peer who send the message. # the peer who send the message.
if peer.networkState.queue[].add(msg): if peer.networkState.queue[].add(msg):
valid_envelopes.inc()
# notify filters of this message # notify filters of this message
peer.networkState.filters.notify(msg) peer.networkState.filters.notify(msg)
else:
dropped_benign_duplicate_envelopes.inc()
proc powRequirement(peer: Peer, value: uint64) = proc powRequirement(peer: Peer, value: uint64) =
if not peer.state.initialized: if not peer.state.initialized:
@ -337,7 +333,6 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
trace "Adding message to queue", hash = $msg.hash trace "Adding message to queue", hash = $msg.hash
if whisperNet.queue[].add(msg): if whisperNet.queue[].add(msg):
valid_envelopes.inc()
# Also notify our own filters of the message we are sending, # Also notify our own filters of the message we are sending,
# e.g. msg from local Dapp to Dapp # e.g. msg from local Dapp to Dapp
whisperNet.filters.notify(msg) whisperNet.filters.notify(msg)