diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index fb1873c..92d3526 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -360,7 +360,7 @@ method rpcHandler*(g: GossipSub, template sub: untyped = rpcMsg.subscriptions[i] g.handleSubscribe(peer, sub.topic, sub.subscribe) - # the above call applied limtis to subs number + # the above call applied limits to subs number # in gossipsub we want to apply scoring as well if rpcMsg.subscriptions.len > g.topicsHigh: debug "received an rpc message with an oversized amount of subscriptions", peer, diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 8c4a766..83e3af7 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -9,7 +9,7 @@ {.used.} -import sequtils, options, tables, sets +import sequtils, options, tables, sets, sugar import chronos, stew/byteutils import chronicles import utils, ../../libp2p/[errors, @@ -29,26 +29,6 @@ import ../helpers proc `$`(peer: PubSubPeer): string = shortLog(peer) -proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = - if sender == receiver: - return - let timeout = Moment.now() + 5.seconds - let fsub = GossipSub(sender) - - # this is for testing purposes only - # peers can be inside `mesh` and `fanout`, not just `gossipsub` - while (not fsub.gossipsub.hasKey(key) or - not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and - (not fsub.mesh.hasKey(key) or - not fsub.mesh.hasPeerId(key, receiver.peerInfo.peerId)) and - (not fsub.fanout.hasKey(key) or - not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)): - trace "waitSub sleeping..." - - # await - await sleepAsync(5.milliseconds) - doAssert Moment.now() < timeout, "waitSub timeout!" - template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 5.seconds): untyped = var expiration = Moment.now() + timeout @@ -690,14 +670,14 @@ suite "GossipSub": seenFut.complete() dialer.subscribe("foobar", handler) - await waitSub(nodes[0], dialer, "foobar") + await waitSubGraph(nodes, "foobar") tryPublish await wait(nodes[0].publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId)), 1.minutes), 1 - await wait(seenFut, 2.minutes) + await wait(seenFut, 1.minutes) check: seen.len >= runs for k, v in seen.pairs: check: v >= 1 @@ -726,10 +706,11 @@ suite "GossipSub": var seen: Table[string, int] var seenFut = newFuture[void]() + for i in 0..= runs for k, v in seen.pairs: check: v >= 1 diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 3f21196..87318a4 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -10,8 +10,7 @@ {.used.} import sequtils, options, tables, sets -import chronos, stew/byteutils -import chronicles +import chronos, stew/byteutils, chronicles import utils, ../../libp2p/[errors, peerid, peerinfo, @@ -25,26 +24,6 @@ import utils, ../../libp2p/[errors, protocols/pubsub/rpc/messages] import ../helpers -proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = - if sender == receiver: - return - let timeout = Moment.now() + 5.seconds - let fsub = GossipSub(sender) - - # this is for testing purposes only - # peers can be inside `mesh` and `fanout`, not just `gossipsub` - while (not fsub.gossipsub.hasKey(key) or - not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and - (not fsub.mesh.hasKey(key) or - not fsub.mesh.hasPeerId(key, receiver.peerInfo.peerId)) and - (not fsub.fanout.hasKey(key) or - not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)): - trace "waitSub sleeping..." - - # await - await sleepAsync(5.milliseconds) - doAssert Moment.now() < timeout, "waitSub timeout!" - template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 10.seconds): untyped = var expiration = Moment.now() + timeout @@ -269,7 +248,7 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) - asyncTest "GossipsSub peers disconnections mechanics": + asyncTest "GossipSub peers disconnections mechanics": var runs = 10 let @@ -294,7 +273,8 @@ suite "GossipSub": seenFut.complete() dialer.subscribe("foobar", handler) - await waitSub(nodes[0], dialer, "foobar") + + await waitSubGraph(nodes, "foobar") # ensure peer stats are stored properly and kept properly check: diff --git a/tests/pubsub/testpubsub.nim b/tests/pubsub/testpubsub.nim index 0e910cb..08b9aba 100644 --- a/tests/pubsub/testpubsub.nim +++ b/tests/pubsub/testpubsub.nim @@ -2,11 +2,9 @@ import ../stublogger -import testfloodsub -when not defined(linux): - import testgossipsub, - testgossipsub2 -import +import testfloodsub, + testgossipsub, + testgossipsub2, testmcache, testtimedcache, testmessage diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 59918a7..095c68c 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -4,7 +4,7 @@ const libp2p_pubsub_verify {.booldefine.} = true libp2p_pubsub_anonymize {.booldefine.} = false -import hashes, random, tables +import hashes, random, tables, sets, sequtils import chronos, stew/[byteutils, results] import ../../libp2p/[builders, protocols/pubsub/errors, @@ -13,6 +13,7 @@ import ../../libp2p/[builders, protocols/pubsub/floodsub, protocols/pubsub/rpc/messages, protocols/secure/secure] +import chronicles export builders @@ -102,3 +103,43 @@ proc subscribeRandom*(nodes: seq[PubSub]) {.async.} = if dialer.peerInfo.peerId != node.peerInfo.peerId: await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs) dialed.add(node.peerInfo.peerId) + +proc waitSub*(sender, receiver: auto; key: string) {.async, gcsafe.} = + if sender == receiver: + return + let timeout = Moment.now() + 5.seconds + let fsub = GossipSub(sender) + + # this is for testing purposes only + # peers can be inside `mesh` and `fanout`, not just `gossipsub` + while (not fsub.gossipsub.hasKey(key) or + not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and + (not fsub.mesh.hasKey(key) or + not fsub.mesh.hasPeerId(key, receiver.peerInfo.peerId)) and + (not fsub.fanout.hasKey(key) or + not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)): + trace "waitSub sleeping..." + + # await + await sleepAsync(5.milliseconds) + doAssert Moment.now() < timeout, "waitSub timeout!" + +proc waitSubGraph*(nodes: seq[PubSub], key: string) {.async, gcsafe.} = + let timeout = Moment.now() + 5.seconds + while true: + var + nodesMesh: Table[PeerId, seq[PeerId]] + seen: HashSet[PeerId] + for n in nodes: + nodesMesh[n.peerInfo.peerId] = toSeq(GossipSub(n).mesh.getOrDefault(key).items()).mapIt(it.peerId) + proc explore(p: PeerId) = + if p in seen: return + seen.incl(p) + for peer in nodesMesh.getOrDefault(p): + explore(peer) + explore(nodes[0].peerInfo.peerId) + if seen.len == nodes.len: return + trace "waitSubGraph sleeping..." + + await sleepAsync(5.milliseconds) + doAssert Moment.now() < timeout, "waitSubGraph timeout!"