diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index bd15ef7..cc0f8c0 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -76,18 +76,29 @@ proc writeMsg*(conn: Connection, id: uint64, msgType: MessageType, data: seq[byte] = @[]) {.async, gcsafe.} = - trace "seding data over mplex", id, + trace "sending data over mplex", id, msgType, data = data.len - ## write lenght prefixed - var buf = initVBuffer() - buf.writePBVarint(id shl 3 or ord(msgType).uint) - buf.writePBVarint(data.len().uint) # size should be always sent - buf.finish() - try: - await conn.write(buf.buffer & data) - except LPStreamIncompleteError as exc: - trace "unable to send message", exc = exc.msg + var + left = data.len + offset = 0 + while left > 0 or data.len == 0: + let + chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left + chunk = if chunkSize > 0 : data[offset..(offset + chunkSize - 1)] else: data + ## write lenght prefixed + var buf = initVBuffer() + buf.writePBVarint(id shl 3 or ord(msgType).uint64) + buf.writePBVarint(chunkSize.uint64) # size should be always sent + buf.finish() + left = left - chunkSize + offset = offset + chunkSize + try: + await conn.write(buf.buffer & chunk) + except LPStreamIncompleteError as exc: + trace "unable to send message", exc = exc.msg + if data.len == 0: + return proc writeMsg*(conn: Connection, id: uint64, diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 1a14ad1..b538152 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -186,24 +186,23 @@ suite "Mplex": check: waitFor(testNewStream()) == true - test "e2e - write limits": + test "e2e - write fragmented": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") listenJob = newFuture[void]() + var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2) + for _ in 0..