From 120ba0d52804e55044d622825d3ec121a632d780 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Thu, 6 Feb 2020 15:24:11 +0900 Subject: [PATCH] Add a safety check on message size limit when pushing new data in mplex --- libp2p/muxers/mplex/mplex.nim | 6 +++++- tests/testmplex.nim | 30 ++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index d7d24c445..c0e018159 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -104,8 +104,12 @@ method handle*(m: Mplex) {.async, gcsafe.} = of MessageType.MsgIn, MessageType.MsgOut: trace "pushing data to channel", id = id, initiator = initiator, - msgType = msgType + msgType = msgType, + size = data.len + if data.len > MaxMsgSize: + raise newException(CatchableError, + "Message size over the limit of 1MiB per message.") await channel.pushTo(data) of MessageType.CloseIn, MessageType.CloseOut: trace "closing channel", id = id, diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 415332136..3015b5ba4 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -179,6 +179,36 @@ suite "Mplex": check: waitFor(testNewStream()) == true + test "e2e - write limits": + proc testNewStream(): Future[bool] {.async.} = + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + + proc connHandler(conn: Connection) {.async, gcsafe.} = + proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + let msg = await stream.readLp() + check cast[string](msg) == "Hello from stream!" + await stream.close() + + let mplexListen = newMplex(conn) + mplexListen.streamHandler = handleMplexListen + discard mplexListen.handle() + + let transport1: TcpTransport = newTransport(TcpTransport) + discard await transport1.listen(ma, connHandler) + + let transport2: TcpTransport = newTransport(TcpTransport) + let conn = await transport2.dial(transport1.ma) + + let mplexDial = newMplex(conn) + let stream = await mplexDial.newStream() + let bigseq = newSeq[uint8](MaxMsgSize + 1) + await stream.writeLp(bigseq) + await conn.close() + result = true + + check: + waitFor(testNewStream()) == true + test "e2e - read/write initiator": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")