import unittest, strformat, strformat, random, oids, sequtils import chronos, nimcrypto/utils, chronicles, stew/byteutils import ../libp2p/[errors, stream/connection, stream/bufferstream, transports/tcptransport, transports/transport, multiaddress, muxers/mplex/mplex, muxers/mplex/coder, muxers/mplex/lpchannel, vbuffer, varint] import ./helpers {.used.} suite "Mplex": teardown: checkTrackers() suite "channel encoding": asyncTest "encode header with channel id 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("000873747265616d2031") let conn = newBufferStream(encHandler) await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes) await conn.close() asyncTest "encode header with channel id other than 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("88010873747265616d2031") let conn = newBufferStream(encHandler) await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes) await conn.close() asyncTest "encode header and body with channel id 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("020873747265616d2031") let conn = newBufferStream(encHandler) await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() asyncTest "encode header and body with channel id other than 0": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("8a010873747265616d2031") let conn = newBufferStream(encHandler) await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() asyncTest "decode header with channel id 0": let stream = newBufferStream() let conn = stream await stream.pushData(fromHex("000873747265616d2031")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType == MessageType.New await conn.close() asyncTest "decode header and body with channel id 0": let stream = newBufferStream() let conn = stream await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType 
== MessageType.MsgOut check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() asyncTest "decode header and body with channel id other than 0": let stream = newBufferStream() let conn = stream await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 17 check msg.msgType == MessageType.MsgOut check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() suite "channel half-closed": asyncTest "(local close) - should close for write": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) await chann.close() expect LPStreamClosedError: await chann.write("Hello") await chann.reset() await conn.close() asyncTest "(local close) - should allow reads until remote closes": let conn = newBufferStream( proc (data: seq[byte]) {.gcsafe, async.} = discard, ) chann = LPChannel.init(1, conn, true) await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.close() # closing channel # should be able to read on local close await chann.readExactly(addr data[0], 3) # closing remote end let closeFut = chann.pushEof() # should still allow reading until buffer EOF await chann.readExactly(addr data[3], 3) expect LPStreamEOFError: # this should fail now await chann.readExactly(addr data[0], 3) await chann.close() await conn.close() await closeFut asyncTest "(remote close) - channel should close for reading by remote": let conn = newBufferStream( proc (data: seq[byte]) {.gcsafe, async.} = discard, ) chann = LPChannel.init(1, conn, true) await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.readExactly(addr data[0], 3) let closeFut = chann.pushEof() # closing channel let readFut = chann.readExactly(addr data[3], 3) await allFutures(closeFut, readFut) expect LPStreamEOFError: await chann.readExactly(addr data[0], 6) # this should fail now await 
chann.close() await conn.close() asyncTest "(remote close) - channel should allow writing on remote close": let testData = "Hello!".toBytes conn = newBufferStream( proc (data: seq[byte]) {.gcsafe, async.} = discard ) chann = LPChannel.init(1, conn, true) await chann.pushEof() # closing channel try: await chann.writeLp(testData) finally: await chann.reset() # there's nobody reading the EOF! await conn.close() asyncTest "should not allow pushing data to channel when remote end closed": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) await chann.pushEof() var buf: array[1, byte] check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read expect LPStreamEOFError: await chann.pushData(@[byte(1)]) await chann.close() await conn.close() suite "channel reset": asyncTest "channel should fail reading": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) await chann.reset() var data = newSeq[byte](1) expect LPStreamEOFError: await chann.readExactly(addr data[0], 1) await conn.close() asyncTest "reset should complete read": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let fut = chann.readExactly(addr data[0], 1) await chann.reset() expect LPStreamEOFError: await fut await conn.close() asyncTest "reset should complete pushData": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) proc pushes() {.async.} = # pushes don't hang on reset await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) let push = pushes() await chann.reset() check await 
allFutures(push).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and push": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let futs = [ chann.readExactly(addr data[0], 1), chann.pushData(@[0'u8]), ] await chann.reset() check await allFutures(futs).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and pushes": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let read = chann.readExactly(addr data[0], 1) proc pushes() {.async.} = await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.reset() check await allFutures(read, pushes()).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and push with cancel": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) rfut.cancel() let xfut = chann.reset() check await allFutures(rfut, xfut).withTimeout(100.millis) await conn.close() asyncTest "should complete both read and push after reset": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) let rfut2 = sleepAsync(1.millis) or rfut await sleepAsync(5.millis) let wfut = chann.pushData(@[0'u8]) let wfut2 = chann.pushData(@[0'u8]) await 
chann.reset() check await allFutures(rfut, rfut2, wfut, wfut2).withTimeout(100.millis) await conn.close() asyncTest "reset should complete ongoing push without reader": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) await chann.pushData(@[0'u8]) let push1 = chann.pushData(@[0'u8]) await chann.reset() check await allFutures(push1).withTimeout(100.millis) await conn.close() asyncTest "reset should complete ongoing read without a push": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) await chann.reset() check await allFutures(rfut).withTimeout(100.millis) await conn.close() asyncTest "reset should allow all reads and pushes to complete": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) proc writer() {.async.} = await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) proc reader() {.async.} = await chann.readExactly(addr data[0], 1) await chann.readExactly(addr data[0], 1) await chann.readExactly(addr data[0], 1) let rw = @[writer(), reader()] await chann.close() check await chann.reset() # this would hang .withTimeout(100.millis) check await allFuturesThrowing( allFinished(rw)) .withTimeout(100.millis) await conn.close() asyncTest "channel should fail writing": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) await chann.reset() expect LPStreamClosedError: await chann.write(("Hello!").toBytes) await conn.close() asyncTest "channel should reset on timeout": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) chann = 
LPChannel.init( 1, conn, true, timeout = 100.millis) check await chann.join().withTimeout(1.minutes) await conn.close() suite "mplex e2e": asyncTest "read/write receiver": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let transport1: TcpTransport = TcpTransport.init() let listenFut = transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = let conn = await transport1.accept() let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(1024) check string.fromBytes(msg) == "HELLO" await stream.close() await mplexListen.handle() await mplexListen.close() let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) let mplexDial = Mplex.init(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() await stream.writeLp("HELLO") check LPChannel(stream).isOpen # not lazy await stream.close() await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut.wait(1.seconds) await allFuturesThrowing( transport1.stop(), transport2.stop()) await listenFut asyncTest "read/write receiver lazy": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let transport1: TcpTransport = TcpTransport.init() let listenFut = transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = let conn = await transport1.accept() let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(1024) check string.fromBytes(msg) == "HELLO" await stream.close() await mplexListen.handle() await mplexListen.close() let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) let mplexDial = Mplex.init(conn) let stream = await mplexDial.newStream(lazy = true) let mplexDialFut = mplexDial.handle() check not LPChannel(stream).isOpen # assert 
lazy await stream.writeLp("HELLO") check LPChannel(stream).isOpen # assert lazy await stream.close() await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut await allFuturesThrowing( transport1.stop(), transport2.stop()) await listenFut asyncTest "write fragmented": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() listenJob = newFuture[void]() var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2) for _ in 0.. buf.buffer.len: buf.buffer.len else: size var send = buf.buffer[0..