diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 14a23ee..0cd480a 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -57,9 +57,8 @@ proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} = var data: seq[byte] = newSeq[byte](dataLenVarint.int) if dataLenVarint.int > 0: - while data.len < dataLenVarint.int: - data &= await conn.read(dataLenVarint.int - data.len) - trace "read data", data = data + await conn.readExactly(addr data[0], dataLenVarint.int) + trace "read data", data = data let header = headerVarint result = (header shr 3, MessageType(header and 0x7), data) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 0f4be1e..a48c463 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -104,7 +104,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = size = data.len if data.len > MaxMsgSize: - raise newLPStreamLimitError(); + raise newLPStreamLimitError() await channel.pushTo(data) of MessageType.CloseIn, MessageType.CloseOut: trace "closing channel", id = id, diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 735c7dd..d8f48ba 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -383,7 +383,7 @@ suite "Mplex": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") var complete = newFuture[void]() - const MsgSize = 512 * 10 + const MsgSize = 1024 proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp() @@ -425,15 +425,73 @@ suite "Mplex": var sent = 0 randomize() let total = buf.buffer.len - const max = 500 - const min = 100 + const min = 20 + const max = 50 while sent < total: - var len = rand(min..max) - len = if len > buf.buffer.len: buf.buffer.len else: len - var send = buf.buffer[0.. buf.buffer.len: buf.buffer.len else: size + var send = buf.buffer[0..