Add a safety check on message size limit when pushing new data in mplex
This commit is contained in:
parent
3531ebf772
commit
120ba0d528
|
@ -104,8 +104,12 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
of MessageType.MsgIn, MessageType.MsgOut:
|
of MessageType.MsgIn, MessageType.MsgOut:
|
||||||
trace "pushing data to channel", id = id,
|
trace "pushing data to channel", id = id,
|
||||||
initiator = initiator,
|
initiator = initiator,
|
||||||
msgType = msgType
|
msgType = msgType,
|
||||||
|
size = data.len
|
||||||
|
|
||||||
|
if data.len > MaxMsgSize:
|
||||||
|
raise newException(CatchableError,
|
||||||
|
"Message size over the limit of 1MiB per message.")
|
||||||
await channel.pushTo(data)
|
await channel.pushTo(data)
|
||||||
of MessageType.CloseIn, MessageType.CloseOut:
|
of MessageType.CloseIn, MessageType.CloseOut:
|
||||||
trace "closing channel", id = id,
|
trace "closing channel", id = id,
|
||||||
|
|
|
@ -179,6 +179,36 @@ suite "Mplex":
|
||||||
check:
|
check:
|
||||||
waitFor(testNewStream()) == true
|
waitFor(testNewStream()) == true
|
||||||
|
|
||||||
|
test "e2e - write limits":
|
||||||
|
proc testNewStream(): Future[bool] {.async.} =
|
||||||
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||||
|
|
||||||
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
||||||
|
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
|
||||||
|
let msg = await stream.readLp()
|
||||||
|
check cast[string](msg) == "Hello from stream!"
|
||||||
|
await stream.close()
|
||||||
|
|
||||||
|
let mplexListen = newMplex(conn)
|
||||||
|
mplexListen.streamHandler = handleMplexListen
|
||||||
|
discard mplexListen.handle()
|
||||||
|
|
||||||
|
let transport1: TcpTransport = newTransport(TcpTransport)
|
||||||
|
discard await transport1.listen(ma, connHandler)
|
||||||
|
|
||||||
|
let transport2: TcpTransport = newTransport(TcpTransport)
|
||||||
|
let conn = await transport2.dial(transport1.ma)
|
||||||
|
|
||||||
|
let mplexDial = newMplex(conn)
|
||||||
|
let stream = await mplexDial.newStream()
|
||||||
|
let bigseq = newSeq[uint8](MaxMsgSize + 1)
|
||||||
|
await stream.writeLp(bigseq)
|
||||||
|
await conn.close()
|
||||||
|
result = true
|
||||||
|
|
||||||
|
check:
|
||||||
|
waitFor(testNewStream()) == true
|
||||||
|
|
||||||
test "e2e - read/write initiator":
|
test "e2e - read/write initiator":
|
||||||
proc testNewStream(): Future[bool] {.async.} =
|
proc testNewStream(): Future[bool] {.async.} =
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||||
|
|
Loading…
Reference in New Issue