diff --git a/tests/testpubsub.nim b/tests/testpubsub.nim index f74efb6f0..10c9d77f9 100644 --- a/tests/testpubsub.nim +++ b/tests/testpubsub.nim @@ -32,7 +32,8 @@ proc createMplex(conn: Connection): Muxer = result = newMplex(conn) proc createNode(privKey: Option[PrivateKey] = none(PrivateKey), - address: string = "/ip4/127.0.0.1/tcp/0"): Switch = + address: string = "/ip4/127.0.0.1/tcp/0", + triggerSelf: bool = false): Switch = var peerInfo: PeerInfo var seckey = privKey if privKey.isNone: @@ -46,7 +47,7 @@ proc createNode(privKey: Option[PrivateKey] = none(PrivateKey), let muxers = [(MplexCodec, mplexProvider)].toTable() let identify = newIdentify(peerInfo) let secureManagers = [(SecioCodec, Secure(newSecio(seckey.get())))].toTable() - let pubSub = some(PubSub(newFloodSub(peerInfo))) + let pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf))) result = newSwitch(peerInfo, transports, identify, @@ -66,62 +67,65 @@ proc subscribeNodes*(nodes: seq[Switch]) {.async.} = await allFutures(pending) suite "PubSub": - test "FloodSub basic publish/subscribe A -> B": - proc testBasicPubSub(): Future[bool] {.async.} = - var passed: bool - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed = true + # test "FloodSub basic publish/subscribe A -> B": + # proc testBasicPubSub(): Future[bool] {.async.} = + # var passed: bool + # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # check topic == "foobar" + # passed = true - var nodes = generateNodes(2) - var wait = await nodes[1].start() + # var nodes = generateNodes(2) + # var wait = await nodes[1].start() - await nodes[0].subscribeToPeer(nodes[1].peerInfo) + # await nodes[0].subscribeToPeer(nodes[1].peerInfo) - await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) + # await nodes[1].subscribe("foobar", handler) + # await sleepAsync(100.millis) - await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) + # await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) + # await sleepAsync(100.millis) - await nodes[1].stop() - await allFutures(wait) - result = passed + # await nodes[1].stop() + # await allFutures(wait) + # result = passed - check: - waitFor(testBasicPubSub()) == true + # check: + # waitFor(testBasicPubSub()) == true - test "FloodSub basic publish/subscribe B -> A": - proc testBasicPubSub(): Future[bool] {.async.} = - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" + # test "FloodSub basic publish/subscribe B -> A": + # proc testBasicPubSub(): Future[bool] {.async.} = + # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + # check topic == "foobar" - var nodes = generateNodes(2) - var wait = await nodes[1].start() + # var nodes = generateNodes(2) + # var wait = await nodes[1].start() - await nodes[0].subscribeToPeer(nodes[1].peerInfo) + # await nodes[0].subscribeToPeer(nodes[1].peerInfo) - await nodes[0].subscribe("foobar", handler) - await sleepAsync(10.millis) + # await nodes[0].subscribe("foobar", handler) + # await sleepAsync(10.millis) - await nodes[1].publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(10.millis) + # await nodes[1].publish("foobar", cast[seq[byte]]("Hello!")) + # await sleepAsync(10.millis) - await nodes[1].stop() - await allFutures(wait) - result = true + # await nodes[1].stop() + # await allFutures(wait) + # result = true - check: - waitFor(testBasicPubSub()) == true + # check: + # waitFor(testBasicPubSub()) == true - test "FloodSub multiple peers": + test "FloodSub multiple peers, no self trigger": proc testBasicFloodSub(): Future[bool] {.async.} = var passed: int proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = check topic == "foobar" passed.inc() - var nodes: seq[Switch] = generateNodes(10) + var nodes: seq[Switch] = newSeq[Switch]() + for i in 0..<10: + nodes.add(createNode()) + var awaitters: seq[Future[void]] for node in nodes: awaitters.add(await node.start()) @@ -142,3 +146,35 @@ suite "PubSub": check: waitFor(testBasicFloodSub()) == true + + test "FloodSub multiple peers, with self trigger": + proc testBasicFloodSub(): Future[bool] {.async.} = + var passed: int + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.inc() + + var nodes: seq[Switch] = newSeq[Switch]() + for i in 0..<10: + nodes.add(createNode(none(PrivateKey), "/ip4/127.0.0.1/tcp/0", true)) + + var awaitters: seq[Future[void]] + for node in nodes: + awaitters.add(await node.start()) + await node.subscribe("foobar", handler) + await sleepAsync(10.millis) + + await subscribeNodes(nodes) + await sleepAsync(50.millis) + + for node in nodes: + await node.publish("foobar", cast[seq[byte]]("Hello!")) + await sleepAsync(100.millis) + + await allFutures(nodes.mapIt(it.stop())) + await allFutures(awaitters) + + result = passed == 20 + + check: + waitFor(testBasicFloodSub()) == true