added 1 byte jitter test

This commit is contained in:
Dmitriy Ryajov 2020-03-11 16:23:39 -06:00
parent 406b79887d
commit 5c234f704d
3 changed files with 69 additions and 12 deletions

View File

@ -57,8 +57,7 @@ proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
var data: seq[byte] = newSeq[byte](dataLenVarint.int) var data: seq[byte] = newSeq[byte](dataLenVarint.int)
if dataLenVarint.int > 0: if dataLenVarint.int > 0:
while data.len < dataLenVarint.int: await conn.readExactly(addr data[0], dataLenVarint.int)
data &= await conn.read(dataLenVarint.int - data.len)
trace "read data", data = data trace "read data", data = data
let header = headerVarint let header = headerVarint

View File

@ -104,7 +104,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
size = data.len size = data.len
if data.len > MaxMsgSize: if data.len > MaxMsgSize:
raise newLPStreamLimitError(); raise newLPStreamLimitError()
await channel.pushTo(data) await channel.pushTo(data)
of MessageType.CloseIn, MessageType.CloseOut: of MessageType.CloseIn, MessageType.CloseOut:
trace "closing channel", id = id, trace "closing channel", id = id,

View File

@ -383,7 +383,7 @@ suite "Mplex":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var complete = newFuture[void]() var complete = newFuture[void]()
const MsgSize = 512 * 10 const MsgSize = 1024
proc connHandler(conn: Connection) {.async, gcsafe.} = proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp() let msg = await stream.readLp()
@ -425,15 +425,73 @@ suite "Mplex":
var sent = 0 var sent = 0
randomize() randomize()
let total = buf.buffer.len let total = buf.buffer.len
const max = 500 const min = 20
const min = 100 const max = 50
while sent < total: while sent < total:
var len = rand(min..max) var size = rand(min..max)
len = if len > buf.buffer.len: buf.buffer.len else: len size = if size > buf.buffer.len: buf.buffer.len else: size
var send = buf.buffer[0..<len-1] var send = buf.buffer[0..<size]
await conn.write(send) await conn.write(send)
sent += len sent += size
buf.buffer = buf.buffer[len..^1] buf.buffer = buf.buffer[size..^1]
await writer()
await stream.close()
await conn.close()
await complete
result = true
check:
waitFor(test()) == true
test "jitter - channel should handle 1 byte read/write":
proc test(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var complete = newFuture[void]()
const MsgSize = 512
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check msg.len == MsgSize
await stream.close()
complete.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
discard mplexListen.handle()
let transport1: TcpTransport = newTransport(TcpTransport)
discard await transport1.listen(ma, connHandler)
defer:
await transport1.close()
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream()
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
for _ in 0..<MsgSize: # write one less than max size
bigseq.add(uint8(rand(uint('A')..uint('z'))))
## create lenght prefixed libp2p frame
var buf = initVBuffer()
buf.writeSeq(bigseq)
buf.finish()
## create mplex header
var mplexBuf = initVBuffer()
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
await conn.write(mplexBuf.buffer)
proc writer() {.async.} =
for i in buf.buffer:
await conn.write(@[i])
await writer() await writer()