From 55b763264e52798937d4ab33795f88ff359b271c Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 12 Nov 2020 21:44:02 -0600 Subject: [PATCH] Cleanup tests (#435) * add async testing methods * refactor with async testing methods * use iffy in async tests --- tests/helpers.nim | 21 + tests/pubsub/testfloodsub.nim | 595 ++++++------- tests/pubsub/testgossipinternal.nim | 675 +++++++------- tests/pubsub/testgossipinternal10.nim | 639 ++++++------- tests/pubsub/testgossipsub.nim | 1016 ++++++++++----------- tests/testbufferstream.nim | 304 +++---- tests/testconnection.nim | 54 +- tests/testidentify.nim | 114 ++- tests/testinterop.nim | 429 ++++----- tests/testmplex.nim | 477 ++++------ tests/testmultistream.nim | 412 ++++----- tests/testnoise.nim | 363 ++++---- tests/testswitch.nim | 1187 ++++++++++++------------- tests/testtransport.nim | 268 +++--- 14 files changed, 3010 insertions(+), 3544 deletions(-) diff --git a/tests/helpers.nim b/tests/helpers.nim index 9bf6d2cfa..60d4aaf0e 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -46,6 +46,27 @@ template checkTrackers*() = # Also test the GC is not fooling with us GC_fullCollect() +template asyncTeardown*(body: untyped): untyped = + teardown: + waitFor(( + proc() {.async.} = + body + )()) + +template asyncSetup*(body: untyped): untyped = + setup: + waitFor(( + proc() {.async.} = + body + )()) + +template asyncTest*(name: string, body: untyped): untyped = + test name: + waitFor(( + proc() {.async.} = + body + )()) + type RngWrap = object rng: ref BrHmacDrbgContext diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 367ad25d6..389483eb3 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -38,357 +38,336 @@ suite "FloodSub": teardown: checkTrackers() - test "FloodSub basic publish/subscribe A -> B": - proc runTests() {.async.} = - var completionFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - completionFut.complete(true) + asyncTest "FloodSub basic publish/subscribe A -> B": + var completionFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + completionFut.complete(true) - let - nodes = generateNodes(2) + let + nodes = generateNodes(2) - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) - # start pubsub - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) - await subscribeNodes(nodes) + await subscribeNodes(nodes) - await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") - check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 - check (await completionFut.wait(5.seconds)) == true + check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 + check (await completionFut.wait(5.seconds)) == true - await allFuturesThrowing( + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "FloodSub basic publish/subscribe B -> A": + var completionFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + completionFut.complete(true) + + let + nodes = generateNodes(2) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsubcon + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + await nodes[0].subscribe("foobar", handler) + await waitSub(nodes[1], nodes[0], "foobar") + + check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0 + + check (await completionFut.wait(5.seconds)) == true + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut) + + asyncTest "FloodSub validation should succeed": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsubcon + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + var validatorFut = newFuture[bool]() + proc validator(topic: string, + message: Message): Future[ValidationResult] {.async.} = + check topic == "foobar" + validatorFut.complete(true) + result = ValidationResult.Accept + + nodes[1].addValidator("foobar", validator) + + check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 + check (await handlerFut) == true + + await allFuturesThrowing( nodes[0].switch.stop(), nodes[1].switch.stop() ) - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut) + + asyncTest "FloodSub validation should fail": + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check false # if we get here, it should fail + + let + nodes = generateNodes(2) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), ) - await allFuturesThrowing(nodesFut.concat()) + # start pubsubcon + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) - waitFor(runTests()) + await subscribeNodes(nodes) + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") - test "FloodSub basic publish/subscribe B -> A": - proc runTests() {.async.} = - var completionFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - completionFut.complete(true) + var validatorFut = newFuture[bool]() + proc validator(topic: string, + message: Message): Future[ValidationResult] {.async.} = + validatorFut.complete(true) + result = ValidationResult.Reject - let - nodes = generateNodes(2) + nodes[1].addValidator("foobar", validator) - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) + discard await nodes[0].publish("foobar", "Hello!".toBytes()) - # start pubsubcon - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - await subscribeNodes(nodes) - - await nodes[0].subscribe("foobar", handler) - await waitSub(nodes[1], nodes[0], "foobar") - - check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0 - - check (await completionFut.wait(5.seconds)) == true - - await allFuturesThrowing( + await allFuturesThrowing( nodes[0].switch.stop(), nodes[1].switch.stop() ) - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut) + + asyncTest "FloodSub validation one fails and one succeeds": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foo" + handlerFut.complete(true) + + let + nodes = generateNodes(2) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), ) - await allFuturesThrowing(nodesFut) + # start pubsubcon + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) - waitFor(runTests()) + await subscribeNodes(nodes) + await nodes[1].subscribe("foo", handler) + await waitSub(nodes[0], nodes[1], "foo") + await nodes[1].subscribe("bar", handler) + await waitSub(nodes[0], nodes[1], "bar") - test "FloodSub validation should succeed": - proc runTests() {.async.} = - var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - handlerFut.complete(true) - - let - nodes = generateNodes(2) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - # start pubsubcon - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - await subscribeNodes(nodes) - - await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") - - var validatorFut = newFuture[bool]() - proc validator(topic: string, - message: Message): Future[ValidationResult] {.async.} = - check topic == "foobar" - validatorFut.complete(true) + proc validator(topic: string, + message: Message): Future[ValidationResult] {.async.} = + if topic == "foo": result = ValidationResult.Accept - - nodes[1].addValidator("foobar", validator) - - check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 - check (await handlerFut) == true - - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() - ) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() - ) - - await allFuturesThrowing(nodesFut) - - waitFor(runTests()) - - test "FloodSub validation should fail": - proc runTests() {.async.} = - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check false # if we get here, it should fail - - let - nodes = generateNodes(2) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - # start pubsubcon - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - await subscribeNodes(nodes) - await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") - - var validatorFut = newFuture[bool]() - proc validator(topic: string, - message: Message): Future[ValidationResult] {.async.} = - validatorFut.complete(true) + else: result = ValidationResult.Reject - nodes[1].addValidator("foobar", validator) + nodes[1].addValidator("foo", "bar", validator) - discard await nodes[0].publish("foobar", "Hello!".toBytes()) + check (await nodes[0].publish("foo", "Hello!".toBytes())) > 0 + check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0 - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut) + + asyncTest "FloodSub multiple peers, no self trigger": + var runs = 10 + + var futs = newSeq[(Future[void], TopicHandler, ref int)](runs) + for i in 0.. 0 - check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0 + var pubs: seq[Future[int]] + for i in 0.. B": - proc runTests() {.async.} = - var passed = newFuture[void]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed.complete() - - let - nodes = generateNodes( - 2, - gossip = true, - secureManagers = [SecureProtocol.Secio]) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - # start pubsub - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - await subscribeNodes(nodes) - - await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") - - var observed = 0 - let - obs1 = PubSubObserver(onRecv: proc(peer: PubSubPeer; msgs: var RPCMsg) = - inc observed - ) - obs2 = PubSubObserver(onSend: proc(peer: PubSubPeer; msgs: var RPCMsg) = - inc observed - ) - - nodes[1].addObserver(obs1) - nodes[0].addObserver(obs2) - - tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 - - var gossip1: GossipSub = GossipSub(nodes[0]) - var gossip2: GossipSub = GossipSub(nodes[1]) - - check: - "foobar" in gossip1.gossipsub - gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId) - not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId) - - await passed.wait(2.seconds) - - trace "test done, stopping..." - - await nodes[0].stop() - await nodes[1].stop() - - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() - ) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() - ) - - await allFuturesThrowing(nodesFut.concat()) - check observed == 2 - - waitFor(runTests()) - - test "e2e - GossipSub send over mesh A -> B": - proc runTests(): Future[bool] {.async.} = - var passed: Future[bool] = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - passed.complete(true) - - let - nodes = generateNodes( - 2, - gossip = true, - secureManagers = [SecureProtocol.Secio]) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - # start pubsub - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - await subscribeNodes(nodes) - - await nodes[0].subscribe("foobar", handler) - await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") - - tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 - - result = await passed - - var gossip1: GossipSub = GossipSub(nodes[0]) - var gossip2: GossipSub = GossipSub(nodes[1]) - - check: - "foobar" in gossip1.gossipsub - "foobar" in gossip2.gossipsub - gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId) - not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId) - gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.peerId) - not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.peerId) - - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() - ) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() - ) - - await allFuturesThrowing(nodesFut.concat()) + let gossip1 = GossipSub(nodes[0]) + let gossip2 = GossipSub(nodes[1]) check: - waitFor(runTests()) == true + gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout + gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout - test "e2e - GossipSub with multiple peers": - proc runTests() {.async.} = - var runs = 10 + var validatorFut = newFuture[bool]() + proc validator(topic: string, + message: Message): + Future[ValidationResult] {.async.} = + result = ValidationResult.Reject + validatorFut.complete(true) - let - nodes = generateNodes(runs, gossip = true, triggerSelf = true) - nodesFut = nodes.mapIt(it.switch.start()) + nodes[1].addValidator("foobar", validator) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 - await allFuturesThrowing(nodes.mapIt(it.start())) - await subscribeNodes(nodes) + check (await validatorFut) == true - var seen: Table[string, int] - var seenFut = newFuture[void]() - for dialer in nodes: - var handler: TopicHandler - closureScope: - var peerName = $dialer.peerInfo.peerId - handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if peerName notin seen: - seen[peerName] = 0 - seen[peerName].inc - check topic == "foobar" - if not seenFut.finished() and seen.len >= runs: - seenFut.complete() + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) - await dialer.subscribe("foobar", handler) - await waitSub(nodes[0], dialer, "foobar") - - tryPublish await wait(nodes[0].publish("foobar", - toBytes("from node " & - $nodes[0].peerInfo.peerId)), - 1.minutes), 1, 5.seconds + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) - await wait(seenFut, 2.minutes) - check: seen.len >= runs - for k, v in seen.pairs: - check: v >= 1 + await allFuturesThrowing(nodesFut.concat()) - for node in nodes: - var gossip = GossipSub(node) + asyncTest "GossipSub validation one fails and one succeeds": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foo" + handlerFut.complete(true) - check: - "foobar" in gossip.gossipsub + let + nodes = generateNodes(2, gossip = true) - await allFuturesThrowing( - nodes.mapIt( - allFutures( - it.stop(), - it.switch.stop()))) + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) - await allFuturesThrowing(nodesFut) + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) - waitFor(runTests()) + await subscribeNodes(nodes) - test "e2e - GossipSub with multiple peers (sparse)": - proc runTests() {.async.} = - var runs = 10 + await nodes[1].subscribe("foo", handler) + await nodes[1].subscribe("bar", handler) - let - nodes = generateNodes(runs, gossip = true, triggerSelf = true) - nodesFut = nodes.mapIt(it.switch.start()) + var passed, failed: Future[bool] = newFuture[bool]() + proc validator(topic: string, + message: Message): + Future[ValidationResult] {.async.} = + result = if topic == "foo": + passed.complete(true) + ValidationResult.Accept + else: + failed.complete(true) + ValidationResult.Reject - await allFuturesThrowing(nodes.mapIt(it.start())) - await subscribeNodes(nodes) + nodes[1].addValidator("foo", "bar", validator) + tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1 + tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 - var seen: Table[string, int] - var seenFut = newFuture[void]() - for dialer in nodes: - var handler: TopicHandler - closureScope: - var peerName = $dialer.peerInfo.peerId - handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = - if peerName notin seen: - seen[peerName] = 0 - seen[peerName].inc - check topic == "foobar" - if not seenFut.finished() and seen.len >= runs: - seenFut.complete() + check ((await passed) and (await failed) and (await handlerFut)) - await dialer.subscribe("foobar", handler) - await waitSub(nodes[0], dialer, "foobar") + let gossip1 = GossipSub(nodes[0]) + let gossip2 = GossipSub(nodes[1]) - tryPublish await wait(nodes[0].publish("foobar", - toBytes("from node " & - $nodes[0].peerInfo.peerId)), - 1.minutes), 1, 5.seconds + check: + "foo" notin gossip1.mesh and gossip1.fanout["foo"].len == 1 + "foo" notin gossip2.mesh and "foo" notin gossip2.fanout + "bar" notin gossip1.mesh and gossip1.fanout["bar"].len == 1 + "bar" notin gossip2.mesh and "bar" notin gossip2.fanout - await wait(seenFut, 5.minutes) - check: seen.len >= runs - for k, v in seen.pairs: - check: v >= 1 + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) - for node in nodes: - var gossip = GossipSub(node) - check: - "foobar" in gossip.gossipsub - gossip.fanout.len == 0 - gossip.mesh["foobar"].len > 0 + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) - await allFuturesThrowing( - nodes.mapIt( - allFutures( - it.stop(), - it.switch.stop()))) + await allFuturesThrowing(nodesFut.concat()) - await allFuturesThrowing(nodesFut) + asyncTest "e2e - GossipSub should add remote peer topic subscriptions": + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard - waitFor(runTests()) + let + nodes = generateNodes( + 2, + gossip = true, + secureManagers = [SecureProtocol.Noise]) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + await nodes[1].subscribe("foobar", handler) + await sleepAsync(10.seconds) + + let gossip1 = GossipSub(nodes[0]) + let gossip2 = GossipSub(nodes[1]) + + check: + "foobar" in gossip2.topics + "foobar" in gossip1.gossipsub + gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.peerId) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed": + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + discard + + let + nodes = generateNodes( + 2, + gossip = true, + secureManagers = [SecureProtocol.Secio]) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + await nodes[0].subscribe("foobar", handler) + await nodes[1].subscribe("foobar", handler) + + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + + await allFuturesThrowing(subs) + + let + gossip1 = GossipSub(nodes[0]) + gossip2 = GossipSub(nodes[1]) + + check: + "foobar" in gossip1.topics + "foobar" in gossip2.topics + + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub + + gossip1.gossipsub.hasPeerID("foobar", gossip2.peerInfo.peerId) or + gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId) + + gossip2.gossipsub.hasPeerID("foobar", gossip1.peerInfo.peerId) or + gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.peerId) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "e2e - GossipSub send over fanout A -> B": + var passed = newFuture[void]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.complete() + + let + nodes = generateNodes( + 2, + gossip = true, + secureManagers = [SecureProtocol.Secio]) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + var observed = 0 + let + obs1 = PubSubObserver(onRecv: proc(peer: PubSubPeer; msgs: var RPCMsg) = + inc observed + ) + obs2 = PubSubObserver(onSend: proc(peer: PubSubPeer; msgs: var RPCMsg) = + inc observed + ) + + nodes[1].addObserver(obs1) + nodes[0].addObserver(obs2) + + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + var gossip1: GossipSub = GossipSub(nodes[0]) + var gossip2: GossipSub = GossipSub(nodes[1]) + + check: + "foobar" in gossip1.gossipsub + gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId) + not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId) + + await passed.wait(2.seconds) + + trace "test done, stopping..." + + await nodes[0].stop() + await nodes[1].stop() + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + check observed == 2 + + asyncTest "e2e - GossipSub send over mesh A -> B": + var passed: Future[bool] = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.complete(true) + + let + nodes = generateNodes( + 2, + gossip = true, + secureManagers = [SecureProtocol.Secio]) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + await subscribeNodes(nodes) + + await nodes[0].subscribe("foobar", handler) + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + + check await passed + + var gossip1: GossipSub = GossipSub(nodes[0]) + var gossip2: GossipSub = GossipSub(nodes[1]) + + check: + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub + gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId) + not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId) + gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.peerId) + not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.peerId) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "e2e - GossipSub with multiple peers": + var runs = 10 + + let + nodes = generateNodes(runs, gossip = true, triggerSelf = true) + nodesFut = nodes.mapIt(it.switch.start()) + + await allFuturesThrowing(nodes.mapIt(it.start())) + await subscribeNodes(nodes) + + var seen: Table[string, int] + var seenFut = newFuture[void]() + for dialer in nodes: + var handler: TopicHandler + closureScope: + var peerName = $dialer.peerInfo.peerId + handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc + check topic == "foobar" + if not seenFut.finished() and seen.len >= runs: + seenFut.complete() + + await dialer.subscribe("foobar", handler) + await waitSub(nodes[0], dialer, "foobar") + + tryPublish await wait(nodes[0].publish("foobar", + toBytes("from node " & + $nodes[0].peerInfo.peerId)), + 1.minutes), 1, 5.seconds + + await wait(seenFut, 2.minutes) + check: seen.len >= runs + for k, v in seen.pairs: + check: v >= 1 + + for node in nodes: + var gossip = GossipSub(node) + + check: + "foobar" in gossip.gossipsub + + await allFuturesThrowing( + nodes.mapIt( + allFutures( + it.stop(), + it.switch.stop()))) + + await allFuturesThrowing(nodesFut) + + asyncTest "e2e - GossipSub with multiple peers (sparse)": + var runs = 10 + + let + nodes = generateNodes(runs, gossip = true, triggerSelf = true) + nodesFut = nodes.mapIt(it.switch.start()) + + await allFuturesThrowing(nodes.mapIt(it.start())) + await subscribeNodes(nodes) + + var seen: Table[string, int] + var seenFut = newFuture[void]() + for dialer in nodes: + var handler: TopicHandler + closureScope: + var peerName = $dialer.peerInfo.peerId + handler = proc(topic: string, data: seq[byte]) {.async, gcsafe, closure.} = + if peerName notin seen: + seen[peerName] = 0 + seen[peerName].inc + check topic == "foobar" + if not seenFut.finished() and seen.len >= runs: + seenFut.complete() + + await dialer.subscribe("foobar", handler) + await waitSub(nodes[0], dialer, "foobar") + + tryPublish await wait(nodes[0].publish("foobar", + toBytes("from node " & + $nodes[0].peerInfo.peerId)), + 1.minutes), 1, 5.seconds + + await wait(seenFut, 5.minutes) + check: seen.len >= runs + for k, v in seen.pairs: + check: v >= 1 + + for node in nodes: + var gossip = GossipSub(node) + check: + "foobar" in gossip.gossipsub + gossip.fanout.len == 0 + gossip.mesh["foobar"].len > 0 + + await allFuturesThrowing( + nodes.mapIt( + allFutures( + it.stop(), + it.switch.stop()))) + + await allFuturesThrowing(nodesFut) diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index 6af89adab..866e3e62b 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -3,6 +3,8 @@ import chronos, stew/byteutils import ../libp2p/stream/bufferstream, ../libp2p/stream/lpstream +import ./helpers + {.used.} suite "BufferStream": @@ -10,224 +12,154 @@ suite "BufferStream": # echo getTracker(BufferStreamTrackerName).dump() check getTracker(BufferStreamTrackerName).isLeaked() == false - test "push data to buffer": - proc testpushData(): Future[bool] {.async.} = - let buff = newBufferStream() - check buff.len == 0 - var data = "12345" - await buff.pushData(data.toBytes()) - check buff.len == 5 - result = true + asyncTest "push data to buffer": + let buff = newBufferStream() + check buff.len == 0 + var data = "12345" + await buff.pushData(data.toBytes()) + check buff.len == 5 + await buff.close() - await buff.close() + asyncTest "push and wait": + let buff = newBufferStream() + check buff.len == 0 - check: - waitFor(testpushData()) == true + let fut0 = buff.pushData("1234".toBytes()) + let fut1 = buff.pushData("5".toBytes()) + check buff.len == 4 # the second write should not be visible yet - test "push and wait": - proc testpushData(): Future[bool] {.async.} = - let buff = newBufferStream() - check buff.len == 0 + var data: array[1, byte] + check: 1 == await buff.readOnce(addr data[0], data.len) - let fut0 = buff.pushData("1234".toBytes()) - let fut1 = buff.pushData("5".toBytes()) - check buff.len == 4 # the second write should not be visible yet + check ['1'] == string.fromBytes(data) + await fut0 + await fut1 + check buff.len == 4 + await buff.close() - var data: array[1, byte] - check: 1 == await buff.readOnce(addr data[0], data.len) + asyncTest "read with size": + let buff = newBufferStream() + check buff.len == 0 - check ['1'] == string.fromBytes(data) - await fut0 - await fut1 - check buff.len == 4 + await buff.pushData("12345".toBytes()) + var data: array[3, byte] + await buff.readExactly(addr data[0], data.len) + check ['1', '2', '3'] == string.fromBytes(data) + await buff.close() - result = true + asyncTest "readExactly": + let buff = newBufferStream() + check buff.len == 0 - await buff.close() + await buff.pushData("12345".toBytes()) + check buff.len == 5 + var data: array[2, byte] + await buff.readExactly(addr data[0], data.len) + check string.fromBytes(data) == ['1', '2'] + await buff.close() - check: - waitFor(testpushData()) == true + asyncTest "readExactly raises": + let buff = newBufferStream() + check buff.len == 0 - test "read with size": - proc testRead(): Future[bool] {.async.} = - let buff = newBufferStream() - check buff.len == 0 + await buff.pushData("123".toBytes()) + var data: array[5, byte] + var readFut = buff.readExactly(addr data[0], data.len) + await buff.close() - await buff.pushData("12345".toBytes()) - var data: array[3, byte] - await buff.readExactly(addr data[0], data.len) - check ['1', '2', '3'] == string.fromBytes(data) + expect LPStreamIncompleteError: + await readFut - result = true + asyncTest "readOnce": + let buff = newBufferStream() + check buff.len == 0 - await buff.close() + var data: array[3, byte] + let readFut = buff.readOnce(addr data[0], data.len) + await buff.pushData("123".toBytes()) + check buff.len == 3 - check: - waitFor(testRead()) == true + check (await readFut) == 3 + check string.fromBytes(data) == ['1', '2', '3'] + await buff.close() - test "readExactly": - proc testReadExactly(): Future[bool] {.async.} = - let buff = newBufferStream() - check buff.len == 0 + asyncTest "reads should happen in order": + let buff = newBufferStream() + check buff.len == 0 - await buff.pushData("12345".toBytes()) - check buff.len == 5 - var data: array[2, byte] - await buff.readExactly(addr data[0], data.len) - check string.fromBytes(data) == ['1', '2'] + let w1 = buff.pushData("Msg 1".toBytes()) + let w2 = buff.pushData("Msg 2".toBytes()) + let w3 = buff.pushData("Msg 3".toBytes()) - result = true + var data: array[5, byte] + await buff.readExactly(addr data[0], data.len) - await buff.close() + check string.fromBytes(data) == "Msg 1" - check: - waitFor(testReadExactly()) == true + await buff.readExactly(addr data[0], data.len) + check string.fromBytes(data) == "Msg 2" - test "readExactly raises": - proc testReadExactly(): Future[bool] {.async.} = - let buff = newBufferStream() - check buff.len == 0 + await buff.readExactly(addr data[0], data.len) + check string.fromBytes(data) == "Msg 3" - await buff.pushData("123".toBytes()) - var data: array[5, byte] - var readFut = buff.readExactly(addr data[0], data.len) - await buff.close() + for f in [w1, w2, w3]: await f - try: - await readFut - except LPStreamIncompleteError: - result = true + let w4 = buff.pushData("Msg 4".toBytes()) + let w5 = buff.pushData("Msg 5".toBytes()) + let w6 = buff.pushData("Msg 6".toBytes()) - check: - waitFor(testReadExactly()) == true + await buff.close() - test "readOnce": - proc testReadOnce(): Future[bool] {.async.} = - let buff = newBufferStream() - check buff.len == 0 + await buff.readExactly(addr data[0], data.len) + check string.fromBytes(data) == "Msg 4" - var data: array[3, byte] - let readFut = buff.readOnce(addr data[0], data.len) - await buff.pushData("123".toBytes()) - check buff.len == 3 + await buff.readExactly(addr data[0], data.len) + check string.fromBytes(data) == "Msg 5" - check (await readFut) == 3 - check string.fromBytes(data) == ['1', '2', '3'] + await buff.readExactly(addr data[0], data.len) + check string.fromBytes(data) == "Msg 6" + for f in [w4, w5, w6]: await f - result = true + asyncTest "small reads": + let buff = newBufferStream() + check buff.len == 0 - await buff.close() + var writes: seq[Future[void]] + var str: string + for i in 0..<10: + writes.add buff.pushData("123".toBytes()) + str &= "123" + await buff.close() # all data should still be read after close - check: - waitFor(testReadOnce()) == true + var str2: string + var data: array[2, byte] + expect LPStreamEOFError: + while true: + let x = await buff.readOnce(addr data[0], data.len) + str2 &= string.fromBytes(data[0.. 0.uint: await s.readExactly(addr result[0], int(size)) -proc testPubSubDaemonPublish(gossip: bool = false, - count: int = 1): Future[bool] {.async.} = +proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1) {.async.} = var pubsubData = "TEST MESSAGE" var testTopic = "test-topic" var msgData = pubsubData.toBytes() @@ -120,14 +119,12 @@ proc testPubSubDaemonPublish(gossip: bool = false, await wait(publisher(), 5.minutes) # should be plenty of time - result = true await nativeNode.stop() await pubsub.stop() await allFutures(awaiters) await daemonNode.close() -proc testPubSubNodePublish(gossip: bool = false, - count: int = 1): Future[bool] {.async.} = +proc testPubSubNodePublish(gossip: bool = false, count: int = 1) {.async.} = var pubsubData = "TEST MESSAGE" var testTopic = "test-topic" var msgData = pubsubData.toBytes() @@ -187,7 +184,7 @@ proc testPubSubNodePublish(gossip: bool = false, await wait(publisher(), 5.minutes) # should be plenty of time - result = finished + check finished await nativeNode.stop() await pubsub.stop() await allFutures(awaiters) @@ -199,268 +196,236 @@ suite "Interop": # and libp2p, so not sure which one it is, # need to investigate more # teardown: - # for tracker in testTrackers(): - # # echo tracker.dump() - # # check tracker.isLeaked() == false + # checkTrackers() # TODO: this test is failing sometimes on windows # For some reason we receive EOF before test 4 sometimes - test "native -> daemon multiple reads and writes": - proc runTests(): Future[bool] {.async.} = - var protos = @["/test-stream"] + asyncTest "native -> daemon multiple reads and writes": + var protos = @["/test-stream"] - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], - outTimeout = 5.minutes) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], + outTimeout = 5.minutes) - let awaiters = await nativeNode.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() + let awaiters = await nativeNode.start() + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() - var testFuture = newFuture[void]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = - check string.fromBytes(await stream.transp.readLp()) == "test 1" - discard await stream.transp.writeLp("test 2") - check string.fromBytes(await stream.transp.readLp()) == "test 3" - discard await stream.transp.writeLp("test 4") - testFuture.complete() + var testFuture = newFuture[void]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + check string.fromBytes(await stream.transp.readLp()) == "test 1" + discard await stream.transp.writeLp("test 2") + check string.fromBytes(await stream.transp.readLp()) == "test 3" + discard await stream.transp.writeLp("test 4") + testFuture.complete() - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, - daemonPeer.addresses), - protos[0]) - await conn.writeLp("test 1") - check "test 2" == string.fromBytes((await conn.readLp(1024))) - - await conn.writeLp("test 3") - check "test 4" == string.fromBytes((await conn.readLp(1024))) + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, + daemonPeer.addresses), + protos[0]) + await conn.writeLp("test 1") + check "test 2" == string.fromBytes((await conn.readLp(1024))) - await wait(testFuture, 10.secs) + await conn.writeLp("test 3") + check "test 4" == string.fromBytes((await conn.readLp(1024))) - await nativeNode.stop() - await daemonNode.close() - await allFutures(awaiters) + await wait(testFuture, 10.secs) - await sleepAsync(1.seconds) - result = true + await nativeNode.stop() + await daemonNode.close() + await allFutures(awaiters) - check: - waitFor(runTests()) == true + await sleepAsync(1.seconds) - test "native -> daemon connection": - proc runTests(): Future[bool] {.async.} = - var protos = @["/test-stream"] - var test = "TEST STRING" - # We are preparing expect string, which should be prefixed with varint - # length and do not have `\r\n` suffix, because we going to use - # readLine(). - var buffer = initVBuffer() - buffer.writeSeq(test & "\r\n") - buffer.finish() - var expect = newString(len(buffer) - 2) - copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) + asyncTest "native -> daemon connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + # We are preparing expect string, which should be prefixed with varint + # length and do not have `\r\n` suffix, because we going to use + # readLine(). + var buffer = initVBuffer() + buffer.writeSeq(test & "\r\n") + buffer.finish() + var expect = newString(len(buffer) - 2) + copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], - outTimeout = 5.minutes) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], + outTimeout = 5.minutes) - let awaiters = await nativeNode.start() + let awaiters = await nativeNode.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() - var testFuture = newFuture[string]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = - # We should perform `readLp()` instead of `readLine()`. `readLine()` - # here reads actually length prefixed string. - var line = await stream.transp.readLine() - check line == expect - testFuture.complete(line) - await stream.close() + var testFuture = newFuture[string]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + # We should perform `readLp()` instead of `readLine()`. `readLine()` + # here reads actually length prefixed string. + var line = await stream.transp.readLine() + check line == expect + testFuture.complete(line) + await stream.close() - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, - daemonPeer.addresses), - protos[0]) - await conn.writeLp(test & "\r\n") - result = expect == (await wait(testFuture, 10.secs)) + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer, + daemonPeer.addresses), + protos[0]) + await conn.writeLp(test & "\r\n") + check expect == (await wait(testFuture, 10.secs)) + await conn.close() + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + + asyncTest "daemon -> native connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + + var testFuture = newFuture[string]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + var line = string.fromBytes(await conn.readLp(1024)) + check line == test + testFuture.complete(line) await conn.close() - await nativeNode.stop() - await allFutures(awaiters) - await daemonNode.close() - check: - waitFor(runTests()) == true + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec - test "daemon -> native connection": - proc runTests(): Future[bool] {.async.} = - var protos = @["/test-stream"] - var test = "TEST STRING" + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) - var testFuture = newFuture[string]("test.future") - proc nativeHandler(conn: Connection, proto: string) {.async.} = + nativeNode.mount(proto) + + let awaiters = await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + discard await stream.transp.writeLp(test) + + check test == (await wait(testFuture, 10.secs)) + + await stream.close() + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + await sleepAsync(1.seconds) + + asyncTest "daemon -> multiple reads and writes": + var protos = @["/test-stream"] + + var testFuture = newFuture[void]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + check "test 1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test 2".toBytes()) + + check "test 3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test 4".toBytes()) + + testFuture.complete() + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) + + nativeNode.mount(proto) + + let awaiters = await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + + asyncDiscard stream.transp.writeLp("test 1") + check "test 2" == string.fromBytes(await stream.transp.readLp()) + + asyncDiscard stream.transp.writeLp("test 3") + check "test 4" == string.fromBytes(await stream.transp.readLp()) + + await wait(testFuture, 10.secs) + + await stream.close() + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() + + asyncTest "read write multiple": + var protos = @["/test-stream"] + var test = "TEST STRING" + + var count = 0 + var testFuture = newFuture[int]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + while count < 10: var line = string.fromBytes(await conn.readLp(1024)) check line == test - testFuture.complete(line) - await conn.close() + await conn.writeLp(test.toBytes()) + count.inc() - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec + testFuture.complete(count) + await conn.close() - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec - nativeNode.mount(proto) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) - let awaiters = await nativeNode.start() - let nativePeer = nativeNode.peerInfo + nativeNode.mount(proto) - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) + let awaiters = await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + + var count2 = 0 + while count2 < 10: discard await stream.transp.writeLp(test) + let line = await stream.transp.readLp() + check test == string.fromBytes(line) + inc(count2) - result = test == (await wait(testFuture, 10.secs)) + check 10 == (await wait(testFuture, 1.minutes)) + await stream.close() + await nativeNode.stop() + await allFutures(awaiters) + await daemonNode.close() - await stream.close() - await nativeNode.stop() - await allFutures(awaiters) - await daemonNode.close() - await sleepAsync(1.seconds) + asyncTest "floodsub: daemon publish one": + await testPubSubDaemonPublish() - check: - waitFor(runTests()) == true + asyncTest "floodsub: daemon publish many": + await testPubSubDaemonPublish(count = 10) - test "daemon -> multiple reads and writes": - proc runTests(): Future[bool] {.async.} = - var protos = @["/test-stream"] + asyncTest "gossipsub: daemon publish one": + await testPubSubDaemonPublish(gossip = true) - var testFuture = newFuture[void]("test.future") - proc nativeHandler(conn: Connection, proto: string) {.async.} = - check "test 1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test 2".toBytes()) + asyncTest "gossipsub: daemon publish many": + await testPubSubDaemonPublish(gossip = true, count = 10) - check "test 3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test 4".toBytes()) + asyncTest "floodsub: node publish one": + await testPubSubNodePublish() - testFuture.complete() - await conn.close() + asyncTest "floodsub: node publish many": + await testPubSubNodePublish(count = 10) - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec + asyncTest "gossipsub: node publish one": + await testPubSubNodePublish(gossip = true) - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) - - nativeNode.mount(proto) - - let awaiters = await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - - asyncDiscard stream.transp.writeLp("test 1") - check "test 2" == string.fromBytes(await stream.transp.readLp()) - - asyncDiscard stream.transp.writeLp("test 3") - check "test 4" == string.fromBytes(await stream.transp.readLp()) - - await wait(testFuture, 10.secs) - - result = true - await stream.close() - await nativeNode.stop() - await allFutures(awaiters) - await daemonNode.close() - - check: - waitFor(runTests()) == true - - test "read write multiple": - proc runTests(): Future[bool] {.async.} = - var protos = @["/test-stream"] - var test = "TEST STRING" - - var count = 0 - var testFuture = newFuture[int]("test.future") - proc nativeHandler(conn: Connection, proto: string) {.async.} = - while count < 10: - var line = string.fromBytes(await conn.readLp(1024)) - check line == test - await conn.writeLp(test.toBytes()) - count.inc() - - testFuture.complete(count) - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) - - nativeNode.mount(proto) - - let awaiters = await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - - var count2 = 0 - while count2 < 10: - discard await stream.transp.writeLp(test) - let line = await stream.transp.readLp() - check test == string.fromBytes(line) - inc(count2) - - result = 10 == (await wait(testFuture, 1.minutes)) - await stream.close() - await nativeNode.stop() - await allFutures(awaiters) - await daemonNode.close() - - check: - waitFor(runTests()) == true - - test "floodsub: daemon publish one": - check: - waitFor(testPubSubDaemonPublish()) == true - - test "floodsub: daemon publish many": - check: - waitFor(testPubSubDaemonPublish(count = 10)) == true - - test "gossipsub: daemon publish one": - check: - waitFor(testPubSubDaemonPublish(gossip = true)) == true - - test "gossipsub: daemon publish many": - check: - waitFor(testPubSubDaemonPublish(gossip = true, count = 10)) == true - - test "floodsub: node publish one": - check: - waitFor(testPubSubNodePublish()) == true - - test "floodsub: node publish many": - check: - waitFor(testPubSubNodePublish(count = 10)) == true - - test "gossipsub: node publish one": - check: - waitFor(testPubSubNodePublish(gossip = true)) == true - - test "gossipsub: node publish many": - check: - waitFor(testPubSubNodePublish(gossip = true, count = 10)) == true + asyncTest "gossipsub: node publish many": + await testPubSubNodePublish(gossip = true, count = 10) diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 433bd573c..cb12d5cc5 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -20,8 +20,8 @@ suite "Mplex": teardown: checkTrackers() - test "encode header with channel id 0": - proc testEncodeHeader() {.async.} = + suite "channel encoding": + asyncTest "encode header with channel id 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("000873747265616d2031") @@ -29,10 +29,7 @@ suite "Mplex": await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes) await conn.close() - waitFor(testEncodeHeader()) - - test "encode header with channel id other than 0": - proc testEncodeHeader() {.async.} = + asyncTest "encode header with channel id other than 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("88010873747265616d2031") @@ -40,10 +37,7 @@ suite "Mplex": await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes) await conn.close() - waitFor(testEncodeHeader()) - - test "encode header and body with channel id 0": - proc testEncodeHeaderBody() {.async.} = + asyncTest "encode header and body with channel id 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("020873747265616d2031") @@ -51,10 +45,7 @@ suite "Mplex": await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() - waitFor(testEncodeHeaderBody()) - - test "encode header and body with channel id other than 0": - proc testEncodeHeaderBody() {.async.} = + asyncTest "encode header and body with channel id other than 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("8a010873747265616d2031") @@ -62,10 +53,7 @@ suite "Mplex": await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() - waitFor(testEncodeHeaderBody()) - - test "decode header with channel id 0": - proc testDecodeHeader() {.async.} = + asyncTest "decode header with channel id 0": let stream = newBufferStream() let conn = stream await stream.pushData(fromHex("000873747265616d2031")) @@ -75,10 +63,7 @@ suite "Mplex": check msg.msgType == MessageType.New await conn.close() - waitFor(testDecodeHeader()) - - test "decode header and body with channel id 0": - proc testDecodeHeader() {.async.} = + asyncTest "decode header and body with channel id 0": let stream = newBufferStream() let conn = stream await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) @@ -89,10 +74,7 @@ suite "Mplex": check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() - waitFor(testDecodeHeader()) - - test "decode header and body with channel id other than 0": - proc testDecodeHeader() {.async.} = + asyncTest "decode header and body with channel id other than 0": let stream = newBufferStream() let conn = stream await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) @@ -103,28 +85,21 @@ suite "Mplex": check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() - waitFor(testDecodeHeader()) - - test "half closed (local close) - should close for write": - proc testClosedForWrite(): Future[bool] {.async.} = + suite "channel half-closed": + asyncTest "(local close) - should close for write": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) + await chann.close() - try: + expect LPStreamClosedError: await chann.write("Hello") - except LPStreamClosedError: - result = true - finally: - await chann.reset() - await conn.close() - check: - waitFor(testClosedForWrite()) == true + await chann.reset() + await conn.close() - test "half closed (local close) - should allow reads until remote closes": - proc testOpenForRead(): Future[bool] {.async.} = + asyncTest "(local close) - should allow reads until remote closes": let conn = newBufferStream( proc (data: seq[byte]) {.gcsafe, async.} = @@ -142,21 +117,16 @@ suite "Mplex": let closeFut = chann.pushEof() # should still allow reading until buffer EOF await chann.readExactly(addr data[3], 3) - try: + + expect LPStreamEOFError: # this should fail now await chann.readExactly(addr data[0], 3) - except LPStreamEOFError: - result = true - finally: - await chann.close() - await conn.close() + + await chann.close() + await conn.close() await closeFut - check: - waitFor(testOpenForRead()) == true - - test "half closed (remote close) - channel should close for reading by remote": - proc testClosedForRead(): Future[bool] {.async.} = + asyncTest "(remote close) - channel should close for reading by remote": let conn = newBufferStream( proc (data: seq[byte]) {.gcsafe, async.} = @@ -171,19 +141,14 @@ suite "Mplex": let closeFut = chann.pushEof() # closing channel let readFut = chann.readExactly(addr data[3], 3) await all(closeFut, readFut) - try: + + expect LPStreamEOFError: await chann.readExactly(addr data[0], 6) # this should fail now - except LPStreamEOFError: - result = true - finally: - await chann.close() - await conn.close() - check: - waitFor(testClosedForRead()) == true + await chann.close() + await conn.close() - test "half closed (remote close) - channel should allow writing on remote close": - proc testClosedForRead(): Future[bool] {.async.} = + asyncTest "(remote close) - channel should allow writing on remote close": let testData = "Hello!".toBytes conn = newBufferStream( @@ -195,16 +160,11 @@ suite "Mplex": await chann.pushEof() # closing channel try: await chann.writeLp(testData) - return true finally: await chann.reset() # there's nobody reading the EOF! await conn.close() - check: - waitFor(testClosedForRead()) == true - - test "should not allow pushing data to channel when remote end closed": - proc testResetWrite(): Future[bool] {.async.} = + asyncTest "should not allow pushing data to channel when remote end closed": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) @@ -212,19 +172,16 @@ suite "Mplex": await chann.pushEof() var buf: array[1, byte] check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read - try: + + expect LPStreamEOFError: await chann.pushData(@[byte(1)]) - except LPStreamEOFError: - result = true - finally: - await chann.close() - await conn.close() - check: - waitFor(testResetWrite()) == true + await chann.close() + await conn.close() - test "reset - channel should fail reading": - proc testResetRead(): Future[bool] {.async.} = + suite "channel reset": + + asyncTest "channel should fail reading": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) @@ -232,18 +189,12 @@ suite "Mplex": await chann.reset() var data = newSeq[byte](1) - try: + expect LPStreamEOFError: await chann.readExactly(addr data[0], 1) - except LPStreamEOFError: - result = true - finally: - await conn.close() - check: - waitFor(testResetRead()) == true + await conn.close() - test "reset - should complete read": - proc testResetRead(): Future[bool] {.async.} = + asyncTest "should complete read": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) @@ -251,19 +202,14 @@ suite "Mplex": var data = newSeq[byte](1) let fut = chann.readExactly(addr data[0], 1) + await chann.reset() - try: + expect LPStreamEOFError: await fut - except LPStreamEOFError: - result = true - finally: - await conn.close() - check: - waitFor(testResetRead()) == true + await conn.close() - test "reset - should complete pushData": - proc testResetRead(): Future[bool] {.async.} = + asyncTest "should complete pushData": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) @@ -272,14 +218,10 @@ suite "Mplex": await chann.pushData(@[0'u8]) let fut = chann.pushData(@[0'u8]) await chann.reset() - result = await fut.withTimeout(100.millis) + check await fut.withTimeout(100.millis) await conn.close() - check: - waitFor(testResetRead()) == true - - test "reset - should complete both read and push": - proc testResetRead(): Future[bool] {.async.} = + asyncTest "should complete both read and push": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) @@ -290,46 +232,33 @@ suite "Mplex": let wfut = chann.pushData(@[0'u8]) let wfut2 = chann.pushData(@[0'u8]) await chann.reset() - result = await allFutures(rfut, wfut, wfut2).withTimeout(100.millis) + check await allFutures(rfut, wfut, wfut2).withTimeout(100.millis) await conn.close() - check: - waitFor(testResetRead()) == true - - test "reset - channel should fail writing": - proc testResetWrite(): Future[bool] {.async.} = + asyncTest "channel should fail writing": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) await chann.reset() - try: + + expect LPStreamClosedError: await chann.write(("Hello!").toBytes) - except LPStreamClosedError: - result = true - finally: - await conn.close() - check: - waitFor(testResetWrite()) == true + await conn.close() - test "reset - channel should reset on timeout": - proc testResetWrite(): Future[bool] {.async.} = + asyncTest "channel should reset on timeout": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init( 1, conn, true, timeout = 100.millis) - check await chann.closeEvent.wait().withTimeout(1.minutes) + check await chann.join().withTimeout(1.minutes) await conn.close() - result = true - check: - waitFor(testResetWrite()) - - test "e2e - read/write receiver": - proc testNewStream() {.async.} = + suite "mplex e2e": + asyncTest "read/write receiver": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var done = newFuture[void]() @@ -364,12 +293,10 @@ suite "Mplex": await allFuturesThrowing( transport1.close(), transport2.close()) + await listenFut - waitFor(testNewStream()) - - test "e2e - read/write receiver lazy": - proc testNewStream() {.async.} = + asyncTest "read/write receiver lazy": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var done = newFuture[void]() @@ -407,10 +334,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - write fragmented": - proc testNewStream() {.async.} = + asyncTest "write fragmented": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() listenJob = newFuture[void]() @@ -449,10 +373,7 @@ suite "Mplex": let stream = await mplexDial.newStream() await stream.writeLp(bigseq) - try: - await listenJob.wait(10.seconds) - except AsyncTimeoutError: - check false + await listenJob.wait(10.seconds) await stream.close() await conn.close() @@ -460,12 +381,10 @@ suite "Mplex": await allFuturesThrowing( transport1.close(), transport2.close()) + await listenFut - waitFor(testNewStream()) - - test "e2e - read/write initiator": - proc testNewStream() {.async.} = + asyncTest "read/write initiator": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let done = newFuture[void]() @@ -501,10 +420,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - multiple streams": - proc testNewStream() {.async.} = + asyncTest "multiple streams": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let done = newFuture[void]() @@ -545,10 +461,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - multiple read/write streams": - proc testNewStream() {.async.} = + asyncTest "multiple read/write streams": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let done = newFuture[void]() @@ -592,10 +505,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - channel closes listener with EOF": - proc testNewStream() {.async.} = + asyncTest "channel closes listener with EOF": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var listenStreams: seq[Connection] @@ -641,10 +551,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - channel closes dialer with EOF": - proc testNewStream() {.async.} = + asyncTest "channel closes dialer with EOF": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var listenStreams: seq[Connection] @@ -707,10 +614,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - dialing mplex closes both ends": - proc testNewStream() {.async.} = + asyncTest "dialing mplex closes both ends": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var listenStreams: seq[Connection] @@ -750,10 +654,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - listening mplex closes both ends": - proc testNewStream() {.async.} = + asyncTest "listening mplex closes both ends": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var mplexListen: Mplex @@ -794,10 +695,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - canceling mplex handler closes both ends": - proc testNewStream() {.async.} = + asyncTest "canceling mplex handler closes both ends": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var mplexHandle: Future[void] @@ -839,10 +737,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - closing dialing connection should close both ends": - proc testNewStream() {.async.} = + asyncTest "closing dialing connection should close both ends": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var listenStreams: seq[Connection] @@ -882,10 +777,7 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) - - test "e2e - canceling listening connection should close both ends": - proc testNewStream() {.async.} = + asyncTest "canceling listening connection should close both ends": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var listenConn: Connection @@ -927,137 +819,130 @@ suite "Mplex": transport2.close()) await listenFut - waitFor(testNewStream()) + suite "jitter": + asyncTest "channel should be able to handle erratic read/writes": + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - test "jitter - channel should be able to handle erratic read/writes": - proc test() {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + var complete = newFuture[void]() + const MsgSize = 1024 + proc connHandler(conn: Connection) {.async, gcsafe.} = + let mplexListen = Mplex.init(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = + try: + let msg = await stream.readLp(MsgSize) + check msg.len == MsgSize + except CatchableError as e: + echo e.msg + await stream.close() + complete.complete() - var complete = newFuture[void]() - const MsgSize = 1024 - proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = Mplex.init(conn) - mplexListen.streamHandler = proc(stream: Connection) - {.async, gcsafe.} = - try: + await mplexListen.handle() + await mplexListen.close() + + let transport1: TcpTransport = TcpTransport.init() + let listenFut = await transport1.listen(ma, connHandler) + + let transport2: TcpTransport = TcpTransport.init() + let conn = await transport2.dial(transport1.ma) + + let mplexDial = Mplex.init(conn) + let mplexDialFut = mplexDial.handle() + let stream = await mplexDial.newStream() + var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1) + for _ in 0.. buf.buffer.len: buf.buffer.len else: size + var send = buf.buffer[0.. buf.buffer.len: buf.buffer.len else: size - var send = buf.buffer[0..