import unittest, sequtils, sugar, strformat, options, strformat, random import chronos, nimcrypto/utils, chronicles import ../libp2p/[connection, stream/lpstream, stream/bufferstream, transports/tcptransport, transports/transport, protocols/identify, multiaddress, muxers/mplex/mplex, muxers/mplex/coder, muxers/mplex/types, muxers/mplex/lpchannel, vbuffer, varint] when defined(nimHasUsed): {.used.} suite "Mplex": test "encode header with channel id 0": proc testEncodeHeader(): Future[bool] {.async.} = proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("000873747265616d2031") let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(0, MessageType.New, cast[seq[byte]]("stream 1")) result = true check: waitFor(testEncodeHeader()) == true test "encode header with channel id other than 0": proc testEncodeHeader(): Future[bool] {.async.} = proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("88010873747265616d2031") let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(17, MessageType.New, cast[seq[byte]]("stream 1")) result = true check: waitFor(testEncodeHeader()) == true test "encode header and body with channel id 0": proc testEncodeHeaderBody(): Future[bool] {.async.} = var step = 0 proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("020873747265616d2031") let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(0, MessageType.MsgOut, cast[seq[byte]]("stream 1")) result = true check: waitFor(testEncodeHeaderBody()) == true test "encode header and body with channel id other than 0": proc testEncodeHeaderBody(): Future[bool] {.async.} = var step = 0 proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("8a010873747265616d2031") let stream = newBufferStream(encHandler) let conn = newConnection(stream) await conn.writeMsg(17, MessageType.MsgOut, cast[seq[byte]]("stream 1")) await conn.close() result = true check: waitFor(testEncodeHeaderBody()) == true test "decode header with channel id 0": proc testDecodeHeader(): Future[bool] {.async.} = let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("000873747265616d2031")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType == MessageType.New result = true check: waitFor(testDecodeHeader()) == true test "decode header and body with channel id 0": proc testDecodeHeader(): Future[bool] {.async.} = let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 0 check msg.msgType == MessageType.MsgOut check cast[string](msg.data) == "hello from channel 0!!" result = true check: waitFor(testDecodeHeader()) == true test "decode header and body with channel id other than 0": proc testDecodeHeader(): Future[bool] {.async.} = let stream = newBufferStream() let conn = newConnection(stream) await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 17 check msg.msgType == MessageType.MsgOut check cast[string](msg.data) == "hello from channel 0!!" result = true check: waitFor(testDecodeHeader()) == true test "e2e - read/write receiver": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp() check cast[string](msg) == "Hello from stream!" await stream.close() let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen discard mplexListen.handle() let transport1: TcpTransport = newTransport(TcpTransport) discard await transport1.listen(ma, connHandler) defer: await transport1.close() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) let stream = await mplexDial.newStream() let openState = cast[LPChannel](stream.stream).isOpen await stream.writeLp("Hello from stream!") await conn.close() check openState # not lazy result = true check: waitFor(testNewStream()) == true test "e2e - read/write receiver lazy": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp() check cast[string](msg) == "Hello from stream!" await stream.close() let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen discard mplexListen.handle() let transport1: TcpTransport = newTransport(TcpTransport) discard await transport1.listen(ma, connHandler) defer: await transport1.close() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) let stream = await mplexDial.newStream("", true) let openState = cast[LPChannel](stream.stream).isOpen await stream.writeLp("Hello from stream!") await conn.close() check not openState # assert lazy result = true check: waitFor(testNewStream()) == true test "e2e - write limits": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") listenJob = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = defer: await stream.close() let msg = await stream.readLp() # we should not reach this anyway!! check false listenJob.complete() let mplexListen = newMplex(conn) mplexListen.streamHandler = handleMplexListen discard mplexListen.handle() let transport1: TcpTransport = newTransport(TcpTransport) discard await transport1.listen(ma, connHandler) defer: await transport1.close() let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) defer: await conn.close() let mplexDial = newMplex(conn) let stream = await mplexDial.newStream() var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1) for _ in 0.. buf.buffer.len: buf.buffer.len else: len var send = buf.buffer[0..