{.used.} # Nim-Libp2p # Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) # at your option. # This file may not be copied, modified, or distributed except according to # those terms. import strformat, random, sequtils import chronos, nimcrypto/utils, chronicles, stew/byteutils import ../libp2p/[ errors, stream/connection, stream/bufferstream, transports/tcptransport, transports/transport, multiaddress, muxers/mplex/mplex, muxers/mplex/coder, muxers/mplex/lpchannel, upgrademngrs/upgrade, vbuffer, varint, ] import ./helpers suite "Mplex": teardown: checkTrackers() suite "channel encoding": asyncTest "encode header with channel id 0": proc encHandler( msg: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("000873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes) await conn.close() asyncTest "encode header with channel id other than 0": proc encHandler( msg: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("88010873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes) await conn.close() asyncTest "encode header and body with channel id 0": proc encHandler( msg: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("020873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() asyncTest "encode header and body with channel id other than 0": proc encHandler( msg: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("8a010873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() asyncTest "decode header with channel id 0": let stream = BufferStream.new() let conn = stream await stream.pushData(fromHex("000873747265616d2031")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType == MessageType.New await conn.close() asyncTest "decode header and body with channel id 0": let stream = BufferStream.new() let conn = stream await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType == MessageType.MsgOut check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() asyncTest "decode header and body with channel id other than 0": let stream = BufferStream.new() let conn = stream await stream.pushData( fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121") ) let msg = await conn.readMsg() check msg.id == 17 check msg.msgType == MessageType.MsgOut check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() suite "channel half-closed": asyncTest "(local close) - should close for write": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.close() expect LPStreamClosedError: await chann.write("Hello") await chann.reset() await conn.close() asyncTest "(local close) - should allow reads until remote closes": let conn = TestBufferStream.new( proc(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = discard ) chann = LPChannel.init(1, conn, true) await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.close() # closing channel # should be able to read on local clsoe await chann.readExactly(addr data[0], 3) # closing remote end let closeFut = chann.pushEof() # should still allow reading until buffer EOF await chann.readExactly(addr data[3], 3) expect LPStreamRemoteClosedError: # this should fail now await chann.readExactly(addr data[0], 3) await chann.close() await conn.close() await closeFut asyncTest "(remote close) - channel should close for reading by remote": let conn = TestBufferStream.new( proc(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = discard ) chann = LPChannel.init(1, conn, true) await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.readExactly(addr data[0], 3) let closeFut = chann.pushEof() # closing channel let readFut = chann.readExactly(addr data[3], 3) await allFutures(closeFut, readFut) expect LPStreamRemoteClosedError: await chann.readExactly(addr data[0], 6) # this should fail now await chann.close() await conn.close() asyncTest "(remote close) - channel should allow writing on remote close": let testData = "Hello!".toBytes conn = TestBufferStream.new( proc(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = discard ) chann = LPChannel.init(1, conn, true) await chann.pushEof() # closing channel try: await chann.writeLp(testData) finally: await chann.reset() # there's nobody reading the EOF! await conn.close() asyncTest "should not allow pushing data to channel when remote end closed": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.pushEof() var buf: array[1, byte] check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read expect LPStreamClosedError: await chann.pushData(@[byte(1)]) await chann.close() await conn.close() suite "channel reset": asyncTest "channel should fail reading": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.reset() var data = newSeq[byte](1) expect LPStreamClosedError: await chann.readExactly(addr data[0], 1) await conn.close() asyncTest "reset should complete read": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let fut = chann.readExactly(addr data[0], 1) await chann.reset() expect LPStreamClosedError: await fut await conn.close() asyncTest "reset should complete pushData": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) proc pushes() {.async.} = # pushes don't hang on reset await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) let push = pushes() await chann.reset() check await allFutures(push).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and push": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let futs = [chann.readExactly(addr data[0], 1), chann.pushData(@[0'u8])] await chann.reset() check await allFutures(futs).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and pushes": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let read = chann.readExactly(addr data[0], 1) proc pushes() {.async.} = await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.reset() check await allFutures(read, pushes()).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and push with cancel": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) rfut.cancel() let xfut = chann.reset() check await allFutures(rfut, xfut).withTimeout(100.millis) await conn.close() asyncTest "should complete both read and push after reset": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) let rfut2 = sleepAsync(1.millis) or rfut await sleepAsync(5.millis) let wfut = chann.pushData(@[0'u8]) let wfut2 = chann.pushData(@[0'u8]) await chann.reset() check await allFutures(rfut, rfut2, wfut, wfut2).withTimeout(100.millis) await conn.close() asyncTest "reset should complete ongoing push without reader": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.pushData(@[0'u8]) let push1 = chann.pushData(@[0'u8]) await chann.reset() check await allFutures(push1).withTimeout(100.millis) await conn.close() asyncTest "reset should complete ongoing read without a push": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) await chann.reset() check await allFutures(rfut).withTimeout(100.millis) await conn.close() asyncTest "reset should allow all reads and pushes to complete": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) proc writer() {.async.} = await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) proc reader() {.async.} = await chann.readExactly(addr data[0], 1) await chann.readExactly(addr data[0], 1) await chann.readExactly(addr data[0], 1) let rw = @[writer(), reader()] await chann.close() check await chann.reset() # this would hang .withTimeout(100.millis) check await allFuturesThrowing(allFinished(rw)).withTimeout(100.millis) await conn.close() asyncTest "channel should fail writing": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.reset() expect LPStreamClosedError: await chann.write(("Hello!").toBytes) await conn.close() asyncTest "channel should reset on timeout": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true, timeout = 100.millis) check await chann.join().withTimeout(1.minutes) await conn.close() suite "mplex e2e": asyncTest "read/write receiver": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(1024) check string.fromBytes(msg) == "HELLO" except CancelledError, LPStreamError: return finally: await stream.close() await mplexListen.handle() await mplexListen.close() let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() await stream.writeLp("HELLO") check LPChannel(stream).isOpen # not lazy await stream.close() await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut.wait(1.seconds) await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut asyncTest "read/write receiver lazy": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(1024) check string.fromBytes(msg) == "HELLO" except CancelledError, LPStreamError: return finally: await stream.close() await mplexListen.handle() await mplexListen.close() let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let stream = await mplexDial.newStream(lazy = true) let mplexDialFut = mplexDial.handle() check not LPChannel(stream).isOpen # assert lazy await stream.writeLp("HELLO") check LPChannel(stream).isOpen # assert lazy await stream.close() await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut asyncTest "write fragmented": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] listenJob = newFuture[void]() var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2) for _ in 0 ..< MaxMsgSize: bigseq.add(uint8(rand(uint('A') .. uint('z')))) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async.} = try: let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(MaxMsgSize) check msg == bigseq trace "Bigseq check passed!" except CancelledError, LPStreamError: return finally: await stream.close() listenJob.complete() await mplexListen.handle() await sleepAsync(1.seconds) # give chronos some slack to process things await mplexListen.close() except CancelledError as exc: raise exc except CatchableError as exc: check false let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() await stream.writeLp(bigseq) await listenJob.wait(10.seconds) await stream.close() await conn.close() await acceptFut await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut asyncTest "read/write initiator": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: await stream.writeLp("Hello from stream!") except CancelledError, LPStreamError: return finally: await stream.close() await mplexListen.handle() await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream("DIALER") let msg = string.fromBytes(await stream.readLp(1024)) await stream.close() check msg == "Hello from stream!" await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut asyncTest "multiple streams": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) let done = newFuture[void]() proc acceptHandler() {.async.} = var count = 1 let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(1024) try: check string.fromBytes(msg) == &"stream {count}!" except ValueError as exc: raiseAssert(exc.msg) count.inc if count == 11: done.complete() except CancelledError, LPStreamError: return finally: await stream.close() await mplexListen.handle() await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) # TODO: Reenable once half-closed is working properly let mplexDialFut = mplexDial.handle() for i in 1 .. 10: let stream = await mplexDial.newStream() await stream.writeLp(&"stream {i}!") await stream.close() await done.wait(10.seconds) await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut asyncTest "multiple read/write streams": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) let done = newFuture[void]() proc acceptHandler() {.async.} = var count = 1 let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(1024) try: check string.fromBytes(msg) == &"stream {count} from dialer!" await stream.writeLp(&"stream {count} from listener!") except ValueError as exc: raiseAssert(exc.msg) count.inc if count == 11: done.complete() except CancelledError, LPStreamError: return finally: await stream.close() await mplexListen.handle() await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() for i in 1 .. 10: let stream = await mplexDial.newStream("dialer stream") await stream.writeLp(&"stream {i} from dialer!") let msg = await stream.readLp(1024) check string.fromBytes(msg) == &"stream {i} from listener!" await stream.close() await done.wait(5.seconds) await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut await mplexDial.close() await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut asyncTest "channel closes listener with EOF": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenStreams: seq[Connection] proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = listenStreams.add(stream) try: discard await stream.readLp(1024) except LPStreamEOFError: return except CancelledError, LPStreamError: return finally: await stream.close() check false await mplexListen.handle() await mplexListen.close() await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream()) check: unorderedCompare(dialStreams, mplexDial.getStreams()) for i, s in dialStreams: await s.closeWithEOF() check listenStreams[i].closed check s.closed checkTracker(LPChannelTrackerName) await conn.close() await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await acceptFut asyncTest "channel closes dialer with EOF": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var count = 0 var done = newFuture[void]() var listenStreams: seq[Connection] proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = listenStreams.add(stream) count.inc() if count == 10: done.complete() await noCancel stream.join() await mplexListen.handle() await mplexListen.close() await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream()) check: unorderedCompare(dialStreams, mplexDial.getStreams()) proc dialReadLoop() {.async.} = for s in dialStreams: try: discard await s.readLp(1024) check false except LPStreamEOFError: await s.close() continue check false await done let readLoop = dialReadLoop() for s in listenStreams: await s.closeWithEOF() check s.closed await readLoop await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join())) checkTracker(LPChannelTrackerName) await conn.close() await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await acceptFut asyncTest "dialing mplex closes both ends": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenStreams: seq[Connection] proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = listenStreams.add(stream) await noCancel stream.join() await mplexListen.handle() await mplexListen.close() await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream()) check: unorderedCompare(dialStreams, mplexDial.getStreams()) await mplexDial.close() await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join())) checkTracker(LPChannelTrackerName) await conn.close() await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await acceptFut asyncTest "listening mplex closes both ends": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var mplexListen: Mplex var listenStreams: seq[Connection] proc acceptHandler() {.async.} = let conn = await transport1.accept() mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = listenStreams.add(stream) await noCancel stream.join() await mplexListen.handle() await mplexListen.close() await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream()) check: unorderedCompare(dialStreams, mplexDial.getStreams()) checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10 await mplexListen.close() await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join())) checkTracker(LPChannelTrackerName) await conn.close() await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await acceptFut asyncTest "canceling mplex handler closes both ends": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var mplexHandle: Future[void] var listenStreams: seq[Connection] proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = listenStreams.add(stream) await noCancel stream.join() mplexHandle = mplexListen.handle() await mplexHandle await mplexListen.close() await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream()) check: unorderedCompare(dialStreams, mplexDial.getStreams()) checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10 mplexHandle.cancel() await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join())) checkTracker(LPChannelTrackerName) await conn.close() await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) asyncTest "closing dialing connection should close both ends": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenStreams: seq[Connection] proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = listenStreams.add(stream) await noCancel stream.join() await mplexListen.handle() await mplexListen.close() await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream()) check: unorderedCompare(dialStreams, mplexDial.getStreams()) checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10 await conn.close() await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join())) checkTracker(LPChannelTrackerName) await conn.closeWithEOF() await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await acceptFut asyncTest "canceling listening connection should close both ends": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1 = TcpTransport.new(upgrade = Upgrade()) var listenConn: Connection var listenStreams: seq[Connection] proc acceptHandler() {.async.} = listenConn = await transport1.accept() let mplexListen = Mplex.new(listenConn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = listenStreams.add(stream) await noCancel stream.join() await mplexListen.handle() await mplexListen.close() await transport1.start(ma) let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream()) check: unorderedCompare(dialStreams, mplexDial.getStreams()) checkUntilTimeout: listenStreams.len == 10 and dialStreams.len == 10 await listenConn.closeWithEOF() await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join())) checkTracker(LPChannelTrackerName) await conn.close() await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await acceptFut suite "jitter": asyncTest "channel should be able to handle erratic read/writes": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) var complete = newFuture[void]() const MsgSize = 1024 proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(MsgSize) check msg.len == MsgSize except CancelledError as e: echo e.msg except LPStreamError as e: echo e.msg await stream.close() complete.complete() await mplexListen.handle() await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1) for _ in 0 ..< MsgSize: # write one less than max size bigseq.add(uint8(rand(uint('A') .. uint('z')))) ## create length prefixed libp2p frame var buf = initVBuffer() buf.writeSeq(bigseq) buf.finish() ## create mplex header var mplexBuf = initVBuffer() mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint) mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent await conn.write(mplexBuf.buffer) proc writer() {.async.} = var sent = 0 randomize() let total = buf.buffer.len const min = 20 const max = 50 while sent < total: var size = rand(min .. max) size = if size > buf.buffer.len: buf.buffer.len else: size var send = buf.buffer[0 ..< size] await conn.write(send) sent += size buf.buffer = buf.buffer[size ..^ 1] await writer() await complete.wait(1.seconds) await stream.close() await conn.close() await acceptFut await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut asyncTest "channel should handle 1 byte read/write": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) var complete = newFuture[void]() const MsgSize = 512 proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(MsgSize) check msg.len == MsgSize except CancelledError, LPStreamError: return finally: await stream.close() complete.complete() await mplexListen.handle() await mplexListen.close() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let acceptFut = acceptHandler() let mplexDial = Mplex.new(conn) let stream = await mplexDial.newStream() let mplexDialFut = mplexDial.handle() var bigseq = newSeqOfCap[uint8](MsgSize + 1) for _ in 0 ..< MsgSize: # write one less than max size bigseq.add(uint8(rand(uint('A') .. uint('z')))) ## create length prefixed libp2p frame var buf = initVBuffer() buf.writeSeq(bigseq) buf.finish() ## create mplex header var mplexBuf = initVBuffer() mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint) mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent await conn.write(mplexBuf.buffer) proc writer() {.async.} = for i in buf.buffer: await conn.write(@[i]) await writer() await complete.wait(5.seconds) await stream.close() await conn.close() await acceptFut await mplexDialFut await allFuturesThrowing(transport1.stop(), transport2.stop()) await listenFut