Mplex: Add the ability to send any size payload (#123)

* Mplex: Add the ability to send any size payload

* Ensure size of coder header
This commit is contained in:
Giovanni Petrantoni 2020-04-04 00:26:46 +09:00 committed by GitHub
parent 7f8090b166
commit e39bf0a4cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 26 deletions

View File

@ -76,18 +76,29 @@ proc writeMsg*(conn: Connection,
id: uint64, id: uint64,
msgType: MessageType, msgType: MessageType,
data: seq[byte] = @[]) {.async, gcsafe.} = data: seq[byte] = @[]) {.async, gcsafe.} =
trace "seding data over mplex", id, trace "sending data over mplex", id,
msgType, msgType,
data = data.len data = data.len
var
left = data.len
offset = 0
while left > 0 or data.len == 0:
let
chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left
chunk = if chunkSize > 0 : data[offset..(offset + chunkSize - 1)] else: data
## write lenght prefixed ## write lenght prefixed
var buf = initVBuffer() var buf = initVBuffer()
buf.writePBVarint(id shl 3 or ord(msgType).uint) buf.writePBVarint(id shl 3 or ord(msgType).uint64)
buf.writePBVarint(data.len().uint) # size should be always sent buf.writePBVarint(chunkSize.uint64) # size should be always sent
buf.finish() buf.finish()
left = left - chunkSize
offset = offset + chunkSize
try: try:
await conn.write(buf.buffer & data) await conn.write(buf.buffer & chunk)
except LPStreamIncompleteError as exc: except LPStreamIncompleteError as exc:
trace "unable to send message", exc = exc.msg trace "unable to send message", exc = exc.msg
if data.len == 0:
return
proc writeMsg*(conn: Connection, proc writeMsg*(conn: Connection,
id: uint64, id: uint64,

View File

@ -186,24 +186,23 @@ suite "Mplex":
check: check:
waitFor(testNewStream()) == true waitFor(testNewStream()) == true
test "e2e - write limits": test "e2e - write fragmented":
proc testNewStream(): Future[bool] {.async.} = proc testNewStream(): Future[bool] {.async.} =
let let
ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
listenJob = newFuture[void]() listenJob = newFuture[void]()
var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
for _ in 0..<MaxMsgSize:
bigseq.add(uint8(rand(uint('A')..uint('z'))))
proc connHandler(conn: Connection) {.async, gcsafe.} = proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
defer: defer:
await stream.close() await stream.close()
let msg = await stream.readLp()
try: check msg == bigseq
discard await stream.readLp() trace "Bigseq check passed!"
except CatchableError:
return
# we should not reach this anyway!!
check false
listenJob.complete() listenJob.complete()
let mplexListen = newMplex(conn) let mplexListen = newMplex(conn)
@ -220,16 +219,12 @@ suite "Mplex":
let mplexDial = newMplex(conn) let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream() let stream = await mplexDial.newStream()
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
for _ in 0..<MaxMsgSize:
bigseq.add(uint8(rand(uint('A')..uint('z'))))
try:
await stream.writeLp(bigseq) await stream.writeLp(bigseq)
await listenJob.wait(millis(500)) try:
await listenJob.wait(millis(5000))
except AsyncTimeoutError: except AsyncTimeoutError:
# we want to time out here! check false
discard
result = true result = true