Cleanup tests (#435)

* add async testing methods

* refactor with async testing methods

* use iffy in async tests
This commit is contained in:
Dmitriy Ryajov 2020-11-12 21:44:02 -06:00 committed by GitHub
parent 23ffd1f9f9
commit 55b763264e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 3010 additions and 3544 deletions

View File

@ -46,6 +46,27 @@ template checkTrackers*() =
# Also test the GC is not fooling with us
GC_fullCollect()
template asyncTeardown*(body: untyped): untyped =
teardown:
waitFor((
proc() {.async.} =
body
)())
template asyncSetup*(body: untyped): untyped =
setup:
waitFor((
proc() {.async.} =
body
)())
template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((
proc() {.async.} =
body
)())
type RngWrap = object
rng: ref BrHmacDrbgContext

View File

@ -38,8 +38,7 @@ suite "FloodSub":
teardown:
checkTrackers()
test "FloodSub basic publish/subscribe A -> B":
proc runTests() {.async.} =
asyncTest "FloodSub basic publish/subscribe A -> B":
var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
@ -81,10 +80,7 @@ suite "FloodSub":
await allFuturesThrowing(nodesFut.concat())
waitFor(runTests())
test "FloodSub basic publish/subscribe B -> A":
proc runTests() {.async.} =
asyncTest "FloodSub basic publish/subscribe B -> A":
var completionFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
@ -127,10 +123,7 @@ suite "FloodSub":
await allFuturesThrowing(nodesFut)
waitFor(runTests())
test "FloodSub validation should succeed":
proc runTests() {.async.} =
asyncTest "FloodSub validation should succeed":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
@ -181,10 +174,7 @@ suite "FloodSub":
await allFuturesThrowing(nodesFut)
waitFor(runTests())
test "FloodSub validation should fail":
proc runTests() {.async.} =
asyncTest "FloodSub validation should fail":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
@ -230,10 +220,7 @@ suite "FloodSub":
await allFuturesThrowing(nodesFut)
waitFor(runTests())
test "FloodSub validation one fails and one succeeds":
proc runTests() {.async.} =
asyncTest "FloodSub validation one fails and one succeeds":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo"
@ -285,10 +272,7 @@ suite "FloodSub":
await allFuturesThrowing(nodesFut)
waitFor(runTests())
test "FloodSub multiple peers, no self trigger":
proc runTests() {.async.} =
asyncTest "FloodSub multiple peers, no self trigger":
var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
@ -338,10 +322,7 @@ suite "FloodSub":
await allFuturesThrowing(nodesFut)
waitFor(runTests())
test "FloodSub multiple peers, with self trigger":
proc runTests() {.async.} =
asyncTest "FloodSub multiple peers, with self trigger":
var runs = 10
var futs = newSeq[(Future[void], TopicHandler, ref int)](runs)
@ -390,5 +371,3 @@ suite "FloodSub":
it.switch.stop())))
await allFuturesThrowing(nodesFut)
waitFor(runTests())

View File

@ -30,18 +30,11 @@ suite "GossipSub internal":
teardown:
checkTrackers()
test "topic params":
proc testRun(): Future[bool] {.async.} =
asyncTest "topic params":
let params = TopicParams.init()
params.validateParameters().tryGet()
return true
check:
waitFor(testRun()) == true
test "`rebalanceMesh` Degree Lo":
proc testRun(): Future[bool] {.async.} =
asyncTest "`rebalanceMesh` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
@ -67,13 +60,8 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`rebalanceMesh` Degree Hi":
proc testRun(): Future[bool] {.async.} =
asyncTest "`rebalanceMesh` Degree Hi":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
@ -100,13 +88,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`replenishFanout` Degree Lo":
proc testRun(): Future[bool] {.async.} =
asyncTest "`replenishFanout` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -134,13 +116,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`dropFanoutPeers` drop expired fanout topics":
proc testRun(): Future[bool] {.async.} =
asyncTest "`dropFanoutPeers` drop expired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -171,13 +147,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`dropFanoutPeers` leave unexpired fanout topics":
proc testRun(): Future[bool] {.async.} =
asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -215,13 +185,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should gather up to degree D non intersecting peers":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -284,13 +248,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in mesh":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -331,13 +289,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in fanout":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -379,13 +331,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -426,8 +372,3 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true

View File

@ -28,12 +28,9 @@ proc randomPeerInfo(): PeerInfo =
suite "GossipSub internal":
teardown:
for tracker in testTrackers():
# echo tracker.dump()
check tracker.isLeaked() == false
checkTrackers()
test "`rebalanceMesh` Degree Lo":
proc testRun(): Future[bool] {.async.} =
asyncTest "`rebalanceMesh` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
@ -57,13 +54,8 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`rebalanceMesh` Degree Hi":
proc testRun(): Future[bool] {.async.} =
asyncTest "`rebalanceMesh` Degree Hi":
let gossipSub = TestGossipSub.init(newStandardSwitch())
let topic = "foobar"
@ -88,13 +80,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`replenishFanout` Degree Lo":
proc testRun(): Future[bool] {.async.} =
asyncTest "`replenishFanout` Degree Lo":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -120,13 +106,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`dropFanoutPeers` drop expired fanout topics":
proc testRun(): Future[bool] {.async.} =
asyncTest "`dropFanoutPeers` drop expired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -155,13 +135,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`dropFanoutPeers` leave unexpired fanout topics":
proc testRun(): Future[bool] {.async.} =
asyncTEst "`dropFanoutPeers` leave unexpired fanout topics":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -196,13 +170,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should gather up to degree D non intersecting peers":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -261,13 +229,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in mesh":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -306,13 +268,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in fanout":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -351,13 +307,7 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true
test "`getGossipPeers` - should not crash on missing topics in gossip":
proc testRun(): Future[bool] {.async.} =
asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
@ -395,8 +345,3 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
result = true
check:
waitFor(testRun()) == true

View File

@ -75,8 +75,7 @@ suite "GossipSub":
teardown:
checkTrackers()
test "GossipSub validation should succeed":
proc runTests() {.async.} =
asyncTest "GossipSub validation should succeed":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
@ -134,10 +133,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
waitFor(runTests())
test "GossipSub validation should fail":
proc runTests() {.async.} =
asyncTest "GossipSub validation should fail":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
@ -199,10 +195,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
waitFor(runTests())
test "GossipSub validation one fails and one succeeds":
proc runTests() {.async.} =
asyncTest "GossipSub validation one fails and one succeeds":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foo"
@ -267,10 +260,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
waitFor(runTests())
test "e2e - GossipSub should add remote peer topic subscriptions":
proc testBasicGossipSub() {.async.} =
asyncTest "e2e - GossipSub should add remote peer topic subscriptions":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
@ -318,10 +308,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
waitFor(testBasicGossipSub())
test "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
proc testBasicGossipSub() {.async.} =
asyncTest "e2e - GossipSub should add remote peer topic subscriptions if both peers are subscribed":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
@ -384,10 +371,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
waitFor(testBasicGossipSub())
test "e2e - GossipSub send over fanout A -> B":
proc runTests() {.async.} =
asyncTest "e2e - GossipSub send over fanout A -> B":
var passed = newFuture[void]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
@ -459,10 +443,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
check observed == 2
waitFor(runTests())
test "e2e - GossipSub send over mesh A -> B":
proc runTests(): Future[bool] {.async.} =
asyncTest "e2e - GossipSub send over mesh A -> B":
var passed: Future[bool] = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar"
@ -495,7 +476,7 @@ suite "GossipSub":
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = await passed
check await passed
var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1])
@ -520,11 +501,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
check:
waitFor(runTests()) == true
test "e2e - GossipSub with multiple peers":
proc runTests() {.async.} =
asyncTest "e2e - GossipSub with multiple peers":
var runs = 10
let
@ -575,10 +552,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut)
waitFor(runTests())
test "e2e - GossipSub with multiple peers (sparse)":
proc runTests() {.async.} =
asyncTest "e2e - GossipSub with multiple peers (sparse)":
var runs = 10
let
@ -629,5 +603,3 @@ suite "GossipSub":
it.switch.stop())))
await allFuturesThrowing(nodesFut)
waitFor(runTests())

View File

@ -3,6 +3,8 @@ import chronos, stew/byteutils
import ../libp2p/stream/bufferstream,
../libp2p/stream/lpstream
import ./helpers
{.used.}
suite "BufferStream":
@ -10,22 +12,15 @@ suite "BufferStream":
# echo getTracker(BufferStreamTrackerName).dump()
check getTracker(BufferStreamTrackerName).isLeaked() == false
test "push data to buffer":
proc testpushData(): Future[bool] {.async.} =
asyncTest "push data to buffer":
let buff = newBufferStream()
check buff.len == 0
var data = "12345"
await buff.pushData(data.toBytes())
check buff.len == 5
result = true
await buff.close()
check:
waitFor(testpushData()) == true
test "push and wait":
proc testpushData(): Future[bool] {.async.} =
asyncTest "push and wait":
let buff = newBufferStream()
check buff.len == 0
@ -40,16 +35,9 @@ suite "BufferStream":
await fut0
await fut1
check buff.len == 4
result = true
await buff.close()
check:
waitFor(testpushData()) == true
test "read with size":
proc testRead(): Future[bool] {.async.} =
asyncTest "read with size":
let buff = newBufferStream()
check buff.len == 0
@ -57,16 +45,9 @@ suite "BufferStream":
var data: array[3, byte]
await buff.readExactly(addr data[0], data.len)
check ['1', '2', '3'] == string.fromBytes(data)
result = true
await buff.close()
check:
waitFor(testRead()) == true
test "readExactly":
proc testReadExactly(): Future[bool] {.async.} =
asyncTest "readExactly":
let buff = newBufferStream()
check buff.len == 0
@ -75,16 +56,9 @@ suite "BufferStream":
var data: array[2, byte]
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == ['1', '2']
result = true
await buff.close()
check:
waitFor(testReadExactly()) == true
test "readExactly raises":
proc testReadExactly(): Future[bool] {.async.} =
asyncTest "readExactly raises":
let buff = newBufferStream()
check buff.len == 0
@ -93,16 +67,10 @@ suite "BufferStream":
var readFut = buff.readExactly(addr data[0], data.len)
await buff.close()
try:
expect LPStreamIncompleteError:
await readFut
except LPStreamIncompleteError:
result = true
check:
waitFor(testReadExactly()) == true
test "readOnce":
proc testReadOnce(): Future[bool] {.async.} =
asyncTest "readOnce":
let buff = newBufferStream()
check buff.len == 0
@ -113,16 +81,9 @@ suite "BufferStream":
check (await readFut) == 3
check string.fromBytes(data) == ['1', '2', '3']
result = true
await buff.close()
check:
waitFor(testReadOnce()) == true
test "reads should happen in order":
proc testWritePtr(): Future[bool] {.async.} =
asyncTest "reads should happen in order":
let buff = newBufferStream()
check buff.len == 0
@ -157,16 +118,9 @@ suite "BufferStream":
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == "Msg 6"
for f in [w4, w5, w6]: await f
result = true
check:
waitFor(testWritePtr()) == true
test "small reads":
proc testWritePtr(): Future[bool] {.async.} =
asyncTest "small reads":
let buff = newBufferStream()
check buff.len == 0
@ -179,55 +133,33 @@ suite "BufferStream":
var str2: string
var data: array[2, byte]
try:
expect LPStreamEOFError:
while true:
let x = await buff.readOnce(addr data[0], data.len)
str2 &= string.fromBytes(data[0..<x])
except LPStreamEOFError:
discard
for f in writes: await f
check str == str2
result = true
await buff.close()
check:
waitFor(testWritePtr()) == true
test "shouldn't get stuck on close":
proc closeTest(): Future[bool] {.async.} =
asyncTest "shouldn't get stuck on close":
var stream = newBufferStream()
var
fut = stream.pushData(toBytes("hello"))
fut2 = stream.pushData(toBytes("again"))
await stream.close()
try:
expect AsyncTimeoutError:
await wait(fut, 100.milliseconds)
await wait(fut2, 100.milliseconds)
result = true
except AsyncTimeoutError:
result = false
await stream.close()
check:
waitFor(closeTest()) == true
test "no push after close":
proc closeTest(): Future[bool] {.async.} =
asyncTest "no push after close":
var stream = newBufferStream()
await stream.pushData("123".toBytes())
var data: array[3, byte]
await stream.readExactly(addr data[0], data.len)
await stream.close()
try:
expect LPStreamEOFError:
await stream.pushData("123".toBytes())
except LPStreamClosedError:
result = true
check:
waitFor(closeTest()) == true

View File

@ -3,21 +3,16 @@ import chronos, nimcrypto/utils
import ../libp2p/[stream/connection,
stream/bufferstream]
import ./helpers
suite "Connection":
test "close":
proc test(): Future[bool] {.async.} =
asyncTest "close":
var conn = newBufferStream()
await conn.close()
check:
conn.closed == true
result = true
check:
waitFor(test()) == true
test "parent close":
proc test(): Future[bool] {.async.} =
asyncTest "parent close":
var buf = newBufferStream()
var conn = buf
@ -26,14 +21,7 @@ suite "Connection":
conn.closed == true
buf.closed == true
await sleepAsync(1.seconds)
result = true
check:
waitFor(test()) == true
test "child close":
proc test(): Future[bool] {.async.} =
asyncTest "child close":
var buf = newBufferStream()
var conn = buf
@ -41,9 +29,3 @@ suite "Connection":
check:
conn.closed == true
buf.closed == true
await sleepAsync(1.seconds)
result = true
check:
waitFor(test()) == true

View File

@ -17,8 +17,7 @@ suite "Identify":
teardown:
checkTrackers()
test "handle identify message":
proc testHandle(): Future[bool] {.async.} =
asyncTest "handle identify message":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get()
let remotePeerInfo = PeerInfo.init(remoteSecKey,
@ -54,16 +53,9 @@ suite "Identify":
await conn.close()
await transport1.close()
await serverFut
result = true
await transport2.close()
check:
waitFor(testHandle()) == true
test "handle failed identify":
proc testHandleError() {.async.} =
asyncTest "handle failed identify":
let ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var remotePeerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [ma])
let identifyProto1 = newIdentify(remotePeerInfo)
@ -86,6 +78,7 @@ suite "Identify":
var localPeerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [ma])
let identifyProto2 = newIdentify(localPeerInfo)
expect IdentityNoMatchError:
try:
let pi2 = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
discard await msDial.select(conn, IdentifyCodec)
@ -95,6 +88,3 @@ suite "Identify":
await conn.close()
await transport2.close()
await transport1.close()
expect IdentityNoMatchError:
waitFor(testHandleError())

View File

@ -58,8 +58,7 @@ proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} =
if size > 0.uint:
await s.readExactly(addr result[0], int(size))
proc testPubSubDaemonPublish(gossip: bool = false,
count: int = 1): Future[bool] {.async.} =
proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1) {.async.} =
var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic"
var msgData = pubsubData.toBytes()
@ -120,14 +119,12 @@ proc testPubSubDaemonPublish(gossip: bool = false,
await wait(publisher(), 5.minutes) # should be plenty of time
result = true
await nativeNode.stop()
await pubsub.stop()
await allFutures(awaiters)
await daemonNode.close()
proc testPubSubNodePublish(gossip: bool = false,
count: int = 1): Future[bool] {.async.} =
proc testPubSubNodePublish(gossip: bool = false, count: int = 1) {.async.} =
var pubsubData = "TEST MESSAGE"
var testTopic = "test-topic"
var msgData = pubsubData.toBytes()
@ -187,7 +184,7 @@ proc testPubSubNodePublish(gossip: bool = false,
await wait(publisher(), 5.minutes) # should be plenty of time
result = finished
check finished
await nativeNode.stop()
await pubsub.stop()
await allFutures(awaiters)
@ -199,14 +196,11 @@ suite "Interop":
# and libp2p, so not sure which one it is,
# need to investigate more
# teardown:
# for tracker in testTrackers():
# # echo tracker.dump()
# # check tracker.isLeaked() == false
# checkTrackers()
# TODO: this test is failing sometimes on windows
# For some reason we receive EOF before test 4 sometimes
test "native -> daemon multiple reads and writes":
proc runTests(): Future[bool] {.async.} =
asyncTest "native -> daemon multiple reads and writes":
var protos = @["/test-stream"]
let nativeNode = newStandardSwitch(
@ -242,13 +236,8 @@ suite "Interop":
await allFutures(awaiters)
await sleepAsync(1.seconds)
result = true
check:
waitFor(runTests()) == true
test "native -> daemon connection":
proc runTests(): Future[bool] {.async.} =
asyncTest "native -> daemon connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
# We are preparing expect string, which should be prefixed with varint
@ -283,18 +272,14 @@ suite "Interop":
daemonPeer.addresses),
protos[0])
await conn.writeLp(test & "\r\n")
result = expect == (await wait(testFuture, 10.secs))
check expect == (await wait(testFuture, 10.secs))
await conn.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
check:
waitFor(runTests()) == true
test "daemon -> native connection":
proc runTests(): Future[bool] {.async.} =
asyncTest "daemon -> native connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
@ -323,7 +308,7 @@ suite "Interop":
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
discard await stream.transp.writeLp(test)
result = test == (await wait(testFuture, 10.secs))
check test == (await wait(testFuture, 10.secs))
await stream.close()
await nativeNode.stop()
@ -331,11 +316,7 @@ suite "Interop":
await daemonNode.close()
await sleepAsync(1.seconds)
check:
waitFor(runTests()) == true
test "daemon -> multiple reads and writes":
proc runTests(): Future[bool] {.async.} =
asyncTest "daemon -> multiple reads and writes":
var protos = @["/test-stream"]
var testFuture = newFuture[void]("test.future")
@ -374,17 +355,12 @@ suite "Interop":
await wait(testFuture, 10.secs)
result = true
await stream.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
check:
waitFor(runTests()) == true
test "read write multiple":
proc runTests(): Future[bool] {.async.} =
asyncTest "read write multiple":
var protos = @["/test-stream"]
var test = "TEST STRING"
@ -424,43 +400,32 @@ suite "Interop":
check test == string.fromBytes(line)
inc(count2)
result = 10 == (await wait(testFuture, 1.minutes))
check 10 == (await wait(testFuture, 1.minutes))
await stream.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
check:
waitFor(runTests()) == true
asyncTest "floodsub: daemon publish one":
await testPubSubDaemonPublish()
test "floodsub: daemon publish one":
check:
waitFor(testPubSubDaemonPublish()) == true
asyncTest "floodsub: daemon publish many":
await testPubSubDaemonPublish(count = 10)
test "floodsub: daemon publish many":
check:
waitFor(testPubSubDaemonPublish(count = 10)) == true
asyncTest "gossipsub: daemon publish one":
await testPubSubDaemonPublish(gossip = true)
test "gossipsub: daemon publish one":
check:
waitFor(testPubSubDaemonPublish(gossip = true)) == true
asyncTest "gossipsub: daemon publish many":
await testPubSubDaemonPublish(gossip = true, count = 10)
test "gossipsub: daemon publish many":
check:
waitFor(testPubSubDaemonPublish(gossip = true, count = 10)) == true
asyncTest "floodsub: node publish one":
await testPubSubNodePublish()
test "floodsub: node publish one":
check:
waitFor(testPubSubNodePublish()) == true
asyncTest "floodsub: node publish many":
await testPubSubNodePublish(count = 10)
test "floodsub: node publish many":
check:
waitFor(testPubSubNodePublish(count = 10)) == true
asyncTest "gossipsub: node publish one":
await testPubSubNodePublish(gossip = true)
test "gossipsub: node publish one":
check:
waitFor(testPubSubNodePublish(gossip = true)) == true
test "gossipsub: node publish many":
check:
waitFor(testPubSubNodePublish(gossip = true, count = 10)) == true
asyncTest "gossipsub: node publish many":
await testPubSubNodePublish(gossip = true, count = 10)

View File

@ -20,8 +20,8 @@ suite "Mplex":
teardown:
checkTrackers()
test "encode header with channel id 0":
proc testEncodeHeader() {.async.} =
suite "channel encoding":
asyncTest "encode header with channel id 0":
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("000873747265616d2031")
@ -29,10 +29,7 @@ suite "Mplex":
await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeader())
test "encode header with channel id other than 0":
proc testEncodeHeader() {.async.} =
asyncTest "encode header with channel id other than 0":
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("88010873747265616d2031")
@ -40,10 +37,7 @@ suite "Mplex":
await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeader())
test "encode header and body with channel id 0":
proc testEncodeHeaderBody() {.async.} =
asyncTest "encode header and body with channel id 0":
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("020873747265616d2031")
@ -51,10 +45,7 @@ suite "Mplex":
await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeaderBody())
test "encode header and body with channel id other than 0":
proc testEncodeHeaderBody() {.async.} =
asyncTest "encode header and body with channel id other than 0":
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("8a010873747265616d2031")
@ -62,10 +53,7 @@ suite "Mplex":
await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeaderBody())
test "decode header with channel id 0":
proc testDecodeHeader() {.async.} =
asyncTest "decode header with channel id 0":
let stream = newBufferStream()
let conn = stream
await stream.pushData(fromHex("000873747265616d2031"))
@ -75,10 +63,7 @@ suite "Mplex":
check msg.msgType == MessageType.New
await conn.close()
waitFor(testDecodeHeader())
test "decode header and body with channel id 0":
proc testDecodeHeader() {.async.} =
asyncTest "decode header and body with channel id 0":
let stream = newBufferStream()
let conn = stream
await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
@ -89,10 +74,7 @@ suite "Mplex":
check string.fromBytes(msg.data) == "hello from channel 0!!"
await conn.close()
waitFor(testDecodeHeader())
test "decode header and body with channel id other than 0":
proc testDecodeHeader() {.async.} =
asyncTest "decode header and body with channel id other than 0":
let stream = newBufferStream()
let conn = stream
await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
@ -103,28 +85,21 @@ suite "Mplex":
check string.fromBytes(msg.data) == "hello from channel 0!!"
await conn.close()
waitFor(testDecodeHeader())
test "half closed (local close) - should close for write":
proc testClosedForWrite(): Future[bool] {.async.} =
suite "channel half-closed":
asyncTest "(local close) - should close for write":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = LPChannel.init(1, conn, true)
await chann.close()
try:
expect LPStreamClosedError:
await chann.write("Hello")
except LPStreamClosedError:
result = true
finally:
await chann.reset()
await conn.close()
check:
waitFor(testClosedForWrite()) == true
test "half closed (local close) - should allow reads until remote closes":
proc testOpenForRead(): Future[bool] {.async.} =
asyncTest "(local close) - should allow reads until remote closes":
let
conn = newBufferStream(
proc (data: seq[byte]) {.gcsafe, async.} =
@ -142,21 +117,16 @@ suite "Mplex":
let closeFut = chann.pushEof()
# should still allow reading until buffer EOF
await chann.readExactly(addr data[3], 3)
try:
expect LPStreamEOFError:
# this should fail now
await chann.readExactly(addr data[0], 3)
except LPStreamEOFError:
result = true
finally:
await chann.close()
await conn.close()
await closeFut
check:
waitFor(testOpenForRead()) == true
test "half closed (remote close) - channel should close for reading by remote":
proc testClosedForRead(): Future[bool] {.async.} =
asyncTest "(remote close) - channel should close for reading by remote":
let
conn = newBufferStream(
proc (data: seq[byte]) {.gcsafe, async.} =
@ -171,19 +141,14 @@ suite "Mplex":
let closeFut = chann.pushEof() # closing channel
let readFut = chann.readExactly(addr data[3], 3)
await all(closeFut, readFut)
try:
expect LPStreamEOFError:
await chann.readExactly(addr data[0], 6) # this should fail now
except LPStreamEOFError:
result = true
finally:
await chann.close()
await conn.close()
check:
waitFor(testClosedForRead()) == true
test "half closed (remote close) - channel should allow writing on remote close":
proc testClosedForRead(): Future[bool] {.async.} =
asyncTest "(remote close) - channel should allow writing on remote close":
let
testData = "Hello!".toBytes
conn = newBufferStream(
@ -195,16 +160,11 @@ suite "Mplex":
await chann.pushEof() # closing channel
try:
await chann.writeLp(testData)
return true
finally:
await chann.reset() # there's nobody reading the EOF!
await conn.close()
check:
waitFor(testClosedForRead()) == true
test "should not allow pushing data to channel when remote end closed":
proc testResetWrite(): Future[bool] {.async.} =
asyncTest "should not allow pushing data to channel when remote end closed":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
@ -212,19 +172,16 @@ suite "Mplex":
await chann.pushEof()
var buf: array[1, byte]
check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read
try:
expect LPStreamEOFError:
await chann.pushData(@[byte(1)])
except LPStreamEOFError:
result = true
finally:
await chann.close()
await conn.close()
check:
waitFor(testResetWrite()) == true
suite "channel reset":
test "reset - channel should fail reading":
proc testResetRead(): Future[bool] {.async.} =
asyncTest "channel should fail reading":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
@ -232,18 +189,12 @@ suite "Mplex":
await chann.reset()
var data = newSeq[byte](1)
try:
expect LPStreamEOFError:
await chann.readExactly(addr data[0], 1)
except LPStreamEOFError:
result = true
finally:
await conn.close()
check:
waitFor(testResetRead()) == true
test "reset - should complete read":
proc testResetRead(): Future[bool] {.async.} =
asyncTest "should complete read":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
@ -251,19 +202,14 @@ suite "Mplex":
var data = newSeq[byte](1)
let fut = chann.readExactly(addr data[0], 1)
await chann.reset()
try:
expect LPStreamEOFError:
await fut
except LPStreamEOFError:
result = true
finally:
await conn.close()
check:
waitFor(testResetRead()) == true
test "reset - should complete pushData":
proc testResetRead(): Future[bool] {.async.} =
asyncTest "should complete pushData":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
@ -272,14 +218,10 @@ suite "Mplex":
await chann.pushData(@[0'u8])
let fut = chann.pushData(@[0'u8])
await chann.reset()
result = await fut.withTimeout(100.millis)
check await fut.withTimeout(100.millis)
await conn.close()
check:
waitFor(testResetRead()) == true
test "reset - should complete both read and push":
proc testResetRead(): Future[bool] {.async.} =
asyncTest "should complete both read and push":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
@ -290,46 +232,33 @@ suite "Mplex":
let wfut = chann.pushData(@[0'u8])
let wfut2 = chann.pushData(@[0'u8])
await chann.reset()
result = await allFutures(rfut, wfut, wfut2).withTimeout(100.millis)
check await allFutures(rfut, wfut, wfut2).withTimeout(100.millis)
await conn.close()
check:
waitFor(testResetRead()) == true
test "reset - channel should fail writing":
proc testResetWrite(): Future[bool] {.async.} =
asyncTest "channel should fail writing":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = LPChannel.init(1, conn, true)
await chann.reset()
try:
expect LPStreamClosedError:
await chann.write(("Hello!").toBytes)
except LPStreamClosedError:
result = true
finally:
await conn.close()
check:
waitFor(testResetWrite()) == true
test "reset - channel should reset on timeout":
proc testResetWrite(): Future[bool] {.async.} =
asyncTest "channel should reset on timeout":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = LPChannel.init(
1, conn, true, timeout = 100.millis)
check await chann.closeEvent.wait().withTimeout(1.minutes)
check await chann.join().withTimeout(1.minutes)
await conn.close()
result = true
check:
waitFor(testResetWrite())
test "e2e - read/write receiver":
proc testNewStream() {.async.} =
suite "mplex e2e":
asyncTest "read/write receiver":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var done = newFuture[void]()
@ -364,12 +293,10 @@ suite "Mplex":
await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - read/write receiver lazy":
proc testNewStream() {.async.} =
asyncTest "read/write receiver lazy":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var done = newFuture[void]()
@ -407,10 +334,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - write fragmented":
proc testNewStream() {.async.} =
asyncTest "write fragmented":
let
ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
listenJob = newFuture[void]()
@ -449,10 +373,7 @@ suite "Mplex":
let stream = await mplexDial.newStream()
await stream.writeLp(bigseq)
try:
await listenJob.wait(10.seconds)
except AsyncTimeoutError:
check false
await stream.close()
await conn.close()
@ -460,12 +381,10 @@ suite "Mplex":
await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - read/write initiator":
proc testNewStream() {.async.} =
asyncTest "read/write initiator":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let done = newFuture[void]()
@ -501,10 +420,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - multiple streams":
proc testNewStream() {.async.} =
asyncTest "multiple streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let done = newFuture[void]()
@ -545,10 +461,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - multiple read/write streams":
proc testNewStream() {.async.} =
asyncTest "multiple read/write streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let done = newFuture[void]()
@ -592,10 +505,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - channel closes listener with EOF":
proc testNewStream() {.async.} =
asyncTest "channel closes listener with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var listenStreams: seq[Connection]
@ -641,10 +551,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - channel closes dialer with EOF":
proc testNewStream() {.async.} =
asyncTest "channel closes dialer with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var listenStreams: seq[Connection]
@ -707,10 +614,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - dialing mplex closes both ends":
proc testNewStream() {.async.} =
asyncTest "dialing mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var listenStreams: seq[Connection]
@ -750,10 +654,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - listening mplex closes both ends":
proc testNewStream() {.async.} =
asyncTest "listening mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var mplexListen: Mplex
@ -794,10 +695,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - canceling mplex handler closes both ends":
proc testNewStream() {.async.} =
asyncTest "canceling mplex handler closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var mplexHandle: Future[void]
@ -839,10 +737,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - closing dialing connection should close both ends":
proc testNewStream() {.async.} =
asyncTest "closing dialing connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var listenStreams: seq[Connection]
@ -882,10 +777,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - canceling listening connection should close both ends":
proc testNewStream() {.async.} =
asyncTest "canceling listening connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var listenConn: Connection
@ -927,10 +819,8 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(testNewStream())
test "jitter - channel should be able to handle erratic read/writes":
proc test() {.async.} =
suite "jitter":
asyncTest "channel should be able to handle erratic read/writes":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var complete = newFuture[void]()
@ -1001,10 +891,7 @@ suite "Mplex":
transport2.close())
await listenFut
waitFor(test())
test "jitter - channel should handle 1 byte read/write":
proc test() {.async.} =
asyncTest "channel should handle 1 byte read/write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var complete = newFuture[void]()
@ -1059,5 +946,3 @@ suite "Mplex":
transport1.close(),
transport2.close())
await listenFut
waitFor(test())

View File

@ -170,18 +170,13 @@ suite "Multistream select":
teardown:
checkTrackers()
test "test select custom proto":
proc testSelect(): Future[bool] {.async.} =
asyncTest "test select custom proto":
let ms = newMultistream()
let conn = newTestSelectStream()
result = (await ms.select(conn, @["/test/proto/1.0.0"])) == "/test/proto/1.0.0"
check (await ms.select(conn, @["/test/proto/1.0.0"])) == "/test/proto/1.0.0"
await conn.close()
check:
waitFor(testSelect()) == true
test "test handle custom proto":
proc testHandle(): Future[bool] {.async.} =
asyncTest "test handle custom proto":
let ms = newMultistream()
let conn = newTestSelectStream()
@ -195,13 +190,8 @@ suite "Multistream select":
protocol.handler = testHandler
ms.addHandler("/test/proto/1.0.0", protocol)
await ms.handle(conn)
result = true
check:
waitFor(testHandle()) == true
test "test handle `ls`":
proc testLs(): Future[bool] {.async.} =
asyncTest "test handle `ls`":
let ms = newMultistream()
proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} # forward declaration
@ -220,15 +210,9 @@ suite "Multistream select":
ms.addHandler("/test/proto1/1.0.0", protocol)
ms.addHandler("/test/proto2/1.0.0", protocol)
await ms.handle(conn)
result = true
await done.wait(5.seconds)
check:
waitFor(testLs()) == true
test "test handle `na`":
proc testNa(): Future[bool] {.async.} =
asyncTest "test handle `na`":
let ms = newMultistream()
proc testNaHandler(msg: string): Future[void] {.async, gcsafe.}
@ -247,13 +231,8 @@ suite "Multistream select":
ms.addHandler("/unabvailable/proto/1.0.0", protocol)
await ms.handle(conn)
result = true
check:
waitFor(testNa()) == true
test "e2e - handle":
proc endToEnd(): Future[bool] {.async.} =
asyncTest "e2e - handle":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let
@ -288,7 +267,7 @@ suite "Multistream select":
check (await msDial.select(conn, "/test/proto/1.0.0")) == true
let hello = string.fromBytes(await conn.readLp(1024))
result = hello == "Hello!"
check hello == "Hello!"
await conn.close()
await transport2.close()
@ -298,11 +277,7 @@ suite "Multistream select":
handlerWait1.wait(30.seconds),
handlerWait2.wait(30.seconds))
check:
waitFor(endToEnd()) == true
test "e2e - ls":
proc endToEnd(): Future[bool] {.async.} =
asyncTest "e2e - ls":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let
@ -326,17 +301,10 @@ suite "Multistream select":
let transport1: TcpTransport = TcpTransport.init()
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
try:
await msListen.handle(conn)
except LPStreamEOFError:
discard
except LPStreamClosedError:
discard
finally:
await conn.close()
let listenFut = transport1.listen(ma, connHandler)
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
@ -344,20 +312,15 @@ suite "Multistream select":
let ls = await msDial.list(conn)
let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
result = ls == protos
check ls == protos
await conn.close()
await transport2.close()
await transport1.close()
discard await listenFut.wait(5.seconds)
check:
waitFor(endToEnd()) == true
test "e2e - select one from a list with unsupported protos":
proc endToEnd(): Future[bool] {.async.} =
asyncTest "e2e - select one from a list with unsupported protos":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
@ -384,18 +347,13 @@ suite "Multistream select":
@["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0"
let hello = string.fromBytes(await conn.readLp(1024))
result = hello == "Hello!"
check hello == "Hello!"
await conn.close()
await transport2.close()
await transport1.close()
check:
waitFor(endToEnd()) == true
test "e2e - select one with both valid":
proc endToEnd(): Future[bool] {.async.} =
asyncTest "e2e - select one with both valid":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var protocol: LPProtocol = new LPProtocol
@ -421,12 +379,8 @@ suite "Multistream select":
let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn, @["/test/proto2/1.0.0", "/test/proto1/1.0.0"])) == "/test/proto2/1.0.0"
result = string.fromBytes(await conn.readLp(1024)) == "Hello from /test/proto2/1.0.0!"
check string.fromBytes(await conn.readLp(1024)) == "Hello from /test/proto2/1.0.0!"
await conn.close()
await transport2.close()
await transport1.close()
check:
waitFor(endToEnd()) == true

View File

@ -74,8 +74,7 @@ suite "Noise":
teardown:
checkTrackers()
test "e2e: handle write + noise":
proc testListenerDialer(): Future[bool] {.async.} =
asyncTest "e2e: handle write + noise":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server])
@ -108,13 +107,9 @@ suite "Noise":
await transport1.close()
await transport2.close()
result = string.fromBytes(msg) == "Hello!"
check string.fromBytes(msg) == "Hello!"
check:
waitFor(testListenerDialer()) == true
test "e2e: handle write + noise (wrong prologue)":
proc testListenerDialer(): Future[bool] {.async.} =
asyncTest "e2e: handle write + noise (wrong prologue)":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server])
@ -145,13 +140,7 @@ suite "Noise":
await transport1.close()
await transport2.close()
result = true
check:
waitFor(testListenerDialer()) == true
test "e2e: handle read + noise":
proc testListenerDialer(): Future[bool] {.async.} =
asyncTest "e2e: handle read + noise":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server])
@ -186,13 +175,7 @@ suite "Noise":
await transport1.close()
await transport2.close()
result = true
check:
waitFor(testListenerDialer()) == true
test "e2e: handle read + noise fragmented":
proc testListenerDialer(): Future[bool] {.async.} =
asyncTest "e2e: handle read + noise fragmented":
let
server = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server])
@ -231,13 +214,7 @@ suite "Noise":
await transport1.close()
await listenFut
result = true
check:
waitFor(testListenerDialer()) == true
test "e2e use switch dial proto string":
proc testSwitch(): Future[bool] {.async, gcsafe.} =
asyncTest "e2e use switch dial proto string":
let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
@ -264,13 +241,8 @@ suite "Noise":
switch1.stop(),
switch2.stop())
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(testSwitch()) == true
test "e2e test wrong secure negotiation":
proc testSwitch(): Future[bool] {.async, gcsafe.} =
asyncTest "e2e test wrong secure negotiation":
let ma1: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let ma2: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
@ -296,11 +268,6 @@ suite "Noise":
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(testSwitch()) == true
# test "interop with rust noise":
# when true: # disable cos in CI we got no interop server/client
# proc testListenerDialer(): Future[bool] {.async.} =

View File

@ -30,8 +30,7 @@ suite "Switch":
teardown:
checkTrackers()
test "e2e use switch dial proto string":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e use switch dial proto string":
let done = newFuture[void]()
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
try:
@ -75,10 +74,7 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)
waitFor(testSwitch())
test "e2e use switch dial proto string with custom matcher":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e use switch dial proto string with custom matcher":
let done = newFuture[void]()
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
try:
@ -127,10 +123,7 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)
waitFor(testSwitch())
test "e2e should not leak bufferstreams and connections on channel close":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should not leak bufferstreams and connections on channel close":
let done = newFuture[void]()
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
try:
@ -175,10 +168,7 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)
waitFor(testSwitch())
test "e2e use connect then dial":
proc testSwitch(): Future[bool] {.async, gcsafe.} =
asyncTest "e2e use connect then dial":
var awaiters: seq[Future[void]]
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
@ -206,13 +196,9 @@ suite "Switch":
check switch1.isConnected(switch2.peerInfo)
check switch2.isConnected(switch1.peerInfo)
try:
await conn.writeLp("Hello!")
let msg = string.fromBytes(await conn.readLp(1024))
check "Hello!" == msg
result = true
except LPStreamError:
result = false
await allFuturesThrowing(
conn.close(),
@ -224,11 +210,8 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)
check:
waitFor(testSwitch()) == true
test "e2e should not leak on peer disconnect":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should not leak on peer disconnect":
var awaiters: seq[Future[void]]
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
@ -255,10 +238,7 @@ suite "Switch":
switch2.stop())
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "e2e should trigger connection events (remote)":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should trigger connection events (remote)":
var awaiters: seq[Future[void]]
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
@ -314,10 +294,7 @@ suite "Switch":
switch2.stop())
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "e2e should trigger connection events (local)":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should trigger connection events (local)":
var awaiters: seq[Future[void]]
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
@ -373,10 +350,7 @@ suite "Switch":
switch2.stop())
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "e2e should trigger peer events (remote)":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should trigger peer events (remote)":
var awaiters: seq[Future[void]]
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
@ -431,10 +405,7 @@ suite "Switch":
switch2.stop())
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "e2e should trigger peer events (local)":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should trigger peer events (local)":
var awaiters: seq[Future[void]]
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
@ -489,10 +460,7 @@ suite "Switch":
switch2.stop())
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "e2e should trigger peer events only once per peer":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should trigger peer events only once per peer":
var awaiters: seq[Future[void]]
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
@ -563,10 +531,7 @@ suite "Switch":
switch3.stop())
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "e2e should allow dropping peer from connection events":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should allow dropping peer from connection events":
var awaiters: seq[Future[void]]
let rng = newRng()
@ -609,10 +574,7 @@ suite "Switch":
switches.mapIt( it.stop() ))
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "e2e should allow dropping multiple connections for peer from connection events":
proc testSwitch() {.async, gcsafe.} =
asyncTest "e2e should allow dropping multiple connections for peer from connection events":
var awaiters: seq[Future[void]]
let rng = newRng()
@ -662,10 +624,7 @@ suite "Switch":
switches.mapIt( it.stop() ))
await allFuturesThrowing(awaiters)
waitFor(testSwitch())
test "connect to inexistent peer":
proc testSwitch() {.async, gcsafe.} =
asyncTest "connect to inexistent peer":
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let sfut = await switch2.start()
let someAddr = MultiAddress.init("/ip4/127.128.0.99").get()
@ -674,5 +633,3 @@ suite "Switch":
expect(DialFailedError):
let conn = await switch2.dial(somePeer, TestCodec)
await switch2.stop()
waitFor(testSwitch())

View File

@ -13,8 +13,7 @@ suite "TCP transport":
teardown:
checkTrackers()
test "test listener: handle write":
proc testListener(): Future[bool] {.async, gcsafe.} =
asyncTest "test listener: handle write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let handlerWait = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
@ -34,13 +33,9 @@ suite "TCP transport":
await streamTransport.closeWait()
await transport.close()
result = string.fromBytes(msg) == "Hello!"
check string.fromBytes(msg) == "Hello!"
check:
waitFor(testListener()) == true
test "test listener: handle read":
proc testListener(): Future[bool] {.async.} =
asyncTest "test listener: handle read":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let handlerWait = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
@ -59,13 +54,10 @@ suite "TCP transport":
await streamTransport.closeWait()
await transport.close()
result = sent == 6
check sent == 6
check:
waitFor(testListener()) == true
test "test dialer: handle write":
proc testDialer(address: TransportAddress): Future[bool] {.async.} =
asyncTest "test dialer: handle write":
let address = initTAddress("0.0.0.0:0")
let handlerWait = newFuture[void]()
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async, gcsafe.} =
@ -86,7 +78,7 @@ suite "TCP transport":
let conn = await transport.dial(ma)
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
result = string.fromBytes(msg) == "Hello!"
check string.fromBytes(msg) == "Hello!"
await handlerWait.wait(5000.millis) # when no issues will not wait that long!
@ -97,11 +89,8 @@ suite "TCP transport":
server.close()
await server.join()
check:
waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
test "test dialer: handle write":
proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} =
asyncTest "test dialer: handle write":
let address = initTAddress("0.0.0.0:0")
let handlerWait = newFuture[void]()
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async, gcsafe.} =
@ -122,21 +111,16 @@ suite "TCP transport":
let transport: TcpTransport = TcpTransport.init()
let conn = await transport.dial(ma)
await conn.write(cstring("Hello!"), 6)
result = true
await handlerWait.wait(5000.millis) # when no issues will not wait that long!
await conn.close()
await transport.close()
server.stop()
server.close()
await server.join()
check:
waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
test "e2e: handle write":
proc testListenerDialer(): Future[bool] {.async.} =
asyncTest "e2e: handle write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let handlerWait = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
@ -158,13 +142,9 @@ suite "TCP transport":
await transport2.close()
await transport1.close()
result = string.fromBytes(msg) == "Hello!"
check string.fromBytes(msg) == "Hello!"
check:
waitFor(testListenerDialer()) == true
test "e2e: handle read":
proc testListenerDialer(): Future[bool] {.async.} =
asyncTest "e2e: handle read":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let handlerWait = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
@ -186,7 +166,3 @@ suite "TCP transport":
await conn.close()
await transport2.close()
await transport1.close()
result = true
check:
waitFor(testListenerDialer()) == true