{.used.} # Nim-Libp2p # Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) # at your option. # This file may not be copied, modified, or distributed except according to # those terms. import strformat, random, sequtils import chronos, nimcrypto/utils, chronicles, stew/byteutils import ../libp2p/[errors, stream/connection, stream/bufferstream, transports/tcptransport, transports/transport, multiaddress, muxers/mplex/mplex, muxers/mplex/coder, muxers/mplex/lpchannel, upgrademngrs/upgrade, vbuffer, varint] import ./helpers suite "Mplex": teardown: checkTrackers() suite "channel encoding": asyncTest "encode header with channel id 0": proc encHandler( msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("000873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes) await conn.close() asyncTest "encode header with channel id other than 0": proc encHandler( msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("88010873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes) await conn.close() asyncTest "encode header and body with channel id 0": proc encHandler( msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("020873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() asyncTest "encode header and body with channel id other than 0": proc encHandler( msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = check msg == fromHex("8a010873747265616d2031") let conn = TestBufferStream.new(encHandler) await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() asyncTest "decode header with channel id 0": let stream = BufferStream.new() let conn = stream await stream.pushData(fromHex("000873747265616d2031")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType == MessageType.New await conn.close() asyncTest "decode header and body with channel id 0": let stream = BufferStream.new() let conn = stream await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType == MessageType.MsgOut check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() asyncTest "decode header and body with channel id other than 0": let stream = BufferStream.new() let conn = stream await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 17 check msg.msgType == MessageType.MsgOut check string.fromBytes(msg.data) == "hello from channel 0!!" await conn.close() suite "channel half-closed": asyncTest "(local close) - should close for write": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.close() expect LPStreamClosedError: await chann.write("Hello") await chann.reset() await conn.close() asyncTest "(local close) - should allow reads until remote closes": let conn = TestBufferStream.new( proc ( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard, ) chann = LPChannel.init(1, conn, true) await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.close() # closing channel # should be able to read on local clsoe await chann.readExactly(addr data[0], 3) # closing remote end let closeFut = chann.pushEof() # should still allow reading until buffer EOF await chann.readExactly(addr data[3], 3) expect LPStreamRemoteClosedError: # this should fail now await chann.readExactly(addr data[0], 3) await chann.close() await conn.close() await closeFut asyncTest "(remote close) - channel should close for reading by remote": let conn = TestBufferStream.new( proc ( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard, ) chann = LPChannel.init(1, conn, true) await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.readExactly(addr data[0], 3) let closeFut = chann.pushEof() # closing channel let readFut = chann.readExactly(addr data[3], 3) await allFutures(closeFut, readFut) expect LPStreamRemoteClosedError: await chann.readExactly(addr data[0], 6) # this should fail now await chann.close() await conn.close() asyncTest "(remote close) - channel should allow writing on remote close": let testData = "Hello!".toBytes conn = TestBufferStream.new( proc ( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard ) chann = LPChannel.init(1, conn, true) await chann.pushEof() # closing channel try: await chann.writeLp(testData) finally: await chann.reset() # there's nobody reading the EOF! await conn.close() asyncTest "should not allow pushing data to channel when remote end closed": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.pushEof() var buf: array[1, byte] check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read expect LPStreamClosedError: await chann.pushData(@[byte(1)]) await chann.close() await conn.close() suite "channel reset": asyncTest "channel should fail reading": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.reset() var data = newSeq[byte](1) expect LPStreamClosedError: await chann.readExactly(addr data[0], 1) await conn.close() asyncTest "reset should complete read": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let fut = chann.readExactly(addr data[0], 1) await chann.reset() expect LPStreamClosedError: await fut await conn.close() asyncTest "reset should complete pushData": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) proc pushes() {.async.} = # pushes don't hang on reset await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) let push = pushes() await chann.reset() check await allFutures(push).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and push": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let futs = [ chann.readExactly(addr data[0], 1), chann.pushData(@[0'u8]), ] await chann.reset() check await allFutures(futs).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and pushes": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let read = chann.readExactly(addr data[0], 1) proc pushes() {.async.} = await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.reset() check await allFutures(read, pushes()).withTimeout(100.millis) await conn.close() asyncTest "reset should complete both read and push with cancel": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) rfut.cancel() let xfut = chann.reset() check await allFutures(rfut, xfut).withTimeout(100.millis) await conn.close() asyncTest "should complete both read and push after reset": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) let rfut2 = sleepAsync(1.millis) or rfut await sleepAsync(5.millis) let wfut = chann.pushData(@[0'u8]) let wfut2 = chann.pushData(@[0'u8]) await chann.reset() check await allFutures(rfut, rfut2, wfut, wfut2).withTimeout(100.millis) await conn.close() asyncTest "reset should complete ongoing push without reader": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.pushData(@[0'u8]) let push1 = chann.pushData(@[0'u8]) await chann.reset() check await allFutures(push1).withTimeout(100.millis) await conn.close() asyncTest "reset should complete ongoing read without a push": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) let rfut = chann.readExactly(addr data[0], 1) await chann.reset() check await allFutures(rfut).withTimeout(100.millis) await conn.close() asyncTest "reset should allow all reads and pushes to complete": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) var data = newSeq[byte](1) proc writer() {.async.} = await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) await chann.pushData(@[0'u8]) proc reader() {.async.} = await chann.readExactly(addr data[0], 1) await chann.readExactly(addr data[0], 1) await chann.readExactly(addr data[0], 1) let rw = @[writer(), reader()] await chann.close() check await chann.reset() # this would hang .withTimeout(100.millis) check await allFuturesThrowing( allFinished(rw)) .withTimeout(100.millis) await conn.close() asyncTest "channel should fail writing": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init(1, conn, true) await chann.reset() expect LPStreamClosedError: await chann.write(("Hello!").toBytes) await conn.close() asyncTest "channel should reset on timeout": proc writeHandler( data: seq[byte] ) {.async: (raises: [CancelledError, LPStreamError]).} = discard let conn = TestBufferStream.new(writeHandler) chann = LPChannel.init( 1, conn, true, timeout = 100.millis) check await chann.join().withTimeout(1.minutes) await conn.close() suite "mplex e2e": asyncTest "read/write receiver": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(1024) check string.fromBytes(msg) == "HELLO" except CancelledError, LPStreamError: return finally: await stream.close() await mplexListen.handle() await mplexListen.close() let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() await stream.writeLp("HELLO") check LPChannel(stream).isOpen # not lazy await stream.close() await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut.wait(1.seconds) await allFuturesThrowing( transport1.stop(), transport2.stop()) await listenFut asyncTest "read/write receiver lazy": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async.} = let conn = await transport1.accept() let mplexListen = Mplex.new(conn) mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} = try: let msg = await stream.readLp(1024) check string.fromBytes(msg) == "HELLO" except CancelledError, LPStreamError: return finally: await stream.close() await mplexListen.handle() await mplexListen.close() let acceptFut = acceptHandler() let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) let conn = await transport2.dial(transport1.addrs[0]) let mplexDial = Mplex.new(conn) let stream = await mplexDial.newStream(lazy = true) let mplexDialFut = mplexDial.handle() check not LPChannel(stream).isOpen # assert lazy await stream.writeLp("HELLO") check LPChannel(stream).isOpen # assert lazy await stream.close() await conn.close() await acceptFut.wait(1.seconds) await mplexDialFut await allFuturesThrowing( transport1.stop(), transport2.stop()) await listenFut asyncTest "write fragmented": let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] listenJob = newFuture[void]() var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2) for _ in 0.. buf.buffer.len: buf.buffer.len else: size var send = buf.buffer[0..