diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index e1a40c59f..7a2bf21a2 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -242,7 +242,8 @@ proc updateScores*(g: GossipSub) = # avoid async trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted - if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold: + if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold and + peer.peerId notin g.parameters.directPeers: debug "disconnecting bad score peer", peer, score = peer.score asyncSpawn(try: g.disconnectPeer(peer) except Exception as exc: raiseAssert exc.msg) diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 6e9c8913d..4c960d1ce 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -182,10 +182,6 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) asyncTest "GossipSub test directPeers": - var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - handlerFut.complete(true) let nodes = generateNodes(2, gossip = true) @@ -221,6 +217,7 @@ suite "GossipSub": # DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN ### await subscribeNodes(nodes) + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard nodes[0].subscribe("foobar", handler) nodes[1].subscribe("foobar", handler) @@ -238,6 +235,113 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipSub directPeers: always forward messages": + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + GossipSub(nodes[0]).parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs + GossipSub(nodes[1]).parameters.directPeers[nodes[0].switch.peerInfo.peerId] = nodes[0].switch.peerInfo.addrs + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + var handlerFut = newFuture[void]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete() + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + tryPublish await nodes[0].publish("foobar", toBytes("hellow")), 1 + + await handlerFut + + # peer shouldn't be in our mesh + check "foobar" notin GossipSub(nodes[0]).mesh + check "foobar" notin GossipSub(nodes[1]).mesh + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "GossipSub directPeers: don't kick direct peer with low score": + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + GossipSub(nodes[0]).parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs + GossipSub(nodes[1]).parameters.directPeers[nodes[0].switch.peerInfo.peerId] = nodes[0].switch.peerInfo.addrs + + GossipSub(nodes[1]).parameters.disconnectBadPeers = true + GossipSub(nodes[1]).parameters.graylistThreshold = 100000 + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + var handlerFut = newFuture[void]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete() + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + tryPublish await nodes[0].publish("foobar", toBytes("hellow")), 1 + + await handlerFut + + GossipSub(nodes[1]).updateScores() + # peer shouldn't be in our mesh + check: + GossipSub(nodes[1]).peerStats[nodes[0].switch.peerInfo.peerId].score < GossipSub(nodes[1]).parameters.graylistThreshold + GossipSub(nodes[1]).updateScores() + + handlerFut = newFuture[void]() + tryPublish await nodes[0].publish("foobar", toBytes("hellow2")), 1 + + # Without directPeers, this would fail + await handlerFut.wait(1.seconds) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipsSub peers disconnections mechanics": var runs = 10