diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 9a4d9aa..25562b3 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -10,8 +10,9 @@ {.used.} import sequtils, options, tables, sets, sugar -import chronos, stew/byteutils +import chronos, stew/byteutils, chronos/ratelimit import chronicles +import metrics import utils, ../../libp2p/[errors, peerid, peerinfo, @@ -20,6 +21,7 @@ import utils, ../../libp2p/[errors, crypto/crypto, protocols/pubsub/pubsub, protocols/pubsub/gossipsub, + protocols/pubsub/gossipsub/scoring, protocols/pubsub/pubsubpeer, protocols/pubsub/peertable, protocols/pubsub/timedcache, @@ -928,3 +930,99 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + proc initializeGossipTest(): Future[(seq[PubSub], GossipSub, GossipSub)] {.async.} = + let nodes = generateNodes( + 2, + gossip = true, + overheadRateLimit = Opt.some((20, 1.millis))) + + discard await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + await subscribeNodes(nodes) + + proc handle(topic: string, data: seq[byte]) {.async, gcsafe.} = discard + + let gossip0 = GossipSub(nodes[0]) + let gossip1 = GossipSub(nodes[1]) + + gossip0.subscribe("foobar", handle) + gossip1.subscribe("foobar", handle) + await waitSubGraph(nodes, "foobar") + + return (nodes, gossip0, gossip1) + + proc currentRateLimitHits(): float64 = + try: + libp2p_gossipsub_peers_rate_limit_hits.valueByName("libp2p_gossipsub_peers_rate_limit_hits_total", @["nim-libp2p"]) + except KeyError: + 0 + + asyncTest "e2e - GossipSub should not rate limit decodable messages below the size allowed": + let rateLimitHits = currentRateLimitHits() + let (nodes, gossip0, gossip1) = await initializeGossipTest() + + let msg = RPCMsg(messages: @[Message(topicIDs: @["foobar"], data: "Valid data".toBytes)]) + gossip0.broadcast(gossip0.mesh["foobar"], msg) + await sleepAsync(300.millis) + + check currentRateLimitHits() == rateLimitHits + check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true + + # Disconnect peer when rate limiting is enabled + gossip1.parameters.disconnectPeerAboveRateLimit = true + gossip0.broadcast(gossip0.mesh["foobar"], msg) + + checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true + check currentRateLimitHits() == rateLimitHits + + await stopNodes(nodes) + + asyncTest "e2e - GossipSub should rate limit undecodable messages above the size allowed": + let rateLimitHits = currentRateLimitHits() + + let (nodes, gossip0, gossip1) = await initializeGossipTest() + + # Simulate sending an undecodable message + let msg = newSeqWith[byte](30, 1.byte) + await gossip1.peers[gossip0.switch.peerInfo.peerId].sendEncoded(msg) + await sleepAsync(300.millis) + + check currentRateLimitHits() == rateLimitHits + 1 + check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true + + # Disconnect peer when rate limiting is enabled + gossip1.parameters.disconnectPeerAboveRateLimit = true + await gossip0.peers[gossip1.switch.peerInfo.peerId].sendEncoded(msg) + + checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false + check currentRateLimitHits() == rateLimitHits + 2 + + await stopNodes(nodes) + + asyncTest "e2e - GossipSub should rate limit decodable messages above the size allowed": + let rateLimitHits = currentRateLimitHits() + let (nodes, gossip0, gossip1) = await initializeGossipTest() + + let msg = RPCMsg(control: some(ControlMessage(prune: @[ + ControlPrune(topicID: "foobar", peers: @[ + PeerInfoMsg(peerId: PeerId(data: newSeq[byte](30))) + ], backoff: 123'u64) + ]))) + + gossip0.broadcast(gossip0.mesh["foobar"], msg) + await sleepAsync(300.millis) + + check currentRateLimitHits() == rateLimitHits + 1 + check gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == true + + # Disconnect peer when rate limiting is enabled + gossip1.parameters.disconnectPeerAboveRateLimit = true + gossip0.broadcast(gossip0.mesh["foobar"], msg) + + checkExpiring gossip1.switch.isConnected(gossip0.switch.peerInfo.peerId) == false + check currentRateLimitHits() == rateLimitHits + 2 + + await stopNodes(nodes) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 82209dc..b1b9d21 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -5,7 +5,7 @@ const libp2p_pubsub_anonymize {.booldefine.} = false import hashes, random, tables, sets, sequtils -import chronos, stew/[byteutils, results] +import chronos, stew/[byteutils, results], chronos/ratelimit import ../../libp2p/[builders, protocols/pubsub/errors, protocols/pubsub/pubsub, @@ -67,7 +67,8 @@ proc generateNodes*( sendSignedPeerRecord = false, unsubscribeBackoff = 1.seconds, maxMessageSize: int = 1024 * 1024, - enablePX: bool = false): seq[PubSub] = + enablePX: bool = false, + overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = Opt.none(tuple[bytes: int, interval: Duration])): seq[PubSub] = for i in 0..