# NOTE(review): `strformat` is listed twice in the first import below.
import unittest, strformat, strformat, random, oids, sequtils
import chronos, nimcrypto/utils, chronicles, stew/byteutils
import ../libp2p/[errors,
                  stream/connection,
                  stream/bufferstream,
                  transports/tcptransport,
                  transports/transport,
                  multiaddress,
                  muxers/mplex/mplex,
                  muxers/mplex/coder,
                  muxers/mplex/lpchannel,
                  vbuffer,
                  varint]

import ./helpers

{.used.}

# Tests for the mplex muxer, grouped into nested suites:
#   * "channel encoding"    - writeMsg/readMsg wire format
#   * "channel half-closed" - LPChannel after a local or remote close
#   * "channel reset"       - LPChannel after reset()
#   * "mplex e2e"           - Mplex over a real TcpTransport pair
suite "Mplex":
  teardown:
    # Runs after every test. `checkTrackers` is defined outside this file
    # (presumably ./helpers) - TODO confirm what exactly it verifies.
    checkTrackers()

  suite "channel encoding":
    # Encode tests: the handler given to newBufferStream checks the bytes it
    # is handed against the expected hex. In every expected value the tail
    # "0873747265616d2031" is 0x08 followed by the ASCII bytes of "stream 1".
    asyncTest "encode header with channel id 0":
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("000873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
      await conn.close()

    asyncTest "encode header with channel id other than 0":
      # Channel id 17 with MessageType.New is expected to start with 0x88 0x01.
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("88010873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
      await conn.close()

    asyncTest "encode header and body with channel id 0":
      # Channel id 0 with MessageType.MsgOut is expected to start with 0x02.
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("020873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
      await conn.close()

    asyncTest "encode header and body with channel id other than 0":
      # Channel id 17 with MessageType.MsgOut is expected to start with
      # 0x8a 0x01.
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("8a010873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
      await conn.close()

    # Decode tests: raw bytes are pushed into a buffer stream and read back
    # with readMsg; `conn` is just a second name for the same stream.
    asyncTest "decode header with channel id 0":
      let stream = newBufferStream()
      let conn = stream
      await stream.pushData(fromHex("000873747265616d2031"))
      let msg = await conn.readMsg()

      check msg.id == 0
      check msg.msgType == MessageType.New
      await conn.close()

    asyncTest "decode header and body with channel id 0":
      let stream = newBufferStream()
      let conn = stream
      await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
      let msg = await conn.readMsg()

      check msg.id == 0
      check msg.msgType == MessageType.MsgOut
      check string.fromBytes(msg.data) == "hello from channel 0!!"
      await conn.close()

    asyncTest "decode header and body with channel id other than 0":
      # Same payload text as the previous test ("hello from channel 0!!"),
      # but the leading bytes 0x8a 0x01 must decode to channel id 17.
      let stream = newBufferStream()
      let conn = stream
      await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
      let msg = await conn.readMsg()

      check msg.id == 17
      check msg.msgType == MessageType.MsgOut
      check string.fromBytes(msg.data) == "hello from channel 0!!"
      await conn.close()

  suite "channel half-closed":
    # Each test builds an LPChannel with id 1 on top of a buffer stream whose
    # write handler discards everything.
    asyncTest "(local close) - should close for write":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      await chann.close()
      # writing after a local close must raise
      expect LPStreamClosedError:
        await chann.write("Hello")

      await chann.reset()
      await conn.close()

    asyncTest "(local close) - should allow reads until remote closes":
      let
        conn = newBufferStream(
          proc (data: seq[byte]) {.gcsafe, async.} =
            discard,
        )
        chann = LPChannel.init(1, conn, true)

      # 6 bytes are queued; they are read back in two halves of 3 bytes
      await chann.pushData(("Hello!").toBytes)

      var data = newSeq[byte](6)
      await chann.close() # closing channel
      # should be able to read on local close
      await chann.readExactly(addr data[0], 3)
      # closing remote end
      let closeFut = chann.pushEof()
      # should still allow reading until buffer EOF
      await chann.readExactly(addr data[3], 3)

      expect LPStreamEOFError:
        # this should fail now
        await chann.readExactly(addr data[0], 3)

      await chann.close()
      await conn.close()
      # the pushEof future is only awaited once everything else is done
      await closeFut

    asyncTest "(remote close) - channel should close for reading by remote":
      let
        conn = newBufferStream(
          proc (data: seq[byte]) {.gcsafe, async.} =
            discard,
        )
        chann = LPChannel.init(1, conn, true)

      await chann.pushData(("Hello!").toBytes)

      var data = newSeq[byte](6)
      await chann.readExactly(addr data[0], 3)
      let closeFut = chann.pushEof() # closing channel
      # the remaining 3 bytes are read while the EOF push is still pending
      let readFut = chann.readExactly(addr data[3], 3)
      await all(closeFut, readFut)

      expect LPStreamEOFError:
        await chann.readExactly(addr data[0], 6) # this should fail now

      await chann.close()
      await conn.close()

    asyncTest "(remote close) - channel should allow writing on remote close":
      let
        testData = "Hello!".toBytes
        conn = newBufferStream(
          proc (data: seq[byte]) {.gcsafe, async.} =
            discard
        )
        chann = LPChannel.init(1, conn, true)

      await chann.pushEof() # closing channel
      # the write itself must not raise; cleanup runs whether or not it does
      try:
        await chann.writeLp(testData)
      finally:
        await chann.reset() # there's nobody reading the EOF!
        await conn.close()

    asyncTest "should not allow pushing data to channel when remote end closed":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)
      await chann.pushEof()
      var buf: array[1, byte]
      # a read returning 0 bytes consumes the EOF pushed above
      check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read

      expect LPStreamEOFError:
        await chann.pushData(@[byte(1)])

      await chann.close()
      await conn.close()

  suite "channel reset":
    # Same setup as above: LPChannel id 1 over a buffer stream whose write
    # handler discards everything.
    asyncTest "channel should fail reading":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      await chann.reset()
      var data = newSeq[byte](1)
      expect LPStreamEOFError:
        await chann.readExactly(addr data[0], 1)

      await conn.close()

    asyncTest "should complete read":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      var data = newSeq[byte](1)
      # the read is started before the reset and must then fail with EOF
      let fut = chann.readExactly(addr data[0], 1)
      await chann.reset()
      expect LPStreamEOFError:
        await fut

      await conn.close()

    asyncTest "should complete pushData":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      await chann.pushData(@[0'u8])
      # second push is left un-awaited; after reset() it has to finish
      # within 100 ms
      let fut = chann.pushData(@[0'u8])
      await chann.reset()
      check await fut.withTimeout(100.millis)

      await conn.close()

    asyncTest "should complete both read and push":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      var data = newSeq[byte](1)
      # one pending read and two pending pushes must all finish after reset()
      let rfut = chann.readExactly(addr data[0], 1)
      let wfut = chann.pushData(@[0'u8])
      let wfut2 = chann.pushData(@[0'u8])
      await chann.reset()
      check await allFutures(rfut, wfut, wfut2).withTimeout(100.millis)

      await conn.close()

    asyncTest "channel should fail writing":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)
      await chann.reset()

      expect LPStreamClosedError:
        await chann.write(("Hello!").toBytes)

      await conn.close()

    asyncTest "channel should reset on timeout":
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(
          1, conn, true, timeout = 100.millis)

      # the channel is never read or written; the test only requires that
      # join() completes within a minute
      check await chann.join().withTimeout(1.minutes)
      await conn.close()

  suite "mplex e2e":
    asyncTest "read/write receiver":
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      # completed by the listener's stream handler once "HELLO" was received
      var done = newFuture[void]()
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
          let msg = await stream.readLp(1024)
          check string.fromBytes(msg) == "HELLO"
          await stream.close()
          done.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let mplexDialFut = mplexDial.handle()
      let stream = await mplexDial.newStream()
      await stream.writeLp("HELLO")
      check LPChannel(stream).isOpen # not lazy
      await stream.close()

      await done.wait(1.seconds)
      await conn.close()
      await mplexDialFut.wait(1.seconds)
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())

      await listenFut

    asyncTest "read/write receiver lazy":
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      # completed by the listener's stream handler once "HELLO" was received
      var done = newFuture[void]()
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
          let msg = await stream.readLp(1024)
          check string.fromBytes(msg) == "HELLO"
          await stream.close()
          done.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let stream = await mplexDial.newStream(lazy = true)
      let mplexDialFut = mplexDial.handle()
      check not LPChannel(stream).isOpen # assert lazy
      await stream.writeLp("HELLO")
      check LPChannel(stream).isOpen # open after the first write
      await stream.close()

      await done.wait(1.seconds)
      await conn.close()
      # NOTE(review): unlike "read/write receiver", this await has no
      # `.wait(1.seconds)` timeout.
      await mplexDialFut
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    asyncTest "write fragmented":
      let
        ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
        listenJob = newFuture[void]()

      var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
      # NOTE(review): the remainder of this test is truncated/garbled in the
      # reviewed text; the fragment below is kept verbatim and has to be
      # restored from the original file.
      for _ in 0.. buf.buffer.len: buf.buffer.len else: size var send = buf.buffer[0..