From a2027003cd4f8fe34b7c66999d33f950b0f660c0 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 21 Mar 2024 14:11:40 +0100 Subject: [PATCH] Avoid unnecessary rate limit message copy (#1067) --- libp2p/protocols/pubsub/gossipsub.nim | 45 +++++++++++---------------- tests/pubsub/testgossipinternal.nim | 11 +++++++ 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 079808129..d93b8b9b5 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -398,34 +398,26 @@ proc validateAndRelay(g: GossipSub, proc dataAndTopicsIdSize(msgs: seq[Message]): int = msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0) -proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.async.} = +proc messageOverhead(g: GossipSub, msg: RPCMsg, msgSize: int): int = # In this way we count even ignored fields by protobuf + let + payloadSize = + if g.verifySignature: + byteSize(msg.messages) + else: + dataAndTopicsIdSize(msg.messages) + controlSize = msg.control.withValue(control): + byteSize(control.ihave) + byteSize(control.iwant) + do: # no control message + 0 - var rmsg = rpcMsgOpt.valueOr: - peer.overheadRateLimitOpt.withValue(overheadRateLimit): - if not overheadRateLimit.tryConsume(msgSize): - libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. - debug "Peer sent a msg that couldn't be decoded and it's above rate limit.", peer, uselessAppBytesNum = msgSize - if g.parameters.disconnectPeerAboveRateLimit: - await g.disconnectPeer(peer) - raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.") - - raise newException(CatchableError, "Peer msg couldn't be decoded") - - let usefulMsgBytesNum = - if g.verifySignature: - byteSize(rmsg.messages) - else: - dataAndTopicsIdSize(rmsg.messages) - - var uselessAppBytesNum = msgSize - usefulMsgBytesNum - rmsg.control.withValue(control): - uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant)) + msgSize - payloadSize - controlSize +proc rateLimit*(g: GossipSub, peer: PubSubPeer, overhead: int) {.async.} = peer.overheadRateLimitOpt.withValue(overheadRateLimit): - if not overheadRateLimit.tryConsume(uselessAppBytesNum): + if not overheadRateLimit.tryConsume(overhead): libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes. - debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum, rmsg + debug "Peer sent too much useless application data and it's above rate limit.", peer, overhead if g.parameters.disconnectPeerAboveRateLimit: await g.disconnectPeer(peer) raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.") @@ -433,12 +425,11 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: method rpcHandler*(g: GossipSub, peer: PubSubPeer, data: seq[byte]) {.async.} = - let msgSize = data.len var rpcMsg = decodeRpcMsg(data).valueOr: debug "failed to decode msg from peer", peer, err = error - await rateLimit(g, peer, Opt.none(RPCMsg), msgSize) - return + await rateLimit(g, peer, msgSize) + raise newException(CatchableError, "Peer msg couldn't be decoded") when defined(libp2p_expensive_metrics): for m in rpcMsg.messages: @@ -446,7 +437,7 @@ method rpcHandler*(g: GossipSub, libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, t]) trace "decoded msg from peer", peer, msg = rpcMsg.shortLog - await rateLimit(g, peer, Opt.some(rpcMsg), msgSize) + await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize)) # trigger hooks peer.recvObservers(rpcMsg) diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index e3a99721f..971029832 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -525,6 +525,17 @@ suite "GossipSub internal": await conn.close() await gossipSub.switch.stop() + asyncTest "invalid message bytes": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let peerId = randomPeerId() + let peer = gossipSub.getPubSubPeer(peerId) + + expect(CatchableError): + await gossipSub.rpcHandler(peer, @[byte 1, 2, 3]) + + await gossipSub.switch.stop() + asyncTest "rebalanceMesh fail due to backoff": let gossipSub = TestGossipSub.init(newStandardSwitch()) let topic = "foobar"