use all() for futures and track connections

This commit is contained in:
Dmitriy Ryajov 2020-05-23 11:14:22 -06:00
parent 640c3bdc45
commit 20c68a2018
11 changed files with 117 additions and 126 deletions

View File

@ -2,12 +2,14 @@ import chronos
import ../libp2p/transports/tcptransport import ../libp2p/transports/tcptransport
import ../libp2p/stream/bufferstream import ../libp2p/stream/bufferstream
import ../libp2p/connection
const const
StreamTransportTrackerName = "stream.transport" StreamTransportTrackerName = "stream.transport"
StreamServerTrackerName = "stream.server" StreamServerTrackerName = "stream.server"
trackerNames = [ trackerNames = [
ConnectionTrackerName,
BufferStreamTrackerName, BufferStreamTrackerName,
TcpTransportTrackerName, TcpTransportTrackerName,
StreamTransportTrackerName, StreamTransportTrackerName,

View File

@ -50,7 +50,7 @@ suite "FloodSub":
let let
nodes = generateNodes(2) nodes = generateNodes(2)
nodesFut = await allFinished( nodesFut = await all(
nodes[0].start(), nodes[0].start(),
nodes[1].start() nodes[1].start()
) )
@ -64,14 +64,13 @@ suite "FloodSub":
result = await completionFut.wait(5.seconds) result = await completionFut.wait(5.seconds)
await allFuturesThrowing( await all(
nodes[0].stop(), nodes[0].stop(),
nodes[1].stop() nodes[1].stop()
) )
for fut in nodesFut: await all(nodesFut.concat())
let res = fut.read()
await allFuturesThrowing(res)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -96,8 +95,8 @@ suite "FloodSub":
result = await completionFut.wait(5.seconds) result = await completionFut.wait(5.seconds)
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await all(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(awaiters) await all(awaiters)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -129,9 +128,9 @@ suite "FloodSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
await allFuturesThrowing(handlerFut, handlerFut) check (await handlerFut) == true
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await all(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(awaiters) await all(awaiters)
result = true result = true
check: check:
@ -161,8 +160,8 @@ suite "FloodSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await all(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(awaiters) await all(awaiters)
result = true result = true
check: check:
@ -198,8 +197,8 @@ suite "FloodSub":
await nodes[0].publish("foo", cast[seq[byte]]("Hello!")) await nodes[0].publish("foo", cast[seq[byte]]("Hello!"))
await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!"))
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await all(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(awaiters) await all(awaiters)
result = true result = true
check: check:
@ -220,7 +219,7 @@ suite "FloodSub":
(proc(topic: string, data: seq[byte]) {.async, gcsafe.} = (proc(topic: string, data: seq[byte]) {.async, gcsafe.} =
check topic == "foobar" check topic == "foobar"
inc counter[] inc counter[]
if counter[] == 9: if counter[] == runs - 1:
fut.complete()), fut.complete()),
counter counter
) )
@ -229,7 +228,6 @@ suite "FloodSub":
for i in 0..<runs: for i in 0..<runs:
nodes.add newStandardSwitch() nodes.add newStandardSwitch()
var awaitters: seq[Future[void]] var awaitters: seq[Future[void]]
for i in 0..<runs: for i in 0..<runs:
awaitters.add(await nodes[i].start()) awaitters.add(await nodes[i].start())
@ -244,16 +242,16 @@ suite "FloodSub":
for y in 0..<runs: for y in 0..<runs:
if y != i: if y != i:
subs &= waitSub(nodes[i], nodes[y], "foobar") subs &= waitSub(nodes[i], nodes[y], "foobar")
await allFuturesThrowing(subs) await all(subs)
var pubs: seq[Future[void]] var pubs: seq[Future[void]]
for i in 0..<runs: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!"))
await allFuturesThrowing(pubs) await all(pubs)
await allFuturesThrowing(futs.mapIt(it[0])) await all(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop())) await all(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters) await all(awaitters)
result = true result = true
check: check:
@ -298,16 +296,16 @@ suite "FloodSub":
for y in 0..<runs: for y in 0..<runs:
if y != i: if y != i:
subs &= waitSub(nodes[i], nodes[y], "foobar") subs &= waitSub(nodes[i], nodes[y], "foobar")
await allFuturesThrowing(subs) await all(subs)
var pubs: seq[Future[void]] var pubs: seq[Future[void]]
for i in 0..<runs: for i in 0..<runs:
pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!")) pubs &= nodes[i].publish("foobar", cast[seq[byte]]("Hello!"))
await allFuturesThrowing(pubs) await all(pubs)
await allFuturesThrowing(futs.mapIt(it[0])) await all(futs.mapIt(it[0]))
await allFuturesThrowing(nodes.mapIt(it.stop())) await all(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters) await all(awaitters)
result = true result = true

View File

@ -42,7 +42,7 @@ suite "GossipSub internal":
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == GossipSubD check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -71,7 +71,7 @@ suite "GossipSub internal":
await gossipSub.rebalanceMesh(topic) await gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == GossipSubD check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -103,7 +103,7 @@ suite "GossipSub internal":
await gossipSub.replenishFanout(topic) await gossipSub.replenishFanout(topic)
check gossipSub.fanout[topic].len == GossipSubD check gossipSub.fanout[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -138,7 +138,7 @@ suite "GossipSub internal":
await gossipSub.dropFanoutPeers() await gossipSub.dropFanoutPeers()
check topic notin gossipSub.fanout check topic notin gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -179,7 +179,7 @@ suite "GossipSub internal":
check topic1 notin gossipSub.fanout check topic1 notin gossipSub.fanout
check topic2 in gossipSub.fanout check topic2 in gossipSub.fanout
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -242,7 +242,7 @@ suite "GossipSub internal":
check p notin gossipSub.fanout[topic] check p notin gossipSub.fanout[topic]
check p notin gossipSub.mesh[topic] check p notin gossipSub.mesh[topic]
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -285,7 +285,7 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -328,7 +328,7 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == GossipSubD check peers.len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true
@ -371,7 +371,7 @@ suite "GossipSub internal":
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
check peers.len == 0 check peers.len == 0
await allFuturesThrowing(conns.mapIt(it.close())) await all(conns.mapIt(it.close()))
result = true result = true

View File

@ -79,8 +79,8 @@ suite "GossipSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
result = (await validatorFut) and (await handlerFut) result = (await validatorFut) and (await handlerFut)
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await all(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(awaiters) await all(awaiters)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -111,9 +111,8 @@ suite "GossipSub":
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
result = await validatorFut result = await validatorFut
await all(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await all(awaiters)
await allFuturesThrowing(awaiters)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -152,8 +151,8 @@ suite "GossipSub":
await nodes[0].publish("bar", cast[seq[byte]]("Hello!")) await nodes[0].publish("bar", cast[seq[byte]]("Hello!"))
result = ((await passed) and (await failed) and (await handlerFut)) result = ((await passed) and (await failed) and (await handlerFut))
await allFuturesThrowing(nodes[0].stop(), nodes[1].stop()) await all(nodes[0].stop(), nodes[1].stop())
await allFuturesThrowing(awaiters) await all(awaiters)
result = true result = true
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -183,8 +182,8 @@ suite "GossipSub":
"foobar" in gossip1.gossipsub "foobar" in gossip1.gossipsub
gossip2.peerInfo.id in gossip1.gossipsub["foobar"] gossip2.peerInfo.id in gossip1.gossipsub["foobar"]
await allFuturesThrowing(nodes.mapIt(it.stop())) await all(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters) await all(awaitters)
result = true result = true
@ -212,7 +211,7 @@ suite "GossipSub":
var subs: seq[Future[void]] var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar") subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar") subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs) await all(subs)
let let
gossip1 = GossipSub(nodes[0].pubSub.get()) gossip1 = GossipSub(nodes[0].pubSub.get())
@ -231,8 +230,8 @@ suite "GossipSub":
gossip1.peerInfo.id in gossip2.gossipsub["foobar"] or gossip1.peerInfo.id in gossip2.gossipsub["foobar"] or
gossip1.peerInfo.id in gossip2.mesh["foobar"] gossip1.peerInfo.id in gossip2.mesh["foobar"]
await allFuturesThrowing(nodes.mapIt(it.stop())) await all(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters) await all(awaitters)
result = true result = true
@ -280,7 +279,7 @@ suite "GossipSub":
await nodes[0].stop() await nodes[0].stop()
await nodes[1].stop() await nodes[1].stop()
await allFuturesThrowing(wait) await all(wait)
result = observed == 2 result = observed == 2
@ -310,7 +309,7 @@ suite "GossipSub":
await nodes[0].stop() await nodes[0].stop()
await nodes[1].stop() await nodes[1].stop()
await allFuturesThrowing(wait) await all(wait)
check: check:
waitFor(runTests()) == true waitFor(runTests()) == true
@ -345,7 +344,8 @@ suite "GossipSub":
subs.add(allFutures(dialer.subscribe("foobar", handler), subs.add(allFutures(dialer.subscribe("foobar", handler),
waitSub(nodes[0], dialer, "foobar"))) waitSub(nodes[0], dialer, "foobar")))
await allFuturesThrowing(subs) await all(subs)
await wait(nodes[0].publish("foobar", await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " & cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)), nodes[1].peerInfo.id)),
@ -356,8 +356,8 @@ suite "GossipSub":
for k, v in seen.pairs: for k, v in seen.pairs:
check: v == 1 check: v == 1
await allFuturesThrowing(nodes.mapIt(it.stop())) await all(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters) await all(awaitters)
result = true result = true
check: check:

View File

@ -250,7 +250,7 @@ suite "BufferStream":
await buf1.pushTo(cast[seq[byte]]("Hello2!")) await buf1.pushTo(cast[seq[byte]]("Hello2!"))
await buf2.pushTo(cast[seq[byte]]("Hello1!")) await buf2.pushTo(cast[seq[byte]]("Hello1!"))
await allFuturesThrowing(readFut1, readFut2) await all(readFut1, readFut2)
check: check:
res1 == cast[seq[byte]]("Hello2!") res1 == cast[seq[byte]]("Hello2!")
@ -300,7 +300,7 @@ suite "BufferStream":
await buf1.write(cast[seq[byte]]("Hello1!")) await buf1.write(cast[seq[byte]]("Hello1!"))
await buf2.write(cast[seq[byte]]("Hello2!")) await buf2.write(cast[seq[byte]]("Hello2!"))
await allFuturesThrowing(readFut1, readFut2) await all(readFut1, readFut2)
check: check:
res1 == cast[seq[byte]]("Hello2!") res1 == cast[seq[byte]]("Hello2!")
@ -376,7 +376,7 @@ suite "BufferStream":
await buf1.write(cast[seq[byte]]("Hello1!")) await buf1.write(cast[seq[byte]]("Hello1!"))
await buf2.write(cast[seq[byte]]("Hello2!")) await buf2.write(cast[seq[byte]]("Hello2!"))
await allFuturesThrowing(readFut1, readFut2) await all(readFut1, readFut2)
check: check:
res1 == cast[seq[byte]]("Hello2!") res1 == cast[seq[byte]]("Hello2!")
@ -437,7 +437,7 @@ suite "BufferStream":
var writerFut = writer() var writerFut = writer()
var readerFut = reader() var readerFut = reader()
await allFuturesThrowing(readerFut, writerFut) await all(readerFut, writerFut)
result = true result = true
await buf1.close() await buf1.close()

View File

@ -1,6 +1,7 @@
import options, tables import options, tables
import unittest import unittest
import chronos, chronicles import chronos, chronicles, stew/byteutils
import helpers
import ../libp2p/[daemon/daemonapi, import ../libp2p/[daemon/daemonapi,
protobuf/minprotobuf, protobuf/minprotobuf,
vbuffer, vbuffer,
@ -9,6 +10,7 @@ import ../libp2p/[daemon/daemonapi,
cid, cid,
varint, varint,
multihash, multihash,
standard_setup,
peer, peer,
peerinfo, peerinfo,
switch, switch,
@ -59,35 +61,6 @@ proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} =
if size > 0.uint: if size > 0.uint:
await s.readExactly(addr result[0], int(size)) await s.readExactly(addr result[0], int(size))
proc createNode*(privKey: Option[PrivateKey] = none(PrivateKey),
address: string = "/ip4/127.0.0.1/tcp/0",
triggerSelf: bool = false,
gossip: bool = false): Switch =
var seckey = privKey
if privKey.isNone:
seckey = some(PrivateKey.random(RSA).get())
var peerInfo = NativePeerInfo.init(seckey.get(), [Multiaddress.init(address).tryGet()])
proc createMplex(conn: Connection): Muxer = newMplex(conn)
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(TcpTransport.init())]
let muxers = [(MplexCodec, mplexProvider)].toTable()
let identify = newIdentify(peerInfo)
let secureManagers = [Secure(newSecio(seckey.get()))]
var pubSub: Option[PubSub]
if gossip:
pubSub = some(PubSub(newPubSub(GossipSub, peerInfo, triggerSelf)))
else:
pubSub = some(PubSub(newPubSub(FloodSub, peerInfo, triggerSelf)))
result = newSwitch(peerInfo,
transports,
identify,
muxers,
secureManagers = secureManagers,
pubSub = pubSub)
proc testPubSubDaemonPublish(gossip: bool = false, proc testPubSubDaemonPublish(gossip: bool = false,
count: int = 1): Future[bool] {.async.} = count: int = 1): Future[bool] {.async.} =
var pubsubData = "TEST MESSAGE" var pubsubData = "TEST MESSAGE"
@ -100,7 +73,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags) let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
let nativeNode = createNode(gossip = gossip) let nativeNode = newStandardSwitch(gossip = gossip)
let awaiters = nativeNode.start() let awaiters = nativeNode.start()
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
@ -110,6 +83,7 @@ proc testPubSubDaemonPublish(gossip: bool = false,
let smsg = cast[string](data) let smsg = cast[string](data)
check smsg == pubsubData check smsg == pubsubData
times.inc() times.inc()
echo "TIMES ", times
if times >= count and not finished: if times >= count and not finished:
finished = true finished = true
@ -125,15 +99,16 @@ proc testPubSubDaemonPublish(gossip: bool = false,
asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
await nativeNode.subscribe(testTopic, nativeHandler) await nativeNode.subscribe(testTopic, nativeHandler)
await sleepAsync(1.seconds) await sleepAsync(5.seconds)
proc publisher() {.async.} = proc publisher() {.async.} =
while not finished: while not finished:
await daemonNode.pubsubPublish(testTopic, msgData) await daemonNode.pubsubPublish(testTopic, msgData)
await sleepAsync(100.millis) await sleepAsync(500.millis)
await wait(publisher(), 5.minutes) # should be plenty of time await wait(publisher(), 5.minutes) # should be plenty of time
echo "HEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE"
result = true result = true
await nativeNode.stop() await nativeNode.stop()
await allFutures(awaiters) await allFutures(awaiters)
@ -151,7 +126,7 @@ proc testPubSubNodePublish(gossip: bool = false,
let daemonNode = await newDaemonApi(flags) let daemonNode = await newDaemonApi(flags)
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
let nativeNode = createNode(gossip = gossip) let nativeNode = newStandardSwitch(gossip = gossip)
let awaiters = nativeNode.start() let awaiters = nativeNode.start()
let nativePeer = nativeNode.peerInfo let nativePeer = nativeNode.peerInfo
@ -169,6 +144,7 @@ proc testPubSubNodePublish(gossip: bool = false,
let smsg = cast[string](message.data) let smsg = cast[string](message.data)
check smsg == pubsubData check smsg == pubsubData
times.inc() times.inc()
echo "TIMES ", times
if times >= count and not finished: if times >= count and not finished:
finished = true finished = true
result = true # don't cancel subscription result = true # don't cancel subscription
@ -176,12 +152,12 @@ proc testPubSubNodePublish(gossip: bool = false,
discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard
await nativeNode.subscribe(testTopic, nativeHandler) await nativeNode.subscribe(testTopic, nativeHandler)
await sleepAsync(1.seconds) await sleepAsync(5.seconds)
proc publisher() {.async.} = proc publisher() {.async.} =
while not finished: while not finished:
await nativeNode.publish(testTopic, msgData) await nativeNode.publish(testTopic, msgData)
await sleepAsync(100.millis) await sleepAsync(500.millis)
await wait(publisher(), 5.minutes) # should be plenty of time await wait(publisher(), 5.minutes) # should be plenty of time
@ -191,11 +167,16 @@ proc testPubSubNodePublish(gossip: bool = false,
await daemonNode.close() await daemonNode.close()
suite "Interop": suite "Interop":
teardown:
for tracker in testTrackers():
echo tracker.dump()
# check tracker.isLeaked() == false
test "native -> daemon multiple reads and writes": test "native -> daemon multiple reads and writes":
proc runTests(): Future[bool] {.async.} = proc runTests(): Future[bool] {.async.} =
var protos = @["/test-stream"] var protos = @["/test-stream"]
let nativeNode = createNode() let nativeNode = newStandardSwitch()
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi() let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
@ -222,9 +203,13 @@ suite "Interop":
check "test 4" == cast[string]((await conn.readLp(1024))) check "test 4" == cast[string]((await conn.readLp(1024)))
await wait(testFuture, 10.secs) await wait(testFuture, 10.secs)
await conn.close()
await daemonNode.close()
await nativeNode.stop() await nativeNode.stop()
await allFutures(awaiters) await allFutures(awaiters)
await daemonNode.close()
await sleepAsync(1.seconds)
result = true result = true
check: check:
@ -243,7 +228,7 @@ suite "Interop":
var expect = newString(len(buffer) - 2) var expect = newString(len(buffer) - 2)
copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
let nativeNode = createNode() let nativeNode = newStandardSwitch()
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi() let daemonNode = await newDaemonApi()
@ -263,6 +248,8 @@ suite "Interop":
protos[0]) protos[0])
await conn.writeLp(test & "\r\n") await conn.writeLp(test & "\r\n")
result = expect == (await wait(testFuture, 10.secs)) result = expect == (await wait(testFuture, 10.secs))
await conn.close()
await nativeNode.stop() await nativeNode.stop()
await allFutures(awaiters) await allFutures(awaiters)
await daemonNode.close() await daemonNode.close()
@ -287,7 +274,7 @@ suite "Interop":
proto.handler = nativeHandler proto.handler = nativeHandler
proto.codec = protos[0] # codec proto.codec = protos[0] # codec
let nativeNode = createNode() let nativeNode = newStandardSwitch()
nativeNode.mount(proto) nativeNode.mount(proto)
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
@ -326,7 +313,7 @@ suite "Interop":
proto.handler = nativeHandler proto.handler = nativeHandler
proto.codec = protos[0] # codec proto.codec = protos[0] # codec
let nativeNode = createNode() let nativeNode = newStandardSwitch()
nativeNode.mount(proto) nativeNode.mount(proto)
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()
@ -365,6 +352,7 @@ suite "Interop":
check line == test check line == test
await conn.writeLp(cast[seq[byte]](test)) await conn.writeLp(cast[seq[byte]](test))
count.inc() count.inc()
echo "COUNT ", count
testFuture.complete(count) testFuture.complete(count)
await conn.close() await conn.close()
@ -374,7 +362,7 @@ suite "Interop":
proto.handler = nativeHandler proto.handler = nativeHandler
proto.codec = protos[0] # codec proto.codec = protos[0] # codec
let nativeNode = createNode() let nativeNode = newStandardSwitch()
nativeNode.mount(proto) nativeNode.mount(proto)
let awaiters = await nativeNode.start() let awaiters = await nativeNode.start()

View File

@ -186,7 +186,7 @@ suite "Mplex":
var data = newSeq[byte](1) var data = newSeq[byte](1)
try: try:
await chann.readExactly(addr data[0], 1) await chann.readExactly(addr data[0], 1)
doAssert(len(data) == 1) check data.len == 1
except LPStreamEOFError: except LPStreamEOFError:
result = true result = true
finally: finally:
@ -245,7 +245,7 @@ suite "Mplex":
await done.wait(1.seconds) await done.wait(1.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -284,7 +284,7 @@ suite "Mplex":
await done.wait(1.seconds) await done.wait(1.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -331,7 +331,7 @@ suite "Mplex":
await stream.close() await stream.close()
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -368,7 +368,7 @@ suite "Mplex":
await done.wait(1.seconds) await done.wait(1.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -410,7 +410,7 @@ suite "Mplex":
await done.wait(10.seconds) await done.wait(10.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -454,7 +454,7 @@ suite "Mplex":
await done.wait(5.seconds) await done.wait(5.seconds)
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(testNewStream()) waitFor(testNewStream())
@ -522,7 +522,7 @@ suite "Mplex":
await complete.wait(1.seconds) await complete.wait(1.seconds)
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(test()) waitFor(test())
@ -579,7 +579,7 @@ suite "Mplex":
await stream.close() await stream.close()
await conn.close() await conn.close()
await mplexDialFut await mplexDialFut
await allFuturesThrowing(transport1.close(), transport2.close()) await all(transport1.close(), transport2.close())
await listenFut await listenFut
waitFor(test()) waitFor(test())

View File

@ -151,6 +151,7 @@ proc newTestNaStream(na: NaHandler): TestNaStream =
suite "Multistream select": suite "Multistream select":
teardown: teardown:
for tracker in testTrackers(): for tracker in testTrackers():
# echo tracker.dump()
check tracker.isLeaked() == false check tracker.isLeaked() == false
test "test select custom proto": test "test select custom proto":
@ -276,7 +277,7 @@ suite "Multistream select":
await transport2.close() await transport2.close()
await transport1.close() await transport1.close()
await allFuturesThrowing(handlerWait1.wait(5000.millis) #[if OK won't happen!!]#, handlerWait2.wait(5000.millis) #[if OK won't happen!!]#) await all(handlerWait1.wait(5000.millis) #[if OK won't happen!!]#, handlerWait2.wait(5000.millis) #[if OK won't happen!!]#)
check: check:
waitFor(endToEnd()) == true waitFor(endToEnd()) == true
@ -306,10 +307,16 @@ suite "Multistream select":
let transport1: TcpTransport = TcpTransport.init() let transport1: TcpTransport = TcpTransport.init()
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
try:
await msListen.handle(conn) await msListen.handle(conn)
handlerWait.complete() except LPStreamEOFError:
discard
except LPStreamClosedError:
discard
finally:
await conn.close()
asyncCheck transport1.listen(ma, connHandler) let listenFut = transport1.listen(ma, connHandler)
let msDial = newMultistream() let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init() let transport2: TcpTransport = TcpTransport.init()
@ -323,8 +330,7 @@ suite "Multistream select":
await conn.close() await conn.close()
await transport2.close() await transport2.close()
await transport1.close() await transport1.close()
discard await listenFut.wait(5.seconds)
await handlerWait.wait(5000.millis) # when no issues will not wait that long!
check: check:
waitFor(endToEnd()) == true waitFor(endToEnd()) == true

View File

@ -175,7 +175,7 @@ suite "Noise":
let let
transport1: TcpTransport = TcpTransport.init() transport1: TcpTransport = TcpTransport.init()
asyncCheck await transport1.listen(server, connHandler) listenFut = await transport1.listen(server, connHandler)
let let
transport2: TcpTransport = TcpTransport.init() transport2: TcpTransport = TcpTransport.init()
@ -191,6 +191,7 @@ suite "Noise":
await conn.close() await conn.close()
await transport2.close() await transport2.close()
await transport1.close() await transport1.close()
await listenFut
result = true result = true
@ -219,9 +220,10 @@ suite "Noise":
await conn.writeLp("Hello!") await conn.writeLp("Hello!")
let msg = cast[string](await conn.readLp(1024)) let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg check "Hello!" == msg
await conn.close()
await allFuturesThrowing(switch1.stop(), switch2.stop()) await all(switch1.stop(), switch2.stop())
await allFuturesThrowing(awaiters) await all(awaiters)
result = true result = true
check: check:

View File

@ -5,13 +5,8 @@ import chronos
import ../libp2p/crypto/crypto, import ../libp2p/crypto/crypto,
../libp2p/peerinfo, ../libp2p/peerinfo,
../libp2p/peer ../libp2p/peer
import ./helpers
suite "PeerInfo": suite "PeerInfo":
teardown:
for tracker in testTrackers():
check tracker.isLeaked() == false
test "Should init with private key": test "Should init with private key":
let seckey = PrivateKey.random(ECDSA).get() let seckey = PrivateKey.random(ECDSA).get()
var peerInfo = PeerInfo.init(seckey) var peerInfo = PeerInfo.init(seckey)
@ -43,7 +38,7 @@ suite "PeerInfo":
check: check:
PeerID.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") == peerInfo.peerId PeerID.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") == peerInfo.peerId
# TODO: CIDv1 is handling is missing from PeerID # TODO: CIDv1 handling is missing from PeerID
# https://github.com/status-im/nim-libp2p/issues/53 # https://github.com/status-im/nim-libp2p/issues/53
# test "Should init from CIDv1 string": # test "Should init from CIDv1 string":
# var peerInfo = PeerInfo.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe") # var peerInfo = PeerInfo.init("bafzbeie5745rpv2m6tjyuugywy4d5ewrqgqqhfnf445he3omzpjbx5xqxe")

View File

@ -90,7 +90,7 @@ suite "Switch":
let msg = cast[string](await conn.readLp(1024)) let msg = cast[string](await conn.readLp(1024))
check "Hello!" == msg check "Hello!" == msg
await allFuturesThrowing( await all(
done.wait(5.seconds) #[if OK won't happen!!]#, done.wait(5.seconds) #[if OK won't happen!!]#,
conn.close(), conn.close(),
switch1.stop(), switch1.stop(),
@ -98,7 +98,7 @@ suite "Switch":
) )
# this needs to go at end # this needs to go at end
await allFuturesThrowing(awaiters) await all(awaiters)
waitFor(testSwitch()) waitFor(testSwitch())
@ -138,12 +138,12 @@ suite "Switch":
except LPStreamError: except LPStreamError:
result = false result = false
await allFuturesThrowing( await all(
conn.close(), conn.close(),
switch1.stop(), switch1.stop(),
switch2.stop() switch2.stop()
) )
await allFuturesThrowing(awaiters) await all(awaiters)
check: check:
waitFor(testSwitch()) == true waitFor(testSwitch()) == true