From 1819502fb550c6b55c5910d273335ca30a49523c Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 18 May 2020 07:49:49 -0600 Subject: [PATCH] Cleanup - tests and logging (#178) * make async for proper exception handling * tryAndWarn msg messes up Exception msg * misc: comment out tracker dumps * cleanup mplex tests * more informative errors * give CI time to run * revert change, bacause it causes races --- libp2p/errors.nim | 19 +- libp2p/muxers/mplex/coder.nim | 4 + tests/pubsub/testfloodsub.nim | 1 + tests/testinterop.nim | 4 +- tests/testmplex.nim | 547 +++++++++++++++------------------- tests/testswitch.nim | 1 + 6 files changed, 254 insertions(+), 322 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index 63873bd19..06edd31e3 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -19,7 +19,7 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = if res.failed: let exc = res.readError() # We still don't abort but warn - warn "Something went wrong in a future", error=exc.name + warn "A feature has failed, enable trace logging for details", error=exc.name trace "Exception message", msg=exc.msg else: quote do: @@ -29,11 +29,11 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = let exc = res.readError() for i in 0..<`nexclude`: if exc of `exclude`[i]: - trace "Ignoring an error (no warning)", error=exc.name, msg=exc.msg + trace "A feature has failed", error=exc.name, msg=exc.msg break check # We still don't abort but warn - warn "Something went wrong in a future", error=exc.name - trace "Exception message", msg=exc.msg + warn "A feature has failed, enable trace logging for details", error=exc.name + trace "Exception details", msg=exc.msg proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = var futs: seq[Future[T]] @@ -55,10 +55,11 @@ proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = return call() -template tryAndWarn*(msg: static[string]; body: untyped): untyped = +template tryAndWarn*(message: static[string]; body: untyped): untyped = try: body - except CancelledError as ex: - raise ex - except CatchableError as ex: - warn "Ignored an error", name=ex.name, msg=msg + except CancelledError as exc: + raise exc # TODO: why catch and re-raise? + except CatchableError as exc: + warn "An exception has ocurred, enable trace logging for details", name = exc.name + trace "Exception details", exc = exc.msg diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index b6152fa12..95a70fe48 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -83,4 +83,8 @@ proc writeMsg*(conn: Connection, id: uint64, msgType: MessageType, data: string) {.async, gcsafe.} = + # TODO: changing this to + #`await conn.writeMsg(id, msgType, cast[seq[byte]](data))` + # causes all sorts of race conditions and hangs. + # DON'T DO IT! result = conn.writeMsg(id, msgType, cast[seq[byte]](data)) diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index ef9456b01..48a337d2d 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -38,6 +38,7 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = suite "FloodSub": teardown: for tracker in testTrackers(): + # echo tracker.dump() check tracker.isLeaked() == false test "FloodSub basic publish/subscribe A -> B": diff --git a/tests/testinterop.nim b/tests/testinterop.nim index dd1aa9f84..137bcea32 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -133,7 +133,7 @@ proc testPubSubDaemonPublish(gossip: bool = false, await daemonNode.pubsubPublish(testTopic, msgData) await sleepAsync(100.millis) - await wait(publisher(), 1.minutes) # should be plenty of time + await wait(publisher(), 5.minutes) # should be plenty of time result = true await nativeNode.stop() @@ -184,7 +184,7 @@ proc testPubSubNodePublish(gossip: bool = false, await nativeNode.publish(testTopic, msgData) await sleepAsync(100.millis) - await wait(publisher(), 1.minutes) # should be plenty of time + await wait(publisher(), 5.minutes) # should be plenty of time result = finished await nativeNode.stop() diff --git a/tests/testmplex.nim b/tests/testmplex.nim index a457363fb..bad5edef5 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -1,5 +1,5 @@ import unittest, strformat, strformat, random -import chronos, nimcrypto/utils, chronicles +import chronos, nimcrypto/utils, chronicles, stew/byteutils import ../libp2p/[errors, connection, stream/lpstream, @@ -21,60 +21,47 @@ when defined(nimHasUsed): {.used.} suite "Mplex": teardown: for tracker in testTrackers(): + # echo tracker.dump() check tracker.isLeaked() == false test "encode header with channel id 0": - proc testEncodeHeader(): Future[bool] {.async.} = + proc testEncodeHeader() {.async.} = proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("000873747265616d2031") let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(0, MessageType.New, cast[seq[byte]]("stream 1")) + await conn.close() - result = true - - await stream.close() - - check: - waitFor(testEncodeHeader()) == true + waitFor(testEncodeHeader()) test "encode header with channel id other than 0": - proc testEncodeHeader(): Future[bool] {.async.} = + proc testEncodeHeader() {.async.} = proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("88010873747265616d2031") let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(17, MessageType.New, cast[seq[byte]]("stream 1")) + await conn.close() - result = true - - await stream.close() - - check: - waitFor(testEncodeHeader()) == true + waitFor(testEncodeHeader()) test "encode header and body with channel id 0": - proc testEncodeHeaderBody(): Future[bool] {.async.} = - var step = 0 + proc testEncodeHeaderBody() {.async.} = proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("020873747265616d2031") let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(0, MessageType.MsgOut, cast[seq[byte]]("stream 1")) + await conn.close() - result = true - - await stream.close() - - check: - waitFor(testEncodeHeaderBody()) == true + waitFor(testEncodeHeaderBody()) test "encode header and body with channel id other than 0": - proc testEncodeHeaderBody(): Future[bool] {.async.} = - var step = 0 + proc testEncodeHeaderBody() {.async.} = proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("8a010873747265616d2031") @@ -83,15 +70,10 @@ suite "Mplex": await conn.writeMsg(17, MessageType.MsgOut, cast[seq[byte]]("stream 1")) await conn.close() - result = true - - await stream.close() - - check: - waitFor(testEncodeHeaderBody()) == true + waitFor(testEncodeHeaderBody()) test "decode header with channel id 0": - proc testDecodeHeader(): Future[bool] {.async.} = + proc testDecodeHeader() {.async.} = let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("000873747265616d2031")) @@ -99,16 +81,12 @@ suite "Mplex": check msg.id == 0 check msg.msgType == MessageType.New + await conn.close() - result = true - - await stream.close() - - check: - waitFor(testDecodeHeader()) == true + waitFor(testDecodeHeader()) test "decode header and body with channel id 0": - proc testDecodeHeader(): Future[bool] {.async.} = + proc testDecodeHeader() {.async.} = let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) @@ -117,16 +95,12 @@ suite "Mplex": check msg.id == 0 check msg.msgType == MessageType.MsgOut check cast[string](msg.data) == "hello from channel 0!!" + await conn.close() - result = true - - await stream.close() - - check: - waitFor(testDecodeHeader()) == true + waitFor(testDecodeHeader()) test "decode header and body with channel id other than 0": - proc testDecodeHeader(): Future[bool] {.async.} = + proc testDecodeHeader() {.async.} = let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) @@ -135,80 +109,115 @@ suite "Mplex": check msg.id == 17 check msg.msgType == MessageType.MsgOut check cast[string](msg.data) == "hello from channel 0!!" + await conn.close() - result = true + waitFor(testDecodeHeader()) - await stream.close() + test "half closed - channel should close for write": + proc testClosedForWrite() {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newConnection(newBufferStream(writeHandler)) + chann = newChannel(1, conn, true) + try: + await chann.close() + await chann.write("Hello") + finally: + await chann.reset() + await conn.close() - check: - waitFor(testDecodeHeader()) == true + expect LPStreamEOFError: + waitFor(testClosedForWrite()) + + test "half closed - channel should close for read by remote": + proc testClosedForRead() {.async.} = + let + conn = newConnection(newBufferStream( + proc (data: seq[byte]) {.gcsafe, async.} = + result = nil + )) + chann = newChannel(1, conn, true) + + try: + await chann.pushTo(cast[seq[byte]]("Hello!")) + let closeFut = chann.closedByRemote() + + var data = newSeq[byte](6) + await chann.readExactly(addr data[0], 6) # this should work, since there is data in the buffer + await chann.readExactly(addr data[0], 6) # this should throw + await closeFut + finally: + await chann.close() + await conn.close() + + expect LPStreamEOFError: + waitFor(testClosedForRead()) + + test "should not allow pushing data to channel when remote end closed": + proc testResetWrite() {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newConnection(newBufferStream(writeHandler)) + chann = newChannel(1, conn, true) + try: + await chann.closedByRemote() + await chann.pushTo(@[byte(1)]) + finally: + await chann.close() + await conn.close() + + expect LPStreamEOFError: + waitFor(testResetWrite()) + + test "reset - channel should fail reading": + proc testResetRead() {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newConnection(newBufferStream(writeHandler)) + chann = newChannel(1, conn, true) + + try: + await chann.reset() + var data = newSeq[byte](1) + await chann.readExactly(addr data[0], 1) + doAssert(len(data) == 1) + finally: + await conn.close() + + expect LPStreamEOFError: + waitFor(testResetRead()) + + test "reset - channel should fail writing": + proc testResetWrite() {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newConnection(newBufferStream(writeHandler)) + chann = newChannel(1, conn, true) + try: + await chann.reset() + await chann.write(cast[seq[byte]]("Hello!")) + finally: + await conn.close() + + expect LPStreamEOFError: + waitFor(testResetWrite()) test "e2e - read/write receiver": - proc testNewStream(): Future[bool] {.async.} = + proc testNewStream() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - var - done = newFuture[void]() - done2 = newFuture[void]() - + var done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + let mplexListen = newMplex(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = let msg = await stream.readLp(1024) - check cast[string](msg) == "Hello from stream!" + check cast[string](msg) == "HELLO" await stream.close() done.complete() - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen await mplexListen.handle() - await conn.close() - done2.complete() - - let transport1: TcpTransport = newTransport(TcpTransport) - let lfut = await transport1.listen(ma, connHandler) - - let transport2: TcpTransport = newTransport(TcpTransport) - let conn = await transport2.dial(transport1.ma) - - let mplexDial = newMplex(conn) - let stream = await mplexDial.newStream() - let openState = cast[LPChannel](stream.stream).isOpen - await stream.writeLp("Hello from stream!") - await conn.close() - check openState # not lazy - - result = true - - await done.wait(5000.millis) - await done2.wait(5000.millis) - await stream.close() - await conn.close() - await transport2.close() - await transport1.close() - await lfut - - check: - waitFor(testNewStream()) == true - - test "e2e - read/write receiver lazy": - proc testNewStream(): Future[bool] {.async.} = - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - - var - done = newFuture[void]() - done2 = newFuture[void]() - - proc connHandler(conn: Connection) {.async, gcsafe.} = - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp(1024) - check cast[string](msg) == "Hello from stream!" - await stream.close() - done.complete() - - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen - await mplexListen.handle() - done2.complete() + await mplexListen.close() let transport1: TcpTransport = newTransport(TcpTransport) let listenFut = await transport1.listen(ma, connHandler) @@ -217,28 +226,61 @@ suite "Mplex": let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) - let stream = await mplexDial.newStream("", true) - let openState = cast[LPChannel](stream.stream).isOpen - await stream.writeLp("Hello from stream!") - await conn.close() - - check not openState # assert lazy - result = true - - await done.wait(5000.millis) - await done2.wait(5000.millis) - await conn.close() + let mplexDialFut = mplexDial.handle() + let stream = await mplexDial.newStream() + await stream.writeLp("HELLO") + check LPChannel(stream.stream).isOpen # not lazy await stream.close() - await mplexDial.close() - await transport2.close() - await transport1.close() + + await done.wait(1.seconds) + await conn.close() + await mplexDialFut + await allFuturesThrowing(transport1.close(), transport2.close()) await listenFut - check: - waitFor(testNewStream()) == true + waitFor(testNewStream()) + + test "e2e - read/write receiver lazy": + proc testNewStream() {.async.} = + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + + var done = newFuture[void]() + proc connHandler(conn: Connection) {.async, gcsafe.} = + let mplexListen = newMplex(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = + let msg = await stream.readLp(1024) + check cast[string](msg) == "HELLO" + await stream.close() + done.complete() + + await mplexListen.handle() + await mplexListen.close() + + let transport1: TcpTransport = newTransport(TcpTransport) + let listenFut = await transport1.listen(ma, connHandler) + + let transport2: TcpTransport = newTransport(TcpTransport) + let conn = await transport2.dial(transport1.ma) + + let mplexDial = newMplex(conn) + let stream = await mplexDial.newStream(lazy = true) + let mplexDialFut = mplexDial.handle() + check not LPChannel(stream.stream).isOpen # assert lazy + await stream.writeLp("HELLO") + check LPChannel(stream.stream).isOpen # assert lazy + await stream.close() + + await done.wait(1.seconds) + await conn.close() + await mplexDialFut + await allFuturesThrowing(transport1.close(), transport2.close()) + await listenFut + + waitFor(testNewStream()) test "e2e - write fragmented": - proc testNewStream(): Future[bool] {.async.} = + proc testNewStream() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") listenJob = newFuture[void]() @@ -248,17 +290,17 @@ suite "Mplex": bigseq.add(uint8(rand(uint('A')..uint('z')))) proc connHandler(conn: Connection) {.async, gcsafe.} = - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - defer: - await stream.close() + let mplexListen = newMplex(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = let msg = await stream.readLp(MaxMsgSize) check msg == bigseq trace "Bigseq check passed!" + await stream.close() listenJob.complete() - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen - discard mplexListen.handle() + await mplexListen.handle() + await mplexListen.close() let transport1: TcpTransport = newTransport(TcpTransport) let listenFut = await transport1.listen(ma, connHandler) @@ -267,41 +309,38 @@ suite "Mplex": let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) + let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() await stream.writeLp(bigseq) try: - await listenJob.wait(millis(5000)) + await listenJob.wait(10.seconds) except AsyncTimeoutError: check false - result = true - await stream.close() - await mplexDial.close() await conn.close() - await transport2.close() - await transport1.close() + await mplexDialFut + await allFuturesThrowing(transport1.close(), transport2.close()) await listenFut - check: - waitFor(testNewStream()) == true + waitFor(testNewStream()) test "e2e - read/write initiator": - proc testNewStream(): Future[bool] {.async.} = + proc testNewStream() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let done = newFuture[void]() - proc connHandler(conn: Connection) {.async, gcsafe.} = - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + let mplexListen = newMplex(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = await stream.writeLp("Hello from stream!") await stream.close() done.complete() - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen await mplexListen.handle() + await mplexListen.close() let transport1: TcpTransport = newTransport(TcpTransport) let listenFut = await transport1.listen(ma, connHandler) @@ -310,46 +349,39 @@ suite "Mplex": let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) - let dialFut = mplexDial.handle() + let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream("DIALER") let msg = cast[string](await stream.readLp(1024)) + await stream.close() check msg == "Hello from stream!" - # await dialFut - result = true - - await done.wait(5000.millis) - await stream.close() + await done.wait(1.seconds) await conn.close() - await mplexDial.close() - await transport2.close() - await transport1.close() + await mplexDialFut + await allFuturesThrowing(transport1.close(), transport2.close()) await listenFut - check: - waitFor(testNewStream()) == true + waitFor(testNewStream()) test "e2e - multiple streams": - proc testNewStream(): Future[bool] {.async.} = + proc testNewStream() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let done = newFuture[void]() - - var count = 1 - var listenConn: Connection proc connHandler(conn: Connection) {.async, gcsafe.} = - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + var count = 1 + let mplexListen = newMplex(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = let msg = await stream.readLp(1024) - check cast[string](msg) == &"stream {count}!" + check string.fromBytes(msg) == &"stream {count}!" count.inc await stream.close() if count == 10: done.complete() - listenConn = conn - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen await mplexListen.handle() + await mplexListen.close() let transport1 = newTransport(TcpTransport) let listenFut = await transport1.listen(ma, connHandler) @@ -358,34 +390,31 @@ suite "Mplex": let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) + # TODO: Reenable once half-closed is working properly + # let mplexDialFut = mplexDial.handle() for i in 1..10: let stream = await mplexDial.newStream() await stream.writeLp(&"stream {i}!") await stream.close() - await done.wait(5000.millis) + await done.wait(10.seconds) await conn.close() - await transport2.close() - await mplexDial.close() - await listenConn.close() - await transport1.close() + # await mplexDialFut + await allFuturesThrowing(transport1.close(), transport2.close()) await listenFut - result = true - - check: - waitFor(testNewStream()) == true + waitFor(testNewStream()) test "e2e - multiple read/write streams": - proc testNewStream(): Future[bool] {.async.} = + proc testNewStream() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") - var count = 1 - var listenConn: Connection let done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = - listenConn = conn - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + var count = 1 + let mplexListen = newMplex(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = let msg = await stream.readLp(1024) check cast[string](msg) == &"stream {count} from dialer!" await stream.writeLp(&"stream {count} from listener!") @@ -394,20 +423,17 @@ suite "Mplex": if count == 10: done.complete() - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen await mplexListen.handle() + await mplexListen.close() let transport1: TcpTransport = newTransport(TcpTransport) - let transportFut = await transport1.listen(ma, connHandler) + let listenFut = await transport1.listen(ma, connHandler) let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) - let dialFut = mplexDial.handle() - dialFut.addCallback(proc(udata: pointer = nil) {.gcsafe.} - = trace "completed dialer") + let mplexDialFut = mplexDial.handle() for i in 1..10: let stream = await mplexDial.newStream("dialer stream") await stream.writeLp(&"stream {i} from dialer!") @@ -417,73 +443,29 @@ suite "Mplex": await done.wait(5.seconds) await conn.close() - await listenConn.close() - await allFuturesThrowing(dialFut) - await mplexDial.close() - await transport2.close() - await transport1.close() - await transportFut - result = true + await mplexDialFut + await allFuturesThrowing(transport1.close(), transport2.close()) + await listenFut - check: - waitFor(testNewStream()) == true - - test "half closed - channel should close for write": - proc testClosedForWrite(): Future[void] {.async.} = - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let - buff = newBufferStream(writeHandler) - conn = newConnection(buff) - chann = newChannel(1, conn, true) - try: - await chann.close() - await chann.write("Hello") - finally: - await chann.cleanUp() - await conn.close() - - expect LPStreamEOFError: - waitFor(testClosedForWrite()) - - test "half closed - channel should close for read by remote": - proc testClosedForRead(): Future[void] {.async.} = - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = - discard - - let - buff = newBufferStream(writeHandler) - conn = newConnection(buff) - chann = newChannel(1, conn, true) - - try: - await chann.pushTo(cast[seq[byte]]("Hello!")) - await chann.closedByRemote() - var data = newSeq[byte](6) - await chann.readExactly(addr data[0], 6) # this should work, since there is data in the buffer - await chann.readExactly(addr data[0], 6) # this should throw - finally: - await chann.cleanUp() - await conn.close() - - expect LPStreamEOFError: - waitFor(testClosedForRead()) + waitFor(testNewStream()) test "jitter - channel should be able to handle erratic read/writes": - proc test(): Future[bool] {.async.} = + proc test() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") var complete = newFuture[void]() const MsgSize = 1024 proc connHandler(conn: Connection) {.async, gcsafe.} = - proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + let mplexListen = newMplex(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = let msg = await stream.readLp(MsgSize) check msg.len == MsgSize await stream.close() complete.complete() - let mplexListen = newMplex(conn) - mplexListen.streamHandler = handleMplexListen - discard mplexListen.handle() + await mplexListen.handle() + await mplexListen.close() let transport1: TcpTransport = newTransport(TcpTransport) let listenFut = await transport1.listen(ma, connHandler) @@ -492,6 +474,7 @@ suite "Mplex": let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) + let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1) for _ in 0..