mirror of https://github.com/vacp2p/nim-libp2p.git
Avoid unnecessary rate limit message copy (#1067)
This commit is contained in:
parent
c5db35d9b0
commit
a2027003cd
|
@ -398,34 +398,26 @@ proc validateAndRelay(g: GossipSub,
|
||||||
proc dataAndTopicsIdSize(msgs: seq[Message]): int =
|
proc dataAndTopicsIdSize(msgs: seq[Message]): int =
|
||||||
msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0)
|
msgs.mapIt(it.data.len + it.topicIds.mapIt(it.len).foldl(a + b, 0)).foldl(a + b, 0)
|
||||||
|
|
||||||
proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize: int) {.async.} =
|
proc messageOverhead(g: GossipSub, msg: RPCMsg, msgSize: int): int =
|
||||||
# In this way we count even ignored fields by protobuf
|
# In this way we count even ignored fields by protobuf
|
||||||
|
let
|
||||||
|
payloadSize =
|
||||||
|
if g.verifySignature:
|
||||||
|
byteSize(msg.messages)
|
||||||
|
else:
|
||||||
|
dataAndTopicsIdSize(msg.messages)
|
||||||
|
controlSize = msg.control.withValue(control):
|
||||||
|
byteSize(control.ihave) + byteSize(control.iwant)
|
||||||
|
do: # no control message
|
||||||
|
0
|
||||||
|
|
||||||
var rmsg = rpcMsgOpt.valueOr:
|
msgSize - payloadSize - controlSize
|
||||||
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
|
|
||||||
if not overheadRateLimit.tryConsume(msgSize):
|
|
||||||
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
|
|
||||||
debug "Peer sent a msg that couldn't be decoded and it's above rate limit.", peer, uselessAppBytesNum = msgSize
|
|
||||||
if g.parameters.disconnectPeerAboveRateLimit:
|
|
||||||
await g.disconnectPeer(peer)
|
|
||||||
raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")
|
|
||||||
|
|
||||||
raise newException(CatchableError, "Peer msg couldn't be decoded")
|
|
||||||
|
|
||||||
let usefulMsgBytesNum =
|
|
||||||
if g.verifySignature:
|
|
||||||
byteSize(rmsg.messages)
|
|
||||||
else:
|
|
||||||
dataAndTopicsIdSize(rmsg.messages)
|
|
||||||
|
|
||||||
var uselessAppBytesNum = msgSize - usefulMsgBytesNum
|
|
||||||
rmsg.control.withValue(control):
|
|
||||||
uselessAppBytesNum -= (byteSize(control.ihave) + byteSize(control.iwant))
|
|
||||||
|
|
||||||
|
proc rateLimit*(g: GossipSub, peer: PubSubPeer, overhead: int) {.async.} =
|
||||||
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
|
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
|
||||||
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
|
if not overheadRateLimit.tryConsume(overhead):
|
||||||
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
|
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
|
||||||
debug "Peer sent too much useless application data and it's above rate limit.", peer, msgSize, uselessAppBytesNum, rmsg
|
debug "Peer sent too much useless application data and it's above rate limit.", peer, overhead
|
||||||
if g.parameters.disconnectPeerAboveRateLimit:
|
if g.parameters.disconnectPeerAboveRateLimit:
|
||||||
await g.disconnectPeer(peer)
|
await g.disconnectPeer(peer)
|
||||||
raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")
|
raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")
|
||||||
|
@ -433,12 +425,11 @@ proc rateLimit*(g: GossipSub, peer: PubSubPeer, rpcMsgOpt: Opt[RPCMsg], msgSize:
|
||||||
method rpcHandler*(g: GossipSub,
|
method rpcHandler*(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
data: seq[byte]) {.async.} =
|
data: seq[byte]) {.async.} =
|
||||||
|
|
||||||
let msgSize = data.len
|
let msgSize = data.len
|
||||||
var rpcMsg = decodeRpcMsg(data).valueOr:
|
var rpcMsg = decodeRpcMsg(data).valueOr:
|
||||||
debug "failed to decode msg from peer", peer, err = error
|
debug "failed to decode msg from peer", peer, err = error
|
||||||
await rateLimit(g, peer, Opt.none(RPCMsg), msgSize)
|
await rateLimit(g, peer, msgSize)
|
||||||
return
|
raise newException(CatchableError, "Peer msg couldn't be decoded")
|
||||||
|
|
||||||
when defined(libp2p_expensive_metrics):
|
when defined(libp2p_expensive_metrics):
|
||||||
for m in rpcMsg.messages:
|
for m in rpcMsg.messages:
|
||||||
|
@ -446,7 +437,7 @@ method rpcHandler*(g: GossipSub,
|
||||||
libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, t])
|
libp2p_pubsub_received_messages.inc(labelValues = [$peer.peerId, t])
|
||||||
|
|
||||||
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
|
||||||
await rateLimit(g, peer, Opt.some(rpcMsg), msgSize)
|
await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))
|
||||||
|
|
||||||
# trigger hooks
|
# trigger hooks
|
||||||
peer.recvObservers(rpcMsg)
|
peer.recvObservers(rpcMsg)
|
||||||
|
|
|
@ -525,6 +525,17 @@ suite "GossipSub internal":
|
||||||
await conn.close()
|
await conn.close()
|
||||||
await gossipSub.switch.stop()
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
|
asyncTest "invalid message bytes":
|
||||||
|
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||||
|
|
||||||
|
let peerId = randomPeerId()
|
||||||
|
let peer = gossipSub.getPubSubPeer(peerId)
|
||||||
|
|
||||||
|
expect(CatchableError):
|
||||||
|
await gossipSub.rpcHandler(peer, @[byte 1, 2, 3])
|
||||||
|
|
||||||
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
asyncTest "rebalanceMesh fail due to backoff":
|
asyncTest "rebalanceMesh fail due to backoff":
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||||
let topic = "foobar"
|
let topic = "foobar"
|
||||||
|
|
Loading…
Reference in New Issue