diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index c53b5d9..78bdf41 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -47,8 +47,6 @@ export logScope: topics = "waku" -declarePublicCounter valid_envelopes, - "Received & posted valid envelopes" declarePublicCounter dropped_low_pow_envelopes, "Dropped envelopes because of too low PoW" declarePublicCounter dropped_too_large_envelopes, @@ -57,10 +55,8 @@ declarePublicCounter dropped_bloom_filter_mismatch_envelopes, "Dropped envelopes because not matching with bloom filter" declarePublicCounter dropped_topic_mismatch_envelopes, "Dropped envelopes because of not matching topics" -declarePublicCounter dropped_benign_duplicate_envelopes, - "Dropped benign duplicate envelopes" -declarePublicCounter dropped_malicious_duplicate_envelopes, - "Dropped malicious duplicate envelopes" +declarePublicCounter dropped_duplicate_envelopes, + "Dropped duplicate envelopes" const defaultQueueCapacity = 2048 @@ -272,8 +268,11 @@ p2pProtocol Waku(version = wakuVersion, # broadcasting this message. This too is seen here as a duplicate message # (see above comment). If we want to seperate these cases (e.g. when peer # rating), then we have to add a "peer.state.send" HashSet. + # Note: it could also be a race between the arrival of a message send by + # this node to a peer and that same message arriving from that peer (after + # it was received from another peer) here. if peer.state.received.containsOrIncl(msg.hash): - dropped_malicious_duplicate_envelopes.inc() + dropped_duplicate_envelopes.inc() trace "Peer sending duplicate messages", peer, hash = $msg.hash # await peer.disconnect(SubprotocolReason) continue @@ -281,11 +280,8 @@ p2pProtocol Waku(version = wakuVersion, # This can still be a duplicate message, but from another peer than # the peer who send the message. if peer.networkState.queue[].add(msg): - valid_envelopes.inc() # notify filters of this message peer.networkState.filters.notify(msg) - else: - dropped_benign_duplicate_envelopes.inc() proc powRequirement(peer: Peer, value: uint64) = if not peer.state.initialized: @@ -444,7 +440,6 @@ proc queueMessage(node: EthereumNode, msg: Message): bool = trace "Adding message to queue", hash = $msg.hash if wakuNet.queue[].add(msg): - valid_envelopes.inc() # Also notify our own filters of the message we are sending, # e.g. msg from local Dapp to Dapp wakuNet.filters.notify(msg) diff --git a/eth/p2p/rlpx_protocols/whisper/whisper_types.nim b/eth/p2p/rlpx_protocols/whisper/whisper_types.nim index 9197135..3b953de 100644 --- a/eth/p2p/rlpx_protocols/whisper/whisper_types.nim +++ b/eth/p2p/rlpx_protocols/whisper/whisper_types.nim @@ -17,10 +17,18 @@ import logScope: topics = "whisper_types" +declarePublicCounter valid_envelopes, + "Received & posted valid envelopes" +declarePublicCounter dropped_benign_duplicate_envelopes, + "Dropped benign duplicate envelopes" declarePublicCounter dropped_expired_envelopes, "Dropped envelopes because expired" declarePublicCounter dropped_from_future_envelopes, "Dropped envelopes because of future timestamp" +declarePublicCounter dropped_full_queue_new_envelopes, + "New valid envelopes dropped because of full queue" +declarePublicCounter dropped_full_queue_old_envelopes, + "Old valid envelopes dropped because of full queue" const flagsLen = 1 ## payload flags field length, bytes @@ -549,25 +557,30 @@ proc add*(self: var Queue, msg: Message): bool = ## * expired messages ## * lowest proof-of-work message - this may be `msg` itself! - if self.items.len >= self.capacity: - self.prune() # Only prune if needed - - if self.items.len >= self.capacity: - # Still no room - go by proof-of-work quantity - let last = self.items[^1] - - if last.pow > msg.pow or - (last.pow == msg.pow and last.env.expiry > msg.env.expiry): - # The new message has less pow or will expire earlier - drop it - return false - - self.items.del(self.items.len() - 1) - self.itemHashes.excl(last.hash) - - # check for duplicate - if self.itemHashes.containsOrIncl(msg.hash): + # check for duplicate before pruning + if self.itemHashes.contains(msg.hash): + dropped_benign_duplicate_envelopes.inc() return false else: + valid_envelopes.inc() + if self.items.len >= self.capacity: + self.prune() # Only prune if needed + + if self.items.len >= self.capacity: + # Still no room - go by proof-of-work quantity + let last = self.items[^1] + + if last.pow > msg.pow or + (last.pow == msg.pow and last.env.expiry > msg.env.expiry): + # The new message has less pow or will expire earlier - drop it + dropped_full_queue_new_envelopes.inc() + return false + + self.items.del(self.items.len() - 1) + self.itemHashes.excl(last.hash) + dropped_full_queue_old_envelopes.inc() + + self.itemHashes.incl(msg.hash) self.items.insert(msg, self.items.lowerBound(msg, cmpPow)) return true diff --git a/eth/p2p/rlpx_protocols/whisper_protocol.nim b/eth/p2p/rlpx_protocols/whisper_protocol.nim index c83b669..a1d6bfa 100644 --- a/eth/p2p/rlpx_protocols/whisper_protocol.nim +++ b/eth/p2p/rlpx_protocols/whisper_protocol.nim @@ -42,18 +42,14 @@ export logScope: topics = "whisper" -declarePublicCounter valid_envelopes, - "Received & posted valid envelopes" declarePublicCounter dropped_low_pow_envelopes, "Dropped envelopes because of too low PoW" declarePublicCounter dropped_too_large_envelopes, "Dropped envelopes because larger than maximum allowed size" declarePublicCounter dropped_bloom_filter_mismatch_envelopes, "Dropped envelopes because not matching with bloom filter" -declarePublicCounter dropped_benign_duplicate_envelopes, - "Dropped benign duplicate envelopes" -declarePublicCounter dropped_malicious_duplicate_envelopes, - "Dropped malicious duplicate envelopes" +declarePublicCounter dropped_duplicate_envelopes, + "Dropped duplicate envelopes" const defaultQueueCapacity = 2048 @@ -202,8 +198,11 @@ p2pProtocol Whisper(version = whisperVersion, # broadcasting this message. This too is seen here as a duplicate message # (see above comment). If we want to seperate these cases (e.g. when peer # rating), then we have to add a "peer.state.send" HashSet. + # Note: it could also be a race between the arrival of a message send by + # this node to a peer and that same message arriving from that peer (after + # it was received from another peer) here. if peer.state.received.containsOrIncl(msg.hash): - dropped_malicious_duplicate_envelopes.inc() + dropped_duplicate_envelopes.inc() trace "Peer sending duplicate messages", peer, hash = $msg.hash # await peer.disconnect(SubprotocolReason) continue @@ -211,11 +210,8 @@ p2pProtocol Whisper(version = whisperVersion, # This can still be a duplicate message, but from another peer than # the peer who send the message. if peer.networkState.queue[].add(msg): - valid_envelopes.inc() # notify filters of this message peer.networkState.filters.notify(msg) - else: - dropped_benign_duplicate_envelopes.inc() proc powRequirement(peer: Peer, value: uint64) = if not peer.state.initialized: @@ -337,7 +333,6 @@ proc queueMessage(node: EthereumNode, msg: Message): bool = trace "Adding message to queue", hash = $msg.hash if whisperNet.queue[].add(msg): - valid_envelopes.inc() # Also notify our own filters of the message we are sending, # e.g. msg from local Dapp to Dapp whisperNet.filters.notify(msg)