From d5f92663bc5faa6d163d28624d66d740af4942c7 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 7 Jan 2020 02:06:27 -0600 Subject: [PATCH] make tests pass --- libp2p.nimble | 2 +- tests/pubsub/testfloodsub.nim | 6 +- tests/pubsub/testgossipsub.nim | 141 +++++++++++++++++---------------- tests/pubsub/utils.nim | 6 +- tests/testswitch.nim | 16 ++-- 5 files changed, 90 insertions(+), 81 deletions(-) diff --git a/libp2p.nimble b/libp2p.nimble index 803e669bc..bf34ac2d1 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -16,7 +16,7 @@ requires "nim > 0.19.4", "stew" proc runTest(filename: string) = - exec "nim c -r tests/" & filename + exec "nim --opt:speed -d:release c -r tests/" & filename # rmFile "tests/" & filename task test, "Runs the test suite": diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index b6b603d1c..893bbc62a 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -191,7 +191,7 @@ suite "FloodSub": await allFutures(nodes.mapIt(it.stop())) await allFutures(awaitters) - result = passed >= 10 # non deterministic, so at least 2 times + result = passed >= 10 # non deterministic, so at least 10 times check: waitFor(runTests()) == true @@ -220,12 +220,10 @@ suite "FloodSub": await node.publish("foobar", cast[seq[byte]]("Hello!")) await sleepAsync(10.millis) - await sleepAsync(100.millis) - await allFutures(nodes.mapIt(it.stop())) await allFutures(awaitters) - result = passed >= 10 # non deterministic, so at least 20 times + result = passed >= 10 # non deterministic, so at least 10 times check: waitFor(runTests()) == true diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index b6219bd3d..be88a0090 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import unittest, sequtils, options, tables, sets +import unittest, sequtils, options, tables, sets, heapqueue import chronos import utils, ../../libp2p/[peer, peerinfo, @@ -35,9 +35,10 @@ suite "GossipSub": awaiters.add((await nodes[0].start())) awaiters.add((await nodes[1].start())) - await subscribeNodes(nodes) + await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) - await sleepAsync(1000.millis) + await subscribeNodes(nodes) + await sleepAsync(100.millis) var validatorFut = newFuture[bool]() proc validator(topic: string, @@ -50,12 +51,13 @@ suite "GossipSub": nodes[1].addValidator("foobar", validator) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await allFutures(handlerFut, handlerFut) + result = (await validatorFut) and (await handlerFut) await allFutures(nodes[0].stop(), nodes[1].stop()) await allFutures(awaiters) - result = true + check: waitFor(runTests()) == true + getGlobalDispatcher().timers.clear() test "GossipSub validation should fail": proc runTests(): Future[bool] {.async.} = @@ -82,13 +84,13 @@ suite "GossipSub": await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await sleepAsync(100.millis) - discard await validatorFut + result = await validatorFut await allFutures(nodes[0].stop(), nodes[1].stop()) await allFutures(awaiters) - result = true check: waitFor(runTests()) == true + getGlobalDispatcher().timers.clear() test "GossipSub validation one fails and one succeeds": proc runTests(): Future[bool] {.async.} = @@ -102,31 +104,35 @@ suite "GossipSub": awaiters.add((await nodes[0].start())) awaiters.add((await nodes[1].start())) - await subscribeNodes(nodes) await nodes[1].subscribe("foo", handler) await nodes[1].subscribe("bar", handler) - await sleepAsync(1000.millis) + await subscribeNodes(nodes) + await sleepAsync(100.millis) + var passed, failed: Future[bool] = newFuture[bool]() proc validator(topic: string, message: Message): Future[bool] {.async.} = - if topic == "foo": - result = true + result = if topic == "foo": + passed.complete(true) + true else: - result = false + failed.complete(true) + false nodes[1].addValidator("foo", "bar", validator) await nodes[0].publish("foo", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) - await sleepAsync(100.millis) + result = ((await passed) and (await failed) and (await handlerFut)) await allFutures(nodes[0].stop(), nodes[1].stop()) await allFutures(awaiters) result = true check: waitFor(runTests()) == true + getGlobalDispatcher().timers.clear() - test "should add remote peer topic subscriptions": + test "GossipSub should add remote peer topic subscriptions": proc runTests(): Future[bool] {.async.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard @@ -148,18 +154,19 @@ suite "GossipSub": asyncCheck gossip2.handleConn(conn1, GossipSubCodec) await gossip1.subscribe("foobar", handler) - await sleepAsync(1.seconds) + await sleepAsync(10.millis) check: - "foobar" in gossip2.gossipsub - gossip1.peerInfo.id in gossip2.gossipsub["foobar"] + "foobar" in gossip2.gossipsub + gossip1.peerInfo.id in gossip2.gossipsub["foobar"] result = true check: waitFor(runTests()) == true + getGlobalDispatcher().timers.clear() - test "e2e - should add remote peer topic subscriptions": + test "e2e - GossipSub should add remote peer topic subscriptions": proc testBasicGossipSub(): Future[bool] {.async.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard @@ -173,9 +180,8 @@ suite "GossipSub": awaitters.add(await node.start()) await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) - await subscribeNodes(nodes) + await sleepAsync(100.millis) let gossip1 = GossipSub(nodes[0].pubSub.get()) let gossip2 = GossipSub(nodes[1].pubSub.get()) @@ -192,8 +198,9 @@ suite "GossipSub": check: waitFor(testBasicGossipSub()) == true + getGlobalDispatcher().timers.clear() - test "should add remote peer topic subscriptions if both peers are subscribed": + test "GossipSub should add remote peer topic subscriptions if both peers are subscribed": proc runTests(): Future[bool] {.async.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard @@ -219,27 +226,28 @@ suite "GossipSub": await gossip1.subscribe("foobar", handler) await gossip2.subscribe("foobar", handler) - await sleepAsync(1.seconds) + await sleepAsync(100.millis) check: - "foobar" in gossip1.topics - "foobar" in gossip2.topics + "foobar" in gossip1.topics + "foobar" in gossip2.topics - "foobar" in gossip1.gossipsub - "foobar" in gossip2.gossipsub + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub - # TODO: in a real setting, we would be checking for the peerId from - # gossip1 in gossip2 and vice versa, but since we're doing some mockery - # with connection piping and such, this is fine - do not change! - gossip1.peerInfo.id in gossip1.gossipsub["foobar"] - gossip2.peerInfo.id in gossip2.gossipsub["foobar"] + # TODO: in a real setting, we would be checking for the peerId from + # gossip1 in gossip2 and vice versa, but since we're doing some mockery + # with connection piping and such, this is fine - do not change! + gossip1.peerInfo.id in gossip1.gossipsub["foobar"] + gossip2.peerInfo.id in gossip2.gossipsub["foobar"] result = true check: waitFor(runTests()) == true + getGlobalDispatcher().timers.clear() - test "e2e - should add remote peer topic subscriptions if both peers are subscribed": + test "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed": proc testBasicGossipSub(): Future[bool] {.async.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard @@ -253,12 +261,9 @@ suite "GossipSub": awaitters.add(await node.start()) await nodes[0].subscribe("foobar", handler) - await sleepAsync(100.millis) - await nodes[1].subscribe("foobar", handler) - await sleepAsync(100.millis) - await subscribeNodes(nodes) + await sleepAsync(100.millis) let gossip1 = GossipSub(nodes[0].pubSub.get()) let gossip2 = GossipSub(nodes[1].pubSub.get()) @@ -280,6 +285,7 @@ suite "GossipSub": check: waitFor(testBasicGossipSub()) == true + getGlobalDispatcher().timers.clear() # test "send over fanout A -> B": # proc runTests(): Future[bool] {.async.} = @@ -321,7 +327,7 @@ suite "GossipSub": # check: # waitFor(runTests()) == true - test "e2e - send over fanout A -> B": + test "e2e - GossipSub send over fanout A -> B": proc runTests(): Future[bool] {.async.} = var passed: bool proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -336,10 +342,8 @@ suite "GossipSub": await subscribeNodes(nodes) await nodes[1].subscribe("foobar", handler) - await sleepAsync(3.seconds) - + await sleepAsync(100.millis) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await sleepAsync(3.seconds) var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) @@ -354,6 +358,7 @@ suite "GossipSub": check: waitFor(runTests()) == true + getGlobalDispatcher().timers.clear() # test "send over mesh A -> B": # proc runTests(): Future[bool] {.async.} = @@ -394,34 +399,32 @@ suite "GossipSub": # check: # waitFor(runTests()) == true - # test "e2e - send over mesh A -> B": + # test "e2e - GossipSub send over mesh A -> B": # proc runTests(): Future[bool] {.async.} = - # var passed: bool + # var passed: Future[bool] = newFuture[bool]() # proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = # check topic == "foobar" - # passed = true + # passed.complete(true) # var nodes = generateNodes(2, true) - # var wait = await nodes[1].start() + # var wait: seq[Future[void]] + # wait.add(await nodes[0].start()) + # wait.add(await nodes[1].start()) + + # await nodes[0].subscribe("foobar", handler) + # await nodes[1].subscribe("foobar", handler) # await nodes[0].subscribeToPeer(nodes[1].peerInfo) # await sleepAsync(100.millis) - # await nodes[0].subscribe("foobar", handler) - # await sleepAsync(100.millis) - - # await nodes[1].subscribe("foobar", handler) - # await sleepAsync(100.millis) - # await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - # await sleepAsync(1000.millis) - + # result = await passed # await nodes[1].stop() # await allFutures(wait) - # result = passed # check: # waitFor(runTests()) == true + # getGlobalDispatcher().timers.clear() # test "with multiple peers": # proc runTests(): Future[bool] {.async.} = @@ -481,16 +484,18 @@ suite "GossipSub": # check: # waitFor(runTests()) == true - test "e2e - with multiple peers": + test "e2e - GossipSub with multiple peers": proc runTests(): Future[bool] {.async.} = var nodes: seq[Switch] = newSeq[Switch]() var awaitters: seq[Future[void]] - for i in 0..<10: + for i in 0..<11: nodes.add newStandardSwitch(triggerSelf = true, gossip = true) awaitters.add((await nodes[i].start())) var seen: Table[string, int] + var subs: seq[Future[void]] + var seenFut = newFuture[void]() for dialer in nodes: var handler: TopicHandler closureScope: @@ -500,26 +505,28 @@ suite "GossipSub": seen[dialerNode.peerInfo.id] = 0 seen[dialerNode.peerInfo.id].inc check topic == "foobar" + if not seenFut.finished() and seen.len == 10: + seenFut.complete() - await dialer.subscribe("foobar", handler) - await sleepAsync(20.millis) - + subs.add(dialer.subscribe("foobar", handler)) + await allFutures(subs) await subscribeNodes(nodes) - await sleepAsync(10.millis) + await sleepAsync(1.seconds) - await nodes[0].publish("foobar", - cast[seq[byte]]("from node " & - nodes[1].peerInfo.id)) + await wait(nodes[0].publish("foobar", + cast[seq[byte]]("from node " & + nodes[1].peerInfo.id)), + 1.minutes) - await sleepAsync(1000.millis) - await allFutures(nodes.mapIt(it.stop())) - await allFutures(awaitters) - - check: seen.len == 10 + await wait(seenFut, 1.minutes) + check: seen.len >= 10 for k, v in seen.pairs: check: v == 1 + await allFutures(nodes.mapIt(it.stop())) + await allFutures(awaitters) result = true check: waitFor(runTests()) == true + getGlobalDispatcher().timers.clear() diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 55af70d2f..76926617a 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -8,8 +8,10 @@ proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] = result.add(newStandardSwitch(gossip = gossip)) proc subscribeNodes*(nodes: seq[Switch]) {.async.} = + var dials: seq[Future[void]] for dialer in nodes: for node in nodes: if dialer.peerInfo.peerId != node.peerInfo.peerId: - await dialer.subscribeToPeer(node.peerInfo) - await sleepAsync(100.millis) + dials.add(dialer.subscribeToPeer(node.peerInfo)) + await sleepAsync(100.millis) + await allFutures(dials) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 5e5472d1f..8f65411d1 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -1,4 +1,4 @@ -import unittest, tables, options +import unittest, tables import chronos import ../libp2p/[switch, multistream, @@ -35,7 +35,6 @@ method init(p: TestProto) {.gcsafe.} = p.handler = handle proc createSwitch(ma: MultiAddress): (Switch, PeerInfo) = - let seckey = PrivateKey.random(RSA) var peerInfo: PeerInfo = PeerInfo.init(PrivateKey.random(RSA)) peerInfo.addrs.add(ma) let identify = newIdentify(peerInfo) @@ -68,16 +67,18 @@ suite "Switch": testProto.init() testProto.codec = TestCodec switch1.mount(testProto) - asyncCheck switch1.start() + var awaiters: seq[Future[void]] + awaiters.add(await switch1.start()) (switch2, peerInfo2) = createSwitch(ma2) - asyncCheck switch2.start() + awaiters.add(await switch2.start()) let conn = await switch2.dial(switch1.peerInfo, TestCodec) await conn.writeLp("Hello!") let msg = cast[string](await conn.readLp()) check "Hello!" == msg - discard allFutures(switch1.stop(), switch2.stop()) + await allFutures(switch1.stop(), switch2.stop()) + await allFutures(awaiters) result = true check: @@ -91,10 +92,11 @@ suite "Switch": var peerInfo1, peerInfo2: PeerInfo var switch1, switch2: Switch (switch1, peerInfo1) = createSwitch(ma1) - asyncCheck switch1.start() + var awaiters: seq[Future[void]] + awaiters.add(await switch1.start()) (switch2, peerInfo2) = createSwitch(ma2) - asyncCheck switch2.start() + awaiters.add(await switch2.start()) var conn = await switch2.dial(switch1.peerInfo) check isNil(conn)