nim-libp2p/tests/testmplex.nim

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

1167 lines
37 KiB
Nim
Raw Normal View History

{.used.}
# Nim-Libp2p
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import strformat, random, sequtils
import chronos, nimcrypto/utils, chronicles, stew/byteutils
import
../libp2p/[
errors,
stream/connection,
stream/bufferstream,
transports/tcptransport,
transports/transport,
2019-10-03 19:30:22 +00:00
multiaddress,
muxers/mplex/mplex,
muxers/mplex/coder,
2020-03-11 15:12:08 +00:00
muxers/mplex/lpchannel,
upgrademngrs/upgrade,
2020-03-11 15:12:08 +00:00
vbuffer,
varint,
]
2019-09-03 20:40:51 +00:00
import ./helpers
2019-09-03 20:40:51 +00:00
suite "Mplex":
teardown:
checkTrackers()
suite "channel encoding":
asyncTest "encode header with channel id 0":
proc encHandler(
msg: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("000873747265616d2031")
let conn = TestBufferStream.new(encHandler)
await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
await conn.close()
asyncTest "encode header with channel id other than 0":
proc encHandler(
msg: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("88010873747265616d2031")
let conn = TestBufferStream.new(encHandler)
await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
await conn.close()
asyncTest "encode header and body with channel id 0":
proc encHandler(
msg: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("020873747265616d2031")
let conn = TestBufferStream.new(encHandler)
await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
await conn.close()
asyncTest "encode header and body with channel id other than 0":
proc encHandler(
msg: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("8a010873747265616d2031")
let conn = TestBufferStream.new(encHandler)
await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
await conn.close()
asyncTest "decode header with channel id 0":
let stream = BufferStream.new()
let conn = stream
await stream.pushData(fromHex("000873747265616d2031"))
2019-09-08 06:32:41 +00:00
let msg = await conn.readMsg()
2019-09-03 20:40:51 +00:00
2020-02-12 14:43:42 +00:00
check msg.id == 0
check msg.msgType == MessageType.New
await conn.close()
asyncTest "decode header and body with channel id 0":
let stream = BufferStream.new()
let conn = stream
await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
2019-09-08 06:32:41 +00:00
let msg = await conn.readMsg()
2020-02-12 14:43:42 +00:00
check msg.id == 0
check msg.msgType == MessageType.MsgOut
check string.fromBytes(msg.data) == "hello from channel 0!!"
await conn.close()
asyncTest "decode header and body with channel id other than 0":
let stream = BufferStream.new()
let conn = stream
await stream.pushData(
fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")
)
2019-09-08 06:32:41 +00:00
let msg = await conn.readMsg()
2020-02-12 14:43:42 +00:00
check msg.id == 17
check msg.msgType == MessageType.MsgOut
check string.fromBytes(msg.data) == "hello from channel 0!!"
await conn.close()
suite "channel half-closed":
asyncTest "(local close) - should close for write":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
await chann.close()
expect LPStreamClosedError:
await chann.write("Hello")
await chann.reset()
await conn.close()
asyncTest "(local close) - should allow reads until remote closes":
let
conn = TestBufferStream.new(
proc(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
)
chann = LPChannel.init(1, conn, true)
await chann.pushData(("Hello!").toBytes)
var data = newSeq[byte](6)
await chann.close() # closing channel
# should be able to read on local clsoe
await chann.readExactly(addr data[0], 3)
# closing remote end
let closeFut = chann.pushEof()
# should still allow reading until buffer EOF
await chann.readExactly(addr data[3], 3)
2022-09-14 08:58:41 +00:00
expect LPStreamRemoteClosedError:
# this should fail now
await chann.readExactly(addr data[0], 3)
await chann.close()
await conn.close()
await closeFut
asyncTest "(remote close) - channel should close for reading by remote":
let
conn = TestBufferStream.new(
proc(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
)
chann = LPChannel.init(1, conn, true)
await chann.pushData(("Hello!").toBytes)
var data = newSeq[byte](6)
await chann.readExactly(addr data[0], 3)
let closeFut = chann.pushEof() # closing channel
let readFut = chann.readExactly(addr data[3], 3)
await allFutures(closeFut, readFut)
2022-09-14 08:58:41 +00:00
expect LPStreamRemoteClosedError:
await chann.readExactly(addr data[0], 6) # this should fail now
await chann.close()
await conn.close()
asyncTest "(remote close) - channel should allow writing on remote close":
let
testData = "Hello!".toBytes
conn = TestBufferStream.new(
proc(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
)
chann = LPChannel.init(1, conn, true)
await chann.pushEof() # closing channel
try:
await chann.writeLp(testData)
finally:
await chann.reset() # there's nobody reading the EOF!
await conn.close()
asyncTest "should not allow pushing data to channel when remote end closed":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
await chann.pushEof()
var buf: array[1, byte]
check:
(await chann.readOnce(addr buf[0], 1)) == 0
# EOF marker read
2022-09-14 08:58:41 +00:00
expect LPStreamClosedError:
await chann.pushData(@[byte(1)])
await chann.close()
await conn.close()
suite "channel reset":
asyncTest "channel should fail reading":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
await chann.reset()
var data = newSeq[byte](1)
2022-09-14 08:58:41 +00:00
expect LPStreamClosedError:
await chann.readExactly(addr data[0], 1)
await conn.close()
asyncTest "reset should complete read":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
let fut = chann.readExactly(addr data[0], 1)
await chann.reset()
2022-09-14 08:58:41 +00:00
expect LPStreamClosedError:
await fut
await conn.close()
asyncTest "reset should complete pushData":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
proc pushes() {.async.} = # pushes don't hang on reset
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
let push = pushes()
await chann.reset()
check await allFutures(push).withTimeout(100.millis)
await conn.close()
asyncTest "reset should complete both read and push":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
let futs = [chann.readExactly(addr data[0], 1), chann.pushData(@[0'u8])]
await chann.reset()
check await allFutures(futs).withTimeout(100.millis)
await conn.close()
asyncTest "reset should complete both read and pushes":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
let read = chann.readExactly(addr data[0], 1)
proc pushes() {.async.} =
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.reset()
check await allFutures(read, pushes()).withTimeout(100.millis)
await conn.close()
asyncTest "reset should complete both read and push with cancel":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
let rfut = chann.readExactly(addr data[0], 1)
rfut.cancel()
let xfut = chann.reset()
check await allFutures(rfut, xfut).withTimeout(100.millis)
await conn.close()
asyncTest "should complete both read and push after reset":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
let rfut = chann.readExactly(addr data[0], 1)
let rfut2 = sleepAsync(1.millis) or rfut
await sleepAsync(5.millis)
let wfut = chann.pushData(@[0'u8])
let wfut2 = chann.pushData(@[0'u8])
await chann.reset()
check await allFutures(rfut, rfut2, wfut, wfut2).withTimeout(100.millis)
await conn.close()
asyncTest "reset should complete ongoing push without reader":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
await chann.pushData(@[0'u8])
let push1 = chann.pushData(@[0'u8])
await chann.reset()
check await allFutures(push1).withTimeout(100.millis)
await conn.close()
asyncTest "reset should complete ongoing read without a push":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
let rfut = chann.readExactly(addr data[0], 1)
await chann.reset()
check await allFutures(rfut).withTimeout(100.millis)
await conn.close()
asyncTest "reset should allow all reads and pushes to complete":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
proc writer() {.async.} =
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
proc reader() {.async.} =
await chann.readExactly(addr data[0], 1)
await chann.readExactly(addr data[0], 1)
await chann.readExactly(addr data[0], 1)
let rw = @[writer(), reader()]
await chann.close()
check await chann.reset()
# this would hang
.withTimeout(100.millis)
check await allFuturesThrowing(allFinished(rw)).withTimeout(100.millis)
await conn.close()
asyncTest "channel should fail writing":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
await chann.reset()
expect LPStreamClosedError:
await chann.write(("Hello!").toBytes)
await conn.close()
2020-02-12 14:43:42 +00:00
asyncTest "channel should reset on timeout":
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true, timeout = 100.millis)
check await chann.join().withTimeout(1.minutes)
await conn.close()
suite "mplex e2e":
asyncTest "read/write receiver":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
2019-09-25 22:57:27 +00:00
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
except CancelledError, LPStreamError:
return
finally:
await stream.close()
2019-09-25 22:57:27 +00:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 22:57:27 +00:00
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
2019-09-25 22:57:27 +00:00
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
await stream.writeLp("HELLO")
check LPChannel(stream).isOpen # not lazy
await stream.close()
await conn.close()
await acceptFut.wait(1.seconds)
await mplexDialFut.wait(1.seconds)
await allFuturesThrowing(transport1.stop(), transport2.stop())
await listenFut
asyncTest "read/write receiver lazy":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
except CancelledError, LPStreamError:
return
finally:
await stream.close()
await mplexListen.handle()
await mplexListen.close()
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let stream = await mplexDial.newStream(lazy = true)
let mplexDialFut = mplexDial.handle()
check not LPChannel(stream).isOpen # assert lazy
await stream.writeLp("HELLO")
check LPChannel(stream).isOpen # assert lazy
await stream.close()
2019-09-25 22:57:27 +00:00
await conn.close()
await acceptFut.wait(1.seconds)
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
2020-03-27 23:37:00 +00:00
await listenFut
asyncTest "write fragmented":
let
2021-12-16 10:05:20 +00:00
ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
listenJob = newFuture[void]()
var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
for _ in 0 ..< MaxMsgSize:
bigseq.add(uint8(rand(uint('A') .. uint('z'))))
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async.} =
try:
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(MaxMsgSize)
check msg == bigseq
trace "Bigseq check passed!"
except CancelledError, LPStreamError:
return
finally:
await stream.close()
listenJob.complete()
await mplexListen.handle()
await sleepAsync(1.seconds) # give chronos some slack to process things
await mplexListen.close()
except CancelledError as exc:
raise exc
except CatchableError as exc:
check false
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
await stream.writeLp(bigseq)
await listenJob.wait(10.seconds)
await stream.close()
await conn.close()
await acceptFut
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
2020-03-27 23:37:00 +00:00
await listenFut
asyncTest "read/write initiator":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
2019-09-25 22:57:27 +00:00
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
await stream.writeLp("Hello from stream!")
except CancelledError, LPStreamError:
return
finally:
await stream.close()
2019-09-25 22:57:27 +00:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 22:57:27 +00:00
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
2019-09-25 22:57:27 +00:00
let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
2019-09-25 22:57:27 +00:00
let stream = await mplexDial.newStream("DIALER")
let msg = string.fromBytes(await stream.readLp(1024))
await stream.close()
2019-09-25 22:57:27 +00:00
check msg == "Hello from stream!"
await conn.close()
await acceptFut.wait(1.seconds)
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
2020-03-27 23:37:00 +00:00
await listenFut
asyncTest "multiple streams":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
2019-09-25 22:57:27 +00:00
let transport1 = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
let done = newFuture[void]()
proc acceptHandler() {.async.} =
var count = 1
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
try:
check string.fromBytes(msg) == &"stream {count}!"
except ValueError as exc:
raiseAssert(exc.msg)
count.inc
if count == 11:
done.complete()
except CancelledError, LPStreamError:
return
finally:
await stream.close()
2019-09-25 22:57:27 +00:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 22:57:27 +00:00
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
2019-09-25 22:57:27 +00:00
let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn)
# TODO: Reenable once half-closed is working properly
let mplexDialFut = mplexDial.handle()
2020-03-27 23:37:00 +00:00
for i in 1 .. 10:
2019-09-25 22:57:27 +00:00
let stream = await mplexDial.newStream()
await stream.writeLp(&"stream {i}!")
await stream.close()
await done.wait(10.seconds)
await conn.close()
await acceptFut.wait(1.seconds)
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
2020-03-27 23:37:00 +00:00
await listenFut
asyncTest "multiple read/write streams":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
2019-09-25 22:57:27 +00:00
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
let done = newFuture[void]()
proc acceptHandler() {.async.} =
var count = 1
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
try:
check string.fromBytes(msg) == &"stream {count} from dialer!"
await stream.writeLp(&"stream {count} from listener!")
except ValueError as exc:
raiseAssert(exc.msg)
count.inc
if count == 11:
done.complete()
except CancelledError, LPStreamError:
return
finally:
await stream.close()
2019-09-25 22:57:27 +00:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 22:57:27 +00:00
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
2019-09-25 22:57:27 +00:00
let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
2019-09-25 22:57:27 +00:00
for i in 1 .. 10:
let stream = await mplexDial.newStream("dialer stream")
await stream.writeLp(&"stream {i} from dialer!")
2020-05-08 20:58:23 +00:00
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == &"stream {i} from listener!"
2019-09-25 22:57:27 +00:00
await stream.close()
await done.wait(5.seconds)
2019-09-25 22:57:27 +00:00
await conn.close()
await acceptFut.wait(1.seconds)
await mplexDialFut
await mplexDial.close()
await allFuturesThrowing(transport1.stop(), transport2.stop())
await listenFut
asyncTest "channel closes listener with EOF":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection]
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
try:
discard await stream.readLp(1024)
except LPStreamEOFError:
return
except CancelledError, LPStreamError:
return
finally:
await stream.close()
check false
await mplexListen.handle()
await mplexListen.close()
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream())
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
for i, s in dialStreams:
await s.closeWithEOF()
check listenStreams[i].closed
check s.closed
checkTracker(LPChannelTrackerName)
await conn.close()
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
await acceptFut
asyncTest "channel closes dialer with EOF":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade())
var count = 0
var done = newFuture[void]()
var listenStreams: seq[Connection]
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
count.inc()
if count == 10:
done.complete()
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream())
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
proc dialReadLoop() {.async.} =
for s in dialStreams:
try:
discard await s.readLp(1024)
check false
except LPStreamEOFError:
await s.close()
continue
check false
await done
let readLoop = dialReadLoop()
for s in listenStreams:
await s.closeWithEOF()
check s.closed
await readLoop
await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join()))
checkTracker(LPChannelTrackerName)
await conn.close()
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
await acceptFut
asyncTest "dialing mplex closes both ends":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection]
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream())
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
await mplexDial.close()
await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join()))
checkTracker(LPChannelTrackerName)
await conn.close()
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
await acceptFut
asyncTest "listening mplex closes both ends":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade())
var mplexListen: Mplex
var listenStreams: seq[Connection]
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream())
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkUntilTimeout:
listenStreams.len == 10 and dialStreams.len == 10
await mplexListen.close()
await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join()))
checkTracker(LPChannelTrackerName)
await conn.close()
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
await acceptFut
asyncTest "canceling mplex handler closes both ends":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade())
var mplexHandle: Future[void]
var listenStreams: seq[Connection]
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await noCancel stream.join()
mplexHandle = mplexListen.handle()
await mplexHandle
await mplexListen.close()
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream())
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkUntilTimeout:
listenStreams.len == 10 and dialStreams.len == 10
mplexHandle.cancel()
await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join()))
checkTracker(LPChannelTrackerName)
await conn.close()
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
asyncTest "closing dialing connection should close both ends":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection]
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream())
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkUntilTimeout:
listenStreams.len == 10 and dialStreams.len == 10
await conn.close()
await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join()))
checkTracker(LPChannelTrackerName)
await conn.closeWithEOF()
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
await acceptFut
asyncTest "canceling listening connection should close both ends":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenConn: Connection
var listenStreams: seq[Connection]
proc acceptHandler() {.async.} =
listenConn = await transport1.accept()
let mplexListen = Mplex.new(listenConn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
var dialStreams = toSeq(0 .. 9).mapIt(await mplexDial.newStream())
check:
unorderedCompare(dialStreams, mplexDial.getStreams())
checkUntilTimeout:
listenStreams.len == 10 and dialStreams.len == 10
await listenConn.closeWithEOF()
await allFuturesThrowing((dialStreams & listenStreams).mapIt(it.join()))
checkTracker(LPChannelTrackerName)
await conn.close()
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
await acceptFut
suite "jitter":
asyncTest "channel should be able to handle erratic read/writes":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
2020-03-11 15:12:08 +00:00
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
var complete = newFuture[void]()
const MsgSize = 1024
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
except CancelledError as e:
echo e.msg
except LPStreamError as e:
echo e.msg
await stream.close()
complete.complete()
2020-03-11 22:23:39 +00:00
await mplexListen.handle()
await mplexListen.close()
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
for _ in 0 ..< MsgSize: # write one less than max size
bigseq.add(uint8(rand(uint('A') .. uint('z'))))
## create length prefixed libp2p frame
var buf = initVBuffer()
buf.writeSeq(bigseq)
buf.finish()
## create mplex header
var mplexBuf = initVBuffer()
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
await conn.write(mplexBuf.buffer)
proc writer() {.async.} =
var sent = 0
randomize()
let total = buf.buffer.len
const min = 20
const max = 50
while sent < total:
var size = rand(min .. max)
size = if size > buf.buffer.len: buf.buffer.len else: size
var send = buf.buffer[0 ..< size]
await conn.write(send)
sent += size
buf.buffer = buf.buffer[size ..^ 1]
await writer()
await complete.wait(1.seconds)
await stream.close()
await conn.close()
await acceptFut
await mplexDialFut
2020-03-11 22:23:39 +00:00
await allFuturesThrowing(transport1.stop(), transport2.stop())
await listenFut
2020-03-11 22:23:39 +00:00
asyncTest "channel should handle 1 byte read/write":
2021-12-16 10:05:20 +00:00
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
2020-03-11 22:23:39 +00:00
let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma)
var complete = newFuture[void]()
const MsgSize = 512
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
except CancelledError, LPStreamError:
return
finally:
await stream.close()
complete.complete()
2020-03-11 22:23:39 +00:00
await mplexListen.handle()
await mplexListen.close()
2020-03-11 22:23:39 +00:00
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.addrs[0])
2020-03-11 22:23:39 +00:00
let acceptFut = acceptHandler()
let mplexDial = Mplex.new(conn)
let stream = await mplexDial.newStream()
let mplexDialFut = mplexDial.handle()
var bigseq = newSeqOfCap[uint8](MsgSize + 1)
for _ in 0 ..< MsgSize: # write one less than max size
bigseq.add(uint8(rand(uint('A') .. uint('z'))))
2020-03-11 22:23:39 +00:00
## create length prefixed libp2p frame
var buf = initVBuffer()
buf.writeSeq(bigseq)
buf.finish()
2020-03-11 22:23:39 +00:00
## create mplex header
var mplexBuf = initVBuffer()
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
2020-03-11 15:12:08 +00:00
await conn.write(mplexBuf.buffer)
proc writer() {.async.} =
for i in buf.buffer:
await conn.write(@[i])
2020-03-11 15:12:08 +00:00
await writer()
2020-03-27 23:37:00 +00:00
await complete.wait(5.seconds)
await stream.close()
await conn.close()
await acceptFut
await mplexDialFut
await allFuturesThrowing(transport1.stop(), transport2.stop())
await listenFut