Rate Limit tests (#953)

This commit is contained in:
diegomrsantos 2023-10-05 17:12:07 +02:00 committed by GitHub
parent 459f6851e7
commit 18b0f726df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 103 additions and 4 deletions

View File

@ -10,8 +10,9 @@
{.used.} {.used.}
import sequtils, options, tables, sets, sugar import sequtils, options, tables, sets, sugar
import chronos, stew/byteutils import chronos, stew/byteutils, chronos/ratelimit
import chronicles import chronicles
import metrics
import utils, ../../libp2p/[errors, import utils, ../../libp2p/[errors,
peerid, peerid,
peerinfo, peerinfo,
@ -20,6 +21,7 @@ import utils, ../../libp2p/[errors,
crypto/crypto, crypto/crypto,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
protocols/pubsub/gossipsub, protocols/pubsub/gossipsub,
protocols/pubsub/gossipsub/scoring,
protocols/pubsub/pubsubpeer, protocols/pubsub/pubsubpeer,
protocols/pubsub/peertable, protocols/pubsub/peertable,
protocols/pubsub/timedcache, protocols/pubsub/timedcache,
@ -928,3 +930,99 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat()) await allFuturesThrowing(nodesFut.concat())
proc initializeGossipTest(): Future[(seq[PubSub], GossipSub, GossipSub)] {.async.} =
let nodes = generateNodes(
2,
gossip = true,
overheadRateLimit = Opt.some((20, 1.millis)))
discard await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
await subscribeNodes(nodes)
proc handle(topic: string, data: seq[byte]) {.async, gcsafe.} = discard
let gossip0 = GossipSub(nodes[0])
let gossip1 = GossipSub(nodes[1])
gossip0.subscribe("foobar", handle)
gossip1.subscribe("foobar", handle)
await waitSubGraph(nodes, "foobar")
return (nodes, gossip0, gossip1)
proc currentRateLimitHits(): float64 =
try:
libp2p_gossipsub_peers_rate_limit_hits.valueByName("libp2p_gossipsub_peers_rate_limit_hits_total", @["nim-libp2p"])
except KeyError:
0
asyncTest "e2e - GossipSub should not rate limit decodable messages below the size allowed":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()
let msg = RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: "Valid data".toBytes)])
gossip0.broadcast(gossip0.mesh["foobar"], msg)
await sleepAsync(300.millis)
check currentRateLimitHits() == rateLimitHits
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], msg)
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
check currentRateLimitHits() == rateLimitHits
await stopNodes(nodes)
asyncTest "e2e - GossipSub should rate limit undecodable messages above the size allowed":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()
# Simulate sending an undecodable message
let msg = newSeqWith[byte](30, 1.byte)
await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(msg)
await sleepAsync(300.millis)
check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(msg)
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
await stopNodes(nodes)
asyncTest "e2e - GossipSub should rate limit decodable messages above the size allowed":
let rateLimitHits = currentRateLimitHits()
let (nodes, gossip0, gossip1) = await initializeGossipTest()
let msg = RPCMsg(control: some(ControlMessage(prune: @[
ControlPrune(topicID: "foobar", peers: @[
PeerInfoMsg(peerId: PeerId(data: newSeq[byte](30)))
], backoff: 123'u64)
])))
gossip0.broadcast(gossip0.mesh["foobar"], msg)
await sleepAsync(300.millis)
check currentRateLimitHits() == rateLimitHits + 1
check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true
# Disconnect peer when rate limiting is enabled
gossip1.parameters.disconnectPeerAboveRateLimit = true
gossip0.broadcast(gossip0.mesh["foobar"], msg)
checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false
check currentRateLimitHits() == rateLimitHits + 2
await stopNodes(nodes)

View File

@ -5,7 +5,7 @@ const
libp2p_pubsub_anonymize {.booldefine.} = false libp2p_pubsub_anonymize {.booldefine.} = false
import hashes, random, tables, sets, sequtils import hashes, random, tables, sets, sequtils
import chronos, stew/[byteutils, results] import chronos, stew/[byteutils, results], chronos/ratelimit
import ../../libp2p/[builders, import ../../libp2p/[builders,
protocols/pubsub/errors, protocols/pubsub/errors,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
@ -67,7 +67,8 @@ proc generateNodes*(
sendSignedPeerRecord = false, sendSignedPeerRecord = false,
unsubscribeBackoff = 1.seconds, unsubscribeBackoff = 1.seconds,
maxMessageSize: int = 1024 * 1024, maxMessageSize: int = 1024 * 1024,
enablePX: bool = false): seq[PubSub] = enablePX: bool = false,
overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = Opt.none(tuple[bytes: int, interval: Duration])): seq[PubSub] =
for i in 0..<num: for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers, sendSignedPeerRecord = sendSignedPeerRecord) let switch = newStandardSwitch(secureManagers = secureManagers, sendSignedPeerRecord = sendSignedPeerRecord)
@ -80,7 +81,7 @@ proc generateNodes*(
msgIdProvider = msgIdProvider, msgIdProvider = msgIdProvider,
anonymize = anonymize, anonymize = anonymize,
maxMessageSize = maxMessageSize, maxMessageSize = maxMessageSize,
parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = unsubscribeBackoff; p.enablePX = enablePX; p)) parameters = (var p = GossipSubParams.init(); p.floodPublish = false; p.historyLength = 20; p.historyGossip = 20; p.unsubscribeBackoff = unsubscribeBackoff; p.enablePX = enablePX; p.overheadRateLimit = overheadRateLimit; p))
# set some testing params, to enable scores # set some testing params, to enable scores
g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0 g.topicParams.mgetOrPut("foobar", TopicParams.init()).topicWeight = 1.0
g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0 g.topicParams.mgetOrPut("foo", TopicParams.init()).topicWeight = 1.0