mirror of https://github.com/vacp2p/nim-libp2p.git
add jitter tolerance to mplex
This commit is contained in:
parent
82af623641
commit
4fc84cbe81
|
@ -10,7 +10,9 @@ import ../libp2p/[connection,
|
||||||
muxers/mplex/mplex,
|
muxers/mplex/mplex,
|
||||||
muxers/mplex/coder,
|
muxers/mplex/coder,
|
||||||
muxers/mplex/types,
|
muxers/mplex/types,
|
||||||
muxers/mplex/lpchannel]
|
muxers/mplex/lpchannel,
|
||||||
|
vbuffer,
|
||||||
|
varint]
|
||||||
|
|
||||||
when defined(nimHasUsed): {.used.}
|
when defined(nimHasUsed): {.used.}
|
||||||
|
|
||||||
|
@ -376,6 +378,77 @@ suite "Mplex":
|
||||||
# expect LPStreamEOFError:
|
# expect LPStreamEOFError:
|
||||||
# waitFor(testClosedForRead())
|
# waitFor(testClosedForRead())
|
||||||
|
|
||||||
|
test "jitter - channel should be able to handle erratic read/writes":
|
||||||
|
proc test(): Future[bool] {.async.} =
|
||||||
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||||
|
|
||||||
|
var complete = newFuture[void]()
|
||||||
|
const MsgSize = 512 * 10
|
||||||
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
||||||
|
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
|
||||||
|
let msg = await stream.readLp()
|
||||||
|
check msg.len == MsgSize
|
||||||
|
await stream.close()
|
||||||
|
complete.complete()
|
||||||
|
|
||||||
|
let mplexListen = newMplex(conn)
|
||||||
|
mplexListen.streamHandler = handleMplexListen
|
||||||
|
discard mplexListen.handle()
|
||||||
|
|
||||||
|
let transport1: TcpTransport = newTransport(TcpTransport)
|
||||||
|
discard await transport1.listen(ma, connHandler)
|
||||||
|
|
||||||
|
defer:
|
||||||
|
await transport1.close()
|
||||||
|
|
||||||
|
let transport2: TcpTransport = newTransport(TcpTransport)
|
||||||
|
let conn = await transport2.dial(transport1.ma)
|
||||||
|
|
||||||
|
let mplexDial = newMplex(conn)
|
||||||
|
let stream = await mplexDial.newStream()
|
||||||
|
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
|
||||||
|
for _ in 0..<MsgSize: # write one less than max size
|
||||||
|
bigseq.add(uint8(rand(uint('A')..uint('z'))))
|
||||||
|
|
||||||
|
## create lenght prefixed libp2p frame
|
||||||
|
var buf = initVBuffer()
|
||||||
|
buf.writeSeq(bigseq)
|
||||||
|
buf.finish()
|
||||||
|
|
||||||
|
## create mplex header
|
||||||
|
var mplexBuf = initVBuffer()
|
||||||
|
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
|
||||||
|
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
|
||||||
|
|
||||||
|
await conn.write(mplexBuf.buffer) # write out mplex header
|
||||||
|
proc writer() {.async.} =
|
||||||
|
var sent = 0
|
||||||
|
randomize()
|
||||||
|
let total = buf.buffer.len
|
||||||
|
const max = 500
|
||||||
|
const min = 100
|
||||||
|
while sent < total:
|
||||||
|
var len = 0
|
||||||
|
while len > max or len <= 0:
|
||||||
|
len = rand(min..max)
|
||||||
|
|
||||||
|
len = if len > buf.buffer.len: buf.buffer.len else: len
|
||||||
|
var send = buf.buffer[0..<len-1]
|
||||||
|
await conn.write(send)
|
||||||
|
sent += len
|
||||||
|
buf.buffer = buf.buffer[len..^1]
|
||||||
|
|
||||||
|
await writer()
|
||||||
|
|
||||||
|
await stream.close()
|
||||||
|
await conn.close()
|
||||||
|
await complete
|
||||||
|
|
||||||
|
result = true
|
||||||
|
|
||||||
|
check:
|
||||||
|
waitFor(test()) == true
|
||||||
|
|
||||||
test "reset - channel should fail reading":
|
test "reset - channel should fail reading":
|
||||||
proc testResetRead(): Future[void] {.async.} =
|
proc testResetRead(): Future[void] {.async.} =
|
||||||
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
||||||
|
|
Loading…
Reference in New Issue