mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-03 00:24:00 +00:00
Rate limit fixes (#965)
This commit is contained in:
parent
3fc1236659
commit
ebef85c9d7
@ -318,7 +318,7 @@ proc validateAndRelay(g: GossipSub,
|
||||
of ValidationResult.Reject:
|
||||
debug "Dropping message after validation, reason: reject",
|
||||
msgId = shortLog(msgId), peer
|
||||
g.punishInvalidMessage(peer, msg)
|
||||
await g.punishInvalidMessage(peer, msg)
|
||||
return
|
||||
of ValidationResult.Ignore:
|
||||
debug "Dropping message after validation, reason: ignore",
|
||||
@ -492,14 +492,14 @@ method rpcHandler*(g: GossipSub,
|
||||
# always validate if signature is present or required
|
||||
debug "Dropping message due to failed signature verification",
|
||||
msgId = shortLog(msgId), peer
|
||||
g.punishInvalidMessage(peer, msg)
|
||||
await g.punishInvalidMessage(peer, msg)
|
||||
continue
|
||||
|
||||
if msg.seqno.len > 0 and msg.seqno.len != 8:
|
||||
# if we have seqno should be 8 bytes long
|
||||
debug "Dropping message due to invalid seqno length",
|
||||
msgId = shortLog(msgId), peer
|
||||
g.punishInvalidMessage(peer, msg)
|
||||
await g.punishInvalidMessage(peer, msg)
|
||||
continue
|
||||
|
||||
# g.anonymize needs no evaluation when receiving messages
|
||||
|
@ -240,15 +240,15 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
|
||||
trace "running scoring heartbeat", instance = cast[int](g)
|
||||
g.updateScores()
|
||||
|
||||
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) =
|
||||
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, msg: Message) {.async.} =
|
||||
let uselessAppBytesNum = msg.data.len
|
||||
peer.overheadRateLimitOpt.withValue(overheadRateLimit):
|
||||
if not overheadRateLimit.tryConsume(uselessAppBytesNum):
|
||||
debug "Peer sent invalid message and it's above rate limit", peer, uselessAppBytesNum
|
||||
libp2p_gossipsub_peers_rate_limit_hits.inc(labelValues = [peer.getAgent()]) # let's just measure at the beginning for test purposes.
|
||||
# discard g.disconnectPeer(peer)
|
||||
# debug "Peer disconnected", peer, uselessAppBytesNum
|
||||
# raise newException(PeerRateLimitError, "Peer sent invalid message and it's above rate limit")
|
||||
if g.parameters.disconnectPeerAboveRateLimit:
|
||||
await g.disconnectPeer(peer)
|
||||
raise newException(PeerRateLimitError, "Peer disconnected because it's above rate limit.")
|
||||
|
||||
|
||||
for tt in msg.topicIds:
|
||||
|
@ -952,6 +952,10 @@ suite "GossipSub":
|
||||
gossip1.subscribe("foobar", handle)
|
||||
await waitSubGraph(nodes, "foobar")
|
||||
|
||||
# Avoid being disconnected by failing signature verification
|
||||
gossip0.verifySignature = false
|
||||
gossip1.verifySignature = false
|
||||
|
||||
return (nodes, gossip0, gossip1)
|
||||
|
||||
proc currentRateLimitHits(): float64 =
|
||||
@ -964,8 +968,7 @@ suite "GossipSub":
|
||||
let rateLimitHits = currentRateLimitHits()
|
||||
let (nodes, gossip0, gossip1) = await initializeGossipTest()
|
||||
|
||||
let msg = RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: "Valid data".toBytes)])
|
||||
gossip0.broadcast(gossip0.mesh["foobar"], msg)
|
||||
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](10))]))
|
||||
await sleepAsync(300.millis)
|
||||
|
||||
check currentRateLimitHits() == rateLimitHits
|
||||
@ -973,9 +976,10 @@ suite "GossipSub":
|
||||
|
||||
# Disconnect peer when rate limiting is enabled
|
||||
gossip1.parameters.disconnectPeerAboveRateLimit = true
|
||||
gossip0.broadcast(gossip0.mesh["foobar"], msg)
|
||||
gossip0.broadcast(gossip0.mesh["foobar"], RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: newSeq[byte](12))]))
|
||||
await sleepAsync(300.millis)
|
||||
|
||||
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
|
||||
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
|
||||
check currentRateLimitHits() == rateLimitHits
|
||||
|
||||
await stopNodes(nodes)
|
||||
@ -986,8 +990,7 @@ suite "GossipSub":
|
||||
let (nodes, gossip0, gossip1) = await initializeGossipTest()
|
||||
|
||||
# Simulate sending an undecodable message
|
||||
let msg = newSeqWith[byte](30, 1.byte)
|
||||
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(msg)
|
||||
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](33, 1.byte))
|
||||
await sleepAsync(300.millis)
|
||||
|
||||
check currentRateLimitHits() == rateLimitHits + 1
|
||||
@ -995,7 +998,7 @@ suite "GossipSub":
|
||||
|
||||
# Disconnect peer when rate limiting is enabled
|
||||
gossip1.parameters.disconnectPeerAboveRateLimit = true
|
||||
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(msg)
|
||||
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(newSeqWith[byte](35, 1.byte))
|
||||
|
||||
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
|
||||
check currentRateLimitHits() == rateLimitHits + 2
|
||||
@ -1008,10 +1011,9 @@ suite "GossipSub":
|
||||
|
||||
let msg = RPCMsg(control: some(ControlMessage(prune: @[
|
||||
ControlPrune(topicID: "foobar", peers: @[
|
||||
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](30)))
|
||||
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](33)))
|
||||
], backoff: 123'u64)
|
||||
])))
|
||||
|
||||
gossip0.broadcast(gossip0.mesh["foobar"], msg)
|
||||
await sleepAsync(300.millis)
|
||||
|
||||
@ -1020,7 +1022,42 @@ suite "GossipSub":
|
||||
|
||||
# Disconnect peer when rate limiting is enabled
|
||||
gossip1.parameters.disconnectPeerAboveRateLimit = true
|
||||
gossip0.broadcast(gossip0.mesh["foobar"], msg)
|
||||
let msg2 = RPCMsg(control: some(ControlMessage(prune: @[
|
||||
ControlPrune(topicID: "foobar", peers: @[
|
||||
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](35)))
|
||||
], backoff: 123'u64)
|
||||
])))
|
||||
gossip0.broadcast(gossip0.mesh["foobar"], msg2)
|
||||
|
||||
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
|
||||
check currentRateLimitHits() == rateLimitHits + 2
|
||||
|
||||
await stopNodes(nodes)
|
||||
|
||||
asyncTest "e2e - GossipSub should rate limit invalid messages above the size allowed":
|
||||
let rateLimitHits = currentRateLimitHits()
|
||||
let (nodes, gossip0, gossip1) = await initializeGossipTest()
|
||||
|
||||
let topic = "foobar"
|
||||
proc execValidator(topic: string, message: messages.Message): Future[ValidationResult] {.raises: [].} =
|
||||
let res = newFuture[ValidationResult]()
|
||||
res.complete(ValidationResult.Reject)
|
||||
res
|
||||
|
||||
gossip0.addValidator(topic, execValidator)
|
||||
gossip1.addValidator(topic, execValidator)
|
||||
|
||||
let msg = RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](40))])
|
||||
|
||||
gossip0.broadcast(gossip0.mesh[topic], msg)
|
||||
await sleepAsync(300.millis)
|
||||
|
||||
check currentRateLimitHits() == rateLimitHits + 1
|
||||
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
|
||||
|
||||
# Disconnect peer when rate limiting is enabled
|
||||
gossip1.parameters.disconnectPeerAboveRateLimit = true
|
||||
gossip0.broadcast(gossip0.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]))
|
||||
|
||||
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
|
||||
check currentRateLimitHits() == rateLimitHits + 2
|
||||
|
Loading…
x
Reference in New Issue
Block a user