From 20c68a2018c445dc640d555a032e03768003ac86 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 23 May 2020 11:14:22 -0600 Subject: [PATCH] use all() for futures and track connections --- tests/helpers.nim | 2 + tests/pubsub/testfloodsub.nim | 50 ++++++++++---------- tests/pubsub/testgossipinternal.nim | 18 ++++---- tests/pubsub/testgossipsub.nim | 34 +++++++------- tests/testbufferstream.nim | 8 ++-- tests/testinterop.nim | 72 ++++++++++++----------------- tests/testmplex.nim | 18 ++++---- tests/testmultistream.nim | 18 +++++--- tests/testnoise.nim | 8 ++-- tests/testpeerinfo.nim | 7 +-- tests/testswitch.nim | 8 ++-- 11 files changed, 117 insertions(+), 126 deletions(-) diff --git a/tests/helpers.nim b/tests/helpers.nim index aac3ce759..aad4a6ef9 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -2,12 +2,14 @@ import chronos import ../libp2p/transports/tcptransport import ../libp2p/stream/bufferstream +import ../libp2p/connection const StreamTransportTrackerName = "stream.transport" StreamServerTrackerName = "stream.server" trackerNames = [ + ConnectionTrackerName, BufferStreamTrackerName, TcpTransportTrackerName, StreamTransportTrackerName, diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 9d7cc9406..82312f8fb 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -50,7 +50,7 @@ suite "FloodSub": let nodes = generateNodes(2) - nodesFut = await allFinished( + nodesFut = await all( nodes[0].start(), nodes[1].start() ) @@ -64,14 +64,13 @@ suite "FloodSub": result = await completionFut.wait(5.seconds) - await allFuturesThrowing( + await all( nodes[0].stop(), nodes[1].stop() ) - for fut in nodesFut: - let res = fut.read() - await allFuturesThrowing(res) + await all(nodesFut.concat()) + check: waitFor(runTests()) == true @@ -96,8 +95,8 @@ suite "FloodSub": result = await completionFut.wait(5.seconds) - await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) - await allFuturesThrowing(awaiters) + await all(nodes[0].stop(), nodes[1].stop()) + await all(awaiters) check: waitFor(runTests()) == true @@ -129,9 +128,9 @@ suite "FloodSub": await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await allFuturesThrowing(handlerFut, handlerFut) - await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) - await allFuturesThrowing(awaiters) + check (await handlerFut) == true + await all(nodes[0].stop(), nodes[1].stop()) + await all(awaiters) result = true check: @@ -161,8 +160,8 @@ suite "FloodSub": await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) - await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) - await allFuturesThrowing(awaiters) + await all(nodes[0].stop(), nodes[1].stop()) + await all(awaiters) result = true check: @@ -198,8 +197,8 @@ suite "FloodSub": await nodes[0].publish("foo", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) - await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) - await allFuturesThrowing(awaiters) + await all(nodes[0].stop(), nodes[1].stop()) + await all(awaiters) result = true check: @@ -220,7 +219,7 @@ suite "FloodSub": (proc(topic: string, data: seq[byte]) {.async, gcsafe.} = check topic == "foobar" inc counter[] - if counter[] == 9: + if counter[] == runs - 1: fut.complete()), counter ) @@ -229,7 +228,6 @@ suite "FloodSub": for i in 0.. 0.uint: await s.readExactly(addr result[0], int(size)) -proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey), - address: string = "/ip4/127.0.0.1/tcp/0", - triggerSelf: bool = false, - gossip: bool = false): Switch = - var seckey = privKey - if privKey.isNone: - seckey = some(PrivateKey.random(RSA).get()) - - var peerInfo = NativePeerInfo.init(seckey.get(), [Multiaddress.init(address).tryGet()]) - proc createMplex(conn: Connection): Muxer = newMplex(conn) - let mplexProvider = newMuxerProvider(createMplex, MplexCodec) - let transports = @[Transport(TcpTransport.init())] - let muxers = [(MplexCodec, mplexProvider)].toTable() - let identify = newIdentify(peerInfo) - let secureManagers = [Secure(newSecio(seckey.get()))] - - var pubSub: Option[PubSub] - if gossip: - pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf))) - else: - pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf))) - - result = newSwitch(peerInfo, - transports, - identify, - muxers, - secureManagers = secureManagers, - pubSub = pubSub) - proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1): Future[bool] {.async.} = var pubsubData = "TEST MESSAGE" @@ -100,7 +73,7 @@ proc testPubSubDaemonPublish(gossip: bool = false, let daemonNode = await newDaemonApi(flags) let daemonPeer = await daemonNode.identity() - let nativeNode = createNode(gossip = gossip) + let nativeNode = newStandardSwitch(gossip = gossip) let awaiters = nativeNode.start() let nativePeer = nativeNode.peerInfo @@ -110,6 +83,7 @@ proc testPubSubDaemonPublish(gossip: bool = false, let smsg = cast[string](data) check smsg == pubsubData times.inc() + echo "TIMES ", times if times >= count and not finished: finished = true @@ -125,15 +99,16 @@ proc testPubSubDaemonPublish(gossip: bool = false, asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) await nativeNode.subscribe(testTopic, nativeHandler) - await sleepAsync(1.seconds) + await sleepAsync(5.seconds) proc publisher() {.async.} = while not finished: await daemonNode.pubsubPublish(testTopic, msgData) - await sleepAsync(100.millis) + await sleepAsync(500.millis) await wait(publisher(), 5.minutes) # should be plenty of time + echo "HEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE" result = true await nativeNode.stop() await allFutures(awaiters) @@ -151,7 +126,7 @@ proc testPubSubNodePublish(gossip: bool = false, let daemonNode = await newDaemonApi(flags) let daemonPeer = await daemonNode.identity() - let nativeNode = createNode(gossip = gossip) + let nativeNode = newStandardSwitch(gossip = gossip) let awaiters = nativeNode.start() let nativePeer = nativeNode.peerInfo @@ -169,6 +144,7 @@ proc testPubSubNodePublish(gossip: bool = false, let smsg = cast[string](message.data) check smsg == pubsubData times.inc() + echo "TIMES ", times if times >= count and not finished: finished = true result = true # don't cancel subscription @@ -176,12 +152,12 @@ proc testPubSubNodePublish(gossip: bool = false, discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard await nativeNode.subscribe(testTopic, nativeHandler) - await sleepAsync(1.seconds) + await sleepAsync(5.seconds) proc publisher() {.async.} = while not finished: await nativeNode.publish(testTopic, msgData) - await sleepAsync(100.millis) + await sleepAsync(500.millis) await wait(publisher(), 5.minutes) # should be plenty of time @@ -191,11 +167,16 @@ proc testPubSubNodePublish(gossip: bool = false, await daemonNode.close() suite "Interop": + teardown: + for tracker in testTrackers(): + echo tracker.dump() + # check tracker.isLeaked() == false + test "native -> daemon multiple reads and writes": proc runTests(): Future[bool] {.async.} = var protos = @["/test-stream"] - let nativeNode = createNode() + let nativeNode = newStandardSwitch() let awaiters = await nativeNode.start() let daemonNode = await newDaemonApi() let daemonPeer = await daemonNode.identity() @@ -222,9 +203,13 @@ suite "Interop": check "test 4" == cast[string]((await conn.readLp(1024))) await wait(testFuture, 10.secs) + await conn.close() + + await daemonNode.close() await nativeNode.stop() await allFutures(awaiters) - await daemonNode.close() + + await sleepAsync(1.seconds) result = true check: @@ -243,7 +228,7 @@ suite "Interop": var expect = newString(len(buffer) - 2) copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) - let nativeNode = createNode() + let nativeNode = newStandardSwitch() let awaiters = await nativeNode.start() let daemonNode = await newDaemonApi() @@ -263,6 +248,8 @@ suite "Interop": protos[0]) await conn.writeLp(test & "\r\n") result = expect == (await wait(testFuture, 10.secs)) + + await conn.close() await nativeNode.stop() await allFutures(awaiters) await daemonNode.close() @@ -287,7 +274,7 @@ suite "Interop": proto.handler = nativeHandler proto.codec = protos[0] # codec - let nativeNode = createNode() + let nativeNode = newStandardSwitch() nativeNode.mount(proto) let awaiters = await nativeNode.start() @@ -326,7 +313,7 @@ suite "Interop": proto.handler = nativeHandler proto.codec = protos[0] # codec - let nativeNode = createNode() + let nativeNode = newStandardSwitch() nativeNode.mount(proto) let awaiters = await nativeNode.start() @@ -365,6 +352,7 @@ suite "Interop": check line == test await conn.writeLp(cast[seq[byte]](test)) count.inc() + echo "COUNT ", count testFuture.complete(count) await conn.close() @@ -374,7 +362,7 @@ suite "Interop": proto.handler = nativeHandler proto.codec = protos[0] # codec - let nativeNode = createNode() + let nativeNode = newStandardSwitch() nativeNode.mount(proto) let awaiters = await nativeNode.start() diff --git a/tests/testmplex.nim b/tests/testmplex.nim index eea99636c..de6c3922c 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -186,7 +186,7 @@ suite "Mplex": var data = newSeq[byte](1) try: await chann.readExactly(addr data[0], 1) - doAssert(len(data) == 1) + check data.len == 1 except LPStreamEOFError: result = true finally: @@ -245,7 +245,7 @@ suite "Mplex": await done.wait(1.seconds) await conn.close() await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(testNewStream()) @@ -284,7 +284,7 @@ suite "Mplex": await done.wait(1.seconds) await conn.close() await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(testNewStream()) @@ -331,7 +331,7 @@ suite "Mplex": await stream.close() await conn.close() await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(testNewStream()) @@ -368,7 +368,7 @@ suite "Mplex": await done.wait(1.seconds) await conn.close() await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(testNewStream()) @@ -410,7 +410,7 @@ suite "Mplex": await done.wait(10.seconds) await conn.close() await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(testNewStream()) @@ -454,7 +454,7 @@ suite "Mplex": await done.wait(5.seconds) await conn.close() await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(testNewStream()) @@ -522,7 +522,7 @@ suite "Mplex": await complete.wait(1.seconds) await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(test()) @@ -579,7 +579,7 @@ suite "Mplex": await stream.close() await conn.close() await mplexDialFut - await allFuturesThrowing(transport1.close(), transport2.close()) + await all(transport1.close(), transport2.close()) await listenFut waitFor(test()) diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 9346d73af..ba7a6c46f 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -151,6 +151,7 @@ proc newTestNaStream(na: NaHandler): TestNaStream = suite "Multistream select": teardown: for tracker in testTrackers(): + # echo tracker.dump() check tracker.isLeaked() == false test "test select custom proto": @@ -276,7 +277,7 @@ suite "Multistream select": await transport2.close() await transport1.close() - await allFuturesThrowing(handlerWait1.wait(5000.millis) #[if OK won't happen!!]#, handlerWait2.wait(5000.millis) #[if OK won't happen!!]#) + await all(handlerWait1.wait(5000.millis) #[if OK won't happen!!]#, handlerWait2.wait(5000.millis) #[if OK won't happen!!]#) check: waitFor(endToEnd()) == true @@ -306,10 +307,16 @@ suite "Multistream select": let transport1: TcpTransport = TcpTransport.init() proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = - await msListen.handle(conn) - handlerWait.complete() + try: + await msListen.handle(conn) + except LPStreamEOFError: + discard + except LPStreamClosedError: + discard + finally: + await conn.close() - asyncCheck transport1.listen(ma, connHandler) + let listenFut = transport1.listen(ma, connHandler) let msDial = newMultistream() let transport2: TcpTransport = TcpTransport.init() @@ -323,8 +330,7 @@ suite "Multistream select": await conn.close() await transport2.close() await transport1.close() - - await handlerWait.wait(5000.millis) # when no issues will not wait that long! + discard await listenFut.wait(5.seconds) check: waitFor(endToEnd()) == true diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 948301dd2..5565d25c5 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -175,7 +175,7 @@ suite "Noise": let transport1: TcpTransport = TcpTransport.init() - asyncCheck await transport1.listen(server, connHandler) + listenFut = await transport1.listen(server, connHandler) let transport2: TcpTransport = TcpTransport.init() @@ -191,6 +191,7 @@ suite "Noise": await conn.close() await transport2.close() await transport1.close() + await listenFut result = true @@ -219,9 +220,10 @@ suite "Noise": await conn.writeLp("Hello!") let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg + await conn.close() - await allFuturesThrowing(switch1.stop(), switch2.stop()) - await allFuturesThrowing(awaiters) + await all(switch1.stop(), switch2.stop()) + await all(awaiters) result = true check: diff --git a/tests/testpeerinfo.nim b/tests/testpeerinfo.nim index 1cba02b72..369b23b16 100644 --- a/tests/testpeerinfo.nim +++ b/tests/testpeerinfo.nim @@ -5,13 +5,8 @@ import chronos import ../libp2p/crypto/crypto, ../libp2p/peerinfo, ../libp2p/peer -import ./helpers suite "PeerInfo": - teardown: - for tracker in testTrackers(): - check tracker.isLeaked() == false - test "Should init with private key": let seckey = PrivateKey.random(ECDSA).get() var peerInfo = PeerInfo.init(seckey) @@ -43,7 +38,7 @@ suite "PeerInfo": check: PeerID.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") == peerInfo.peerId - # TODO: CIDv1 is handling is missing from PeerID + # TODO: CIDv1 handling is missing from PeerID # https://github.com/status-im/nim-libp2p/issues/53 # test "Should init from CIDv1 string": # var peerInfo = PeerInfo.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe") diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 1951cc601..31ca5ac05 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -90,7 +90,7 @@ suite "Switch": let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg - await allFuturesThrowing( + await all( done.wait(5.seconds) #[if OK won't happen!!]#, conn.close(), switch1.stop(), @@ -98,7 +98,7 @@ suite "Switch": ) # this needs to go at end - await allFuturesThrowing(awaiters) + await all(awaiters) waitFor(testSwitch()) @@ -138,12 +138,12 @@ suite "Switch": except LPStreamError: result = false - await allFuturesThrowing( + await all( conn.close(), switch1.stop(), switch2.stop() ) - await allFuturesThrowing(awaiters) + await all(awaiters) check: waitFor(testSwitch()) == true