import unittest, strformat, random
import chronos, nimcrypto/utils, chronicles
import ../libp2p/[errors,
                  connection,
                  stream/lpstream,
                  stream/bufferstream,
                  transports/tcptransport,
                  transports/transport,
                  multiaddress,
                  muxers/mplex/mplex,
                  muxers/mplex/coder,
                  muxers/mplex/types,
                  muxers/mplex/lpchannel,
                  vbuffer,
                  varint]

when defined(nimHasUsed): {.used.}

# Tracker names that are looked up (together with the libp2p ones) in the
# suite teardown below.
const
  StreamTransportTrackerName = "stream.transport"
  StreamServerTrackerName = "stream.server"

suite "Mplex":
  # After every test, fail if any of the registered trackers reports a leak.
  # Trackers that were never registered come back as nil and are skipped.
  teardown:
    let
      trackers = [
        getTracker(BufferStreamTrackerName),
        getTracker(AsyncStreamWriterTrackerName),
        getTracker(TcpTransportTrackerName),
        getTracker(AsyncStreamReaderTrackerName),
        getTracker(StreamTransportTrackerName),
        getTracker(StreamServerTrackerName)
      ]
    for tracker in trackers:
      if not isNil(tracker):
        # echo tracker.dump()
        check tracker.isLeaked() == false

  # --- encoding -------------------------------------------------------------
  # Expected frames below are: header varint, payload length (0x08), then the
  # 8 payload bytes of "stream 1" (73 74 72 65 61 6d 20 31).

  test "encode header with channel id 0":
    proc testEncodeHeader(): Future[bool] {.async.} =
      # Receives whatever the connection writes to the buffer stream.
      proc encHandler(msg: seq[byte]) {.async.} =
        # Header byte 0x00 for channel 0 / MessageType.New.
        check msg == fromHex("000873747265616d2031")

      let stream = newBufferStream(encHandler)
      let conn = newConnection(stream)
      await conn.writeMsg(0, MessageType.New, cast[seq[byte]]("stream 1"))
      result = true
      await stream.close()

    check:
      waitFor(testEncodeHeader()) == true

  test "encode header with channel id other than 0":
    proc testEncodeHeader(): Future[bool] {.async.} =
      proc encHandler(msg: seq[byte]) {.async.} =
        # Header varint 0x88 0x01 = 136 = (17 shl 3) or 0.
        check msg == fromHex("88010873747265616d2031")

      let stream = newBufferStream(encHandler)
      let conn = newConnection(stream)
      await conn.writeMsg(17, MessageType.New, cast[seq[byte]]("stream 1"))
      result = true
      await stream.close()

    check:
      waitFor(testEncodeHeader()) == true

  test "encode header and body with channel id 0":
    proc testEncodeHeaderBody(): Future[bool] {.async.} =
      proc encHandler(msg: seq[byte]) {.async.} =
        # Header byte 0x02 for channel 0 / MessageType.MsgOut.
        check msg == fromHex("020873747265616d2031")

      let stream = newBufferStream(encHandler)
      let conn = newConnection(stream)
      await conn.writeMsg(0, MessageType.MsgOut, cast[seq[byte]]("stream 1"))
      result = true
      await stream.close()

    check:
      waitFor(testEncodeHeaderBody()) == true

  test "encode header and body with channel id other than 0":
    proc testEncodeHeaderBody(): Future[bool] {.async.} =
      proc encHandler(msg: seq[byte]) {.async.} =
        # Header varint 0x8a 0x01 = 138 = (17 shl 3) or 2.
        check msg == fromHex("8a010873747265616d2031")

      let stream = newBufferStream(encHandler)
      let conn = newConnection(stream)
      await conn.writeMsg(17, MessageType.MsgOut, cast[seq[byte]]("stream 1"))
      await conn.close()
      result = true
      await stream.close()

    check:
      waitFor(testEncodeHeaderBody()) == true

  # --- decoding -------------------------------------------------------------
  # Each test pushes a raw frame into the buffer stream and reads it back
  # through the connection as a decoded message.

  test "decode header with channel id 0":
    proc testDecodeHeader(): Future[bool] {.async.} =
      let stream = newBufferStream()
      let conn = newConnection(stream)
      await stream.pushTo(fromHex("000873747265616d2031"))
      let msg = await conn.readMsg()

      # Only the header fields are verified here, not the payload.
      check msg.id == 0
      check msg.msgType == MessageType.New
      result = true
      await stream.close()

    check:
      waitFor(testDecodeHeader()) == true

  test "decode header and body with channel id 0":
    proc testDecodeHeader(): Future[bool] {.async.} =
      let stream = newBufferStream()
      let conn = newConnection(stream)
      # 0x02 header, 0x16 (22) length, then the 22-byte payload.
      await stream.pushTo(
        fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
      let msg = await conn.readMsg()

      check msg.id == 0
      check msg.msgType == MessageType.MsgOut
      check cast[string](msg.data) == "hello from channel 0!!"
      result = true
      await stream.close()

    check:
      waitFor(testDecodeHeader()) == true

  test "decode header and body with channel id other than 0":
    proc testDecodeHeader(): Future[bool] {.async.} =
      let stream = newBufferStream()
      let conn = newConnection(stream)
      # 0x8a 0x01 header (channel 17, MsgOut), 0x16 length, 22-byte payload.
      await stream.pushTo(
        fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
      let msg = await conn.readMsg()

      check msg.id == 17
      check msg.msgType == MessageType.MsgOut
      check cast[string](msg.data) == "hello from channel 0!!"
      result = true
      await stream.close()

    check:
      waitFor(testDecodeHeader()) == true

  # --- end to end -----------------------------------------------------------

  test "e2e - read/write receiver":
    proc testNewStream(): Future[bool] {.async.} =
      let ma: MultiAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0")

      var
        done = newFuture[void]()  # completed once the message was received
        done2 = newFuture[void]() # completed once the listener side is done

      # Listener side: run an Mplex instance over every accepted connection.
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
          let msg = await stream.readLp()
          check cast[string](msg) == "Hello from stream!"
          await stream.close()
          done.complete()

        let mplexListen = newMplex(conn)
        mplexListen.streamHandler = handleMplexListen
        await mplexListen.handle()
        await conn.close()
        done2.complete()

      let transport1: TcpTransport = newTransport(TcpTransport)
      let lfut = await transport1.listen(ma, connHandler)

      # Dialer side: open a stream and send one length-prefixed message.
      let transport2: TcpTransport = newTransport(TcpTransport)
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = newMplex(conn)
      let stream = await mplexDial.newStream()
      # Sample the channel state before anything is written to the stream.
      let openState = cast[LPChannel](stream.stream).isOpen
      await stream.writeLp("Hello from stream!")
      await conn.close()
      check openState # not lazy
      result = true

      # Wait (bounded) for the listener, then tear everything down.
      await done.wait(5000.millis)
      await done2.wait(5000.millis)
      await stream.close()
      await conn.close()
      await transport2.close()
      await transport1.close()
      await lfut

    check:
      waitFor(testNewStream()) == true

  test "e2e - read/write receiver lazy":
    proc testNewStream(): Future[bool] {.async.} =
      let ma: MultiAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0")

      var
        done = newFuture[void]()  # completed once the message was received
        done2 = newFuture[void]() # completed once the listener side is done

      # Listener side: run an Mplex instance over every accepted connection.
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
          let msg = await stream.readLp()
          check cast[string](msg) == "Hello from stream!"
          await stream.close()
          done.complete()

        let mplexListen = newMplex(conn)
        mplexListen.streamHandler = handleMplexListen
        await mplexListen.handle()
        done2.complete()

      let transport1: TcpTransport = newTransport(TcpTransport)
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = newTransport(TcpTransport)
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = newMplex(conn)
      # NOTE(review): the second argument is presumably the `lazy` flag of
      # newStream -- confirm against the Mplex API.
      let stream = await mplexDial.newStream("", true)
      # Sample the channel state before anything is written to the stream.
      let openState = cast[LPChannel](stream.stream).isOpen
      await stream.writeLp("Hello from stream!")
      await conn.close()
      check not openState # assert lazy
      result = true

      # Wait (bounded) for the listener, then tear everything down.
      await done.wait(5000.millis)
      await done2.wait(5000.millis)
      await conn.close()
      await stream.close()
      await mplexDial.close()
      await transport2.close()
      await transport1.close()
      await listenFut

    check:
      waitFor(testNewStream()) == true

  # NOTE(review): the text of the following test is truncated/garbled in the
  # reviewed copy of this file; it is kept byte-for-byte and must be restored
  # from version control before this file can compile.
  test "e2e - write fragmented": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") listenJob = newFuture[void]() var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2) for _ in 0.. buf.buffer.len: buf.buffer.len else: size var send = buf.buffer[0..