# refactor bufferstream to use a queue (#346)
#
# This change modifies how the backpressure algorithm in bufferstream
# works - in particular, instead of working byte-by-byte, it now works
# seq-by-seq.
#
# When data arrives, it usually does so in packets - in the current
# bufferstream, the packet is read and then split into bytes, which are
# fed one by one to the bufferstream. On the reading side, the bytes are
# popped off the bufferstream, again byte by byte, to satisfy `readOnce`
# requests - this introduces a lot of synchronization traffic, because the
# checks for a full buffer and for async event handling must be done for
# every byte.
#
# In this PR, a queue of length 1 is used instead - this means there will
# be at most one "packet" in `pushTo`, one in the queue and one in the
# slush buffer that is used to store incomplete reads.
#
# * avoid byte-by-byte copy to buffer, with synchronization in-between
# * reuse AsyncQueue synchronization logic instead of rolling our own
# * avoid writeHandler callback - implement `write` method instead
# * simplify EOF signalling by only setting the EOF flag in the queue
#   reader (and reset)
# * remove BufferStream pipes (unused)
# * fix drainBuffer deadlock when drain is called from within the read
#   loop and thus blocks draining
# * fix lpchannel init order
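
# A minimal sketch (compiled out via `when false`) of the length-1 queue
# scheme described above, assuming only the chronos AsyncQueue API; the
# names `QueueBufferStream`, `push`, `pop` and `slush` are hypothetical
# illustrations, not the real BufferStream interface.
when false:
  import chronos

  type QueueBufferStream = ref object
    queue: AsyncQueue[seq[byte]] # length 1: at most one packet in flight
    slush: seq[byte]             # leftovers of a partially consumed packet

  proc newQueueBufferStream(): QueueBufferStream =
    QueueBufferStream(queue: newAsyncQueue[seq[byte]](1))

  proc push(s: QueueBufferStream, data: seq[byte]) {.async.} =
    # whole packets are queued - addLast blocks while the queue is full,
    # giving backpressure with one synchronization point per packet
    # instead of one per byte
    await s.queue.addLast(data)

  proc pop(s: QueueBufferStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
    # serve reads from the slush buffer first, refilling it with the
    # next queued packet only once it runs dry
    if s.slush.len == 0:
      s.slush = await s.queue.popFirst()
    let count = min(nbytes, s.slush.len)
    if count > 0:
      copyMem(pbytes, addr s.slush[0], count)
      s.slush = s.slush[count..^1]
    return count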
import unittest, strformat, random, oids
import chronos, nimcrypto/utils, chronicles, stew/byteutils
import ../libp2p/[errors,
                  stream/connection,
                  stream/bufferstream,
                  transports/tcptransport,
                  transports/transport,
                  multiaddress,
                  muxers/mplex/mplex,
                  muxers/mplex/coder,
                  muxers/mplex/lpchannel,
                  vbuffer,
                  varint]

import ./helpers

{.used.}

suite "Mplex":
  teardown:
    checkTrackers()

  test "encode header with channel id 0":
    proc testEncodeHeader() {.async.} =
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("000873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
      await conn.close()

    waitFor(testEncodeHeader())
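
    # The bytes checked above decode as: 0x00 is the mplex header varint,
    # ((channel id 0) shl 3) or ord(MessageType.New) = 0; 0x08 is the
    # payload length; the rest is "stream 1" in ASCII.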

  test "encode header with channel id other than 0":
    proc testEncodeHeader() {.async.} =
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("88010873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
      await conn.close()

    waitFor(testEncodeHeader())
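
    # With channel id 17 the header is (17 shl 3) or ord(MessageType.New)
    # = 136, which no longer fits in one varint byte and encodes as
    # 0x88 0x01; the 0x08 length and "stream 1" payload follow as before.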

  test "encode header and body with channel id 0":
    proc testEncodeHeaderBody() {.async.} =
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("020873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
      await conn.close()

    waitFor(testEncodeHeaderBody())

  test "encode header and body with channel id other than 0":
    proc testEncodeHeaderBody() {.async.} =
      proc encHandler(msg: seq[byte]) {.async.} =
        check msg == fromHex("8a010873747265616d2031")

      let conn = newBufferStream(encHandler)
      await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
      await conn.close()

    waitFor(testEncodeHeaderBody())

  test "decode header with channel id 0":
    proc testDecodeHeader() {.async.} =
      let stream = newBufferStream()
      let conn = stream
      await stream.pushData(fromHex("000873747265616d2031"))
      let msg = await conn.readMsg()

      check msg.id == 0
      check msg.msgType == MessageType.New
      await conn.close()

    waitFor(testDecodeHeader())

  test "decode header and body with channel id 0":
    proc testDecodeHeader() {.async.} =
      let stream = newBufferStream()
      let conn = stream
      await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
      let msg = await conn.readMsg()

      check msg.id == 0
      check msg.msgType == MessageType.MsgOut
      check string.fromBytes(msg.data) == "hello from channel 0!!"
      await conn.close()

    waitFor(testDecodeHeader())

  test "decode header and body with channel id other than 0":
    proc testDecodeHeader() {.async.} =
      let stream = newBufferStream()
      let conn = stream
      await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
      let msg = await conn.readMsg()

      check msg.id == 17
      check msg.msgType == MessageType.MsgOut
      check string.fromBytes(msg.data) == "hello from channel 0!!"
      await conn.close()

    waitFor(testDecodeHeader())
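
    # readMsg is the inverse of writeMsg: the header varint splits into
    # the channel id (header shr 3) and the message type (low 3 bits),
    # then the length-prefixed body follows - hence 0x8a 0x01 = 138 above
    # yields id 17 and MessageType.MsgOut.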

  test "half closed (local close) - should close for write":
    proc testClosedForWrite(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)
      await chann.close()
      try:
        await chann.write("Hello")
      except LPStreamClosedError:
        result = true
      finally:
        await chann.reset()
        await conn.close()

    check:
      waitFor(testClosedForWrite()) == true
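
    # Half-close semantics: close() only shuts down the local writing
    # side, so subsequent writes fail with LPStreamClosedError, while
    # reads keep working (see the next test) until the remote end
    # signals EOF.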

  test "half closed (local close) - should allow reads until remote closes":
    proc testOpenForRead(): Future[bool] {.async.} =
      let
        conn = newBufferStream(
          proc (data: seq[byte]) {.gcsafe, async.} =
            discard,
        )
        chann = LPChannel.init(1, conn, true)

      await chann.pushData(("Hello!").toBytes)

      var data = newSeq[byte](6)
      await chann.close() # closing channel
      # should be able to read on local close
      await chann.readExactly(addr data[0], 3)
      # closing remote end
      let closeFut = chann.pushEof()
      # should still allow reading until buffer EOF
      await chann.readExactly(addr data[3], 3)
      try:
        # this should fail now
        await chann.readExactly(addr data[0], 3)
      except LPStreamEOFError:
        result = true
      finally:
        await chann.close()
        await conn.close()

      await closeFut

    check:
      waitFor(testOpenForRead()) == true

  test "half closed (remote close) - channel should close for reading by remote":
    proc testClosedForRead(): Future[bool] {.async.} =
      let
        conn = newBufferStream(
          proc (data: seq[byte]) {.gcsafe, async.} =
            discard,
        )
        chann = LPChannel.init(1, conn, true)

      await chann.pushData(("Hello!").toBytes)

      var data = newSeq[byte](6)
      await chann.readExactly(addr data[0], 3)
      let closeFut = chann.pushEof() # closing channel
      let readFut = chann.readExactly(addr data[3], 3)
      await all(closeFut, readFut)
      try:
        await chann.readExactly(addr data[0], 6) # this should fail now
      except LPStreamEOFError:
        result = true
      finally:
        await chann.close()
        await conn.close()

    check:
      waitFor(testClosedForRead()) == true

  test "half closed (remote close) - channel should allow writing on remote close":
    proc testClosedForRead(): Future[bool] {.async.} =
      let
        testData = "Hello!".toBytes
        conn = newBufferStream(
          proc (data: seq[byte]) {.gcsafe, async.} =
            discard
        )
        chann = LPChannel.init(1, conn, true)

      await chann.pushEof() # closing channel
      try:
        await chann.writeLp(testData)
        return true
      finally:
        await chann.reset() # there's nobody reading the EOF!
        await conn.close()

    check:
      waitFor(testClosedForRead()) == true

  test "should not allow pushing data to channel when remote end closed":
    proc testResetWrite(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)
      await chann.pushEof()
      var buf: array[1, byte]
      check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read
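      # readOnce returning 0 bytes is how EOF surfaces to the reader; once
      # the EOF marker has been consumed, pushing more data from the
      # closed remote side must fail with LPStreamEOFError below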
      try:
        await chann.pushData(@[byte(1)])
      except LPStreamEOFError:
        result = true
      finally:
        await chann.close()
        await conn.close()

    check:
      waitFor(testResetWrite()) == true

  test "reset - channel should fail reading":
    proc testResetRead(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      await chann.reset()
      var data = newSeq[byte](1)
      try:
        await chann.readExactly(addr data[0], 1)
      except LPStreamEOFError:
        result = true
      finally:
        await conn.close()

    check:
      waitFor(testResetRead()) == true

  test "reset - should complete read":
    proc testResetRead(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      var data = newSeq[byte](1)
      let fut = chann.readExactly(addr data[0], 1)
      await chann.reset()
      try:
        await fut
      except LPStreamEOFError:
        result = true
      finally:
        await conn.close()

    check:
      waitFor(testResetRead()) == true

  test "reset - should complete pushData":
    proc testResetRead(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      await chann.pushData(@[0'u8])
      let fut = chann.pushData(@[0'u8])
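      # the first pushData filled the length-1 queue described in the
      # header comment, so `fut` above stays pending; reset must complete
      # it for the withTimeout check below to succeed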
      await chann.reset()
      result = await fut.withTimeout(100.millis)
      await conn.close()

    check:
      waitFor(testResetRead()) == true

  test "reset - should complete both read and push":
    proc testResetRead(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)

      var data = newSeq[byte](1)
      let rfut = chann.readExactly(addr data[0], 1)
      let wfut = chann.pushData(@[0'u8])
      let wfut2 = chann.pushData(@[0'u8])
      await chann.reset()
      result = await allFutures(rfut, wfut, wfut2).withTimeout(100.millis)
      await conn.close()

    check:
      waitFor(testResetRead()) == true

  test "reset - channel should fail writing":
    proc testResetWrite(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(1, conn, true)
      await chann.reset()
      try:
        await chann.write(("Hello!").toBytes)
      except LPStreamClosedError:
        result = true
      finally:
        await conn.close()

    check:
      waitFor(testResetWrite()) == true

  test "reset - channel should reset on timeout":
    proc testResetWrite(): Future[bool] {.async.} =
      proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
      let
        conn = newBufferStream(writeHandler)
        chann = LPChannel.init(
          1, conn, true, timeout = 100.millis)

      check await chann.closeEvent.wait().withTimeout(1.minutes)
      await conn.close()
      result = true

    check:
      waitFor(testResetWrite())
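
    # with no traffic on the channel, the 100 ms inactivity timeout should
    # trigger a reset on its own, which in turn fires closeEvent; the
    # generous one-minute bound only guards against the test hanging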

  test "e2e - read/write receiver":
    proc testNewStream() {.async.} =
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      var done = newFuture[void]()
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection)
          {.async, gcsafe.} =
          let msg = await stream.readLp(1024)
          check string.fromBytes(msg) == "HELLO"
          await stream.close()
          done.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let mplexDialFut = mplexDial.handle()
      let stream = await mplexDial.newStream()
      await stream.writeLp("HELLO")
      check LPChannel(stream).isOpen # not lazy
      await stream.close()

      await done.wait(1.seconds)
      await conn.close()
      await mplexDialFut.wait(1.seconds)
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(testNewStream())

  test "e2e - read/write receiver lazy":
    proc testNewStream() {.async.} =
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      var done = newFuture[void]()
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection)
          {.async, gcsafe.} =
          let msg = await stream.readLp(1024)
          check string.fromBytes(msg) == "HELLO"
          await stream.close()
          done.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let stream = await mplexDial.newStream(lazy = true)
      let mplexDialFut = mplexDial.handle()
      check not LPChannel(stream).isOpen # assert lazy
      await stream.writeLp("HELLO")
      check LPChannel(stream).isOpen # assert lazy
      await stream.close()

      await done.wait(1.seconds)
      await conn.close()
      await mplexDialFut
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(testNewStream())

  test "e2e - write fragmented":
    proc testNewStream() {.async.} =
      let
        ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
        listenJob = newFuture[void]()

      var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
      for _ in 0..<MaxMsgSize:
        bigseq.add(uint8(rand(uint('A')..uint('z'))))

      proc connHandler(conn: Connection) {.async, gcsafe.} =
        try:
          let mplexListen = Mplex.init(conn)
          mplexListen.streamHandler = proc(stream: Connection)
            {.async, gcsafe.} =
            let msg = await stream.readLp(MaxMsgSize)
            check msg == bigseq
            trace "Bigseq check passed!"
            await stream.close()
            listenJob.complete()

          await mplexListen.handle()
          await sleepAsync(1.seconds) # give chronos some slack to process things
          await mplexListen.close()
        except CancelledError as exc:
          raise exc
        except CatchableError as exc:
          check false

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let mplexDialFut = mplexDial.handle()
      let stream = await mplexDial.newStream()

      await stream.writeLp(bigseq)
      try:
        await listenJob.wait(10.seconds)
      except AsyncTimeoutError:
        check false

      await stream.close()
      await conn.close()
      await mplexDialFut
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(testNewStream())

  test "e2e - read/write initiator":
    proc testNewStream() {.async.} =
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      let done = newFuture[void]()
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection)
          {.async, gcsafe.} =
          await stream.writeLp("Hello from stream!")
          await stream.close()
          done.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let mplexDialFut = mplexDial.handle()
      let stream = await mplexDial.newStream("DIALER")
      let msg = string.fromBytes(await stream.readLp(1024))
      await stream.close()
      check msg == "Hello from stream!"

      await done.wait(1.seconds)
      await conn.close()
      await mplexDialFut
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(testNewStream())

  test "e2e - multiple streams":
    proc testNewStream() {.async.} =
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      let done = newFuture[void]()
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        var count = 1
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection)
          {.async, gcsafe.} =
          let msg = await stream.readLp(1024)
          check string.fromBytes(msg) == &"stream {count}!"
          count.inc
          await stream.close()
          if count == 10:
            done.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1 = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      # TODO: Reenable once half-closed is working properly
      let mplexDialFut = mplexDial.handle()
      for i in 1..10:
        let stream = await mplexDial.newStream()
        await stream.writeLp(&"stream {i}!")
        await stream.close()

      await done.wait(10.seconds)
      await conn.close()
      await mplexDialFut
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(testNewStream())

  test "e2e - multiple read/write streams":
    proc testNewStream() {.async.} =
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      let done = newFuture[void]()
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        var count = 1
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection)
          {.async, gcsafe.} =
          let msg = await stream.readLp(1024)
          check string.fromBytes(msg) == &"stream {count} from dialer!"
          await stream.writeLp(&"stream {count} from listener!")
          count.inc
          await stream.close()
          if count == 10:
            done.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let mplexDialFut = mplexDial.handle()
      for i in 1..10:
        let stream = await mplexDial.newStream("dialer stream")
        await stream.writeLp(&"stream {i} from dialer!")
        let msg = await stream.readLp(1024)
        check string.fromBytes(msg) == &"stream {i} from listener!"
        await stream.close()

      await done.wait(5.seconds)
      await conn.close()
      await mplexDialFut
      await mplexDial.close()
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(testNewStream())

  test "jitter - channel should be able to handle erratic read/writes":
    proc test() {.async.} =
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      var complete = newFuture[void]()
      const MsgSize = 1024
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection)
          {.async, gcsafe.} =
          try:
            let msg = await stream.readLp(MsgSize)
            check msg.len == MsgSize
          except CatchableError as e:
            echo e.msg
          await stream.close()
          complete.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let mplexDialFut = mplexDial.handle()
      let stream = await mplexDial.newStream()
      var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
      for _ in 0..<MsgSize: # write one less than max size
        bigseq.add(uint8(rand(uint('A')..uint('z'))))

      ## create length prefixed libp2p frame
      var buf = initVBuffer()
      buf.writeSeq(bigseq)
      buf.finish()

      ## create mplex header
      var mplexBuf = initVBuffer()
      mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
      mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
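      # hand-rolled framing lets the test control exactly what hits the
      # wire: the first varint is the mplex header, (1 shl 3) or
      # ord(MessageType.MsgOut) = 10 for MsgOut on channel 1, the second
      # is the frame size; the payload is the length-prefixed frame above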

      await conn.write(mplexBuf.buffer)
      proc writer() {.async.} =
        var sent = 0
        randomize()
        let total = buf.buffer.len
        const min = 20
        const max = 50
        while sent < total:
          var size = rand(min..max)
          size = if size > buf.buffer.len: buf.buffer.len else: size
          var send = buf.buffer[0..<size]
          await conn.write(send)
          sent += size
          buf.buffer = buf.buffer[size..^1]

      await writer()

      await complete.wait(1.seconds)

      await stream.close()
      await conn.close()

      await mplexDialFut

      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(test())

  test "jitter - channel should handle 1 byte read/write":
    proc test() {.async.} =
      let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()

      var complete = newFuture[void]()
      const MsgSize = 512
      proc connHandler(conn: Connection) {.async, gcsafe.} =
        let mplexListen = Mplex.init(conn)
        mplexListen.streamHandler = proc(stream: Connection)
          {.async, gcsafe.} =
          let msg = await stream.readLp(MsgSize)
          check msg.len == MsgSize
          await stream.close()
          complete.complete()

        await mplexListen.handle()
        await mplexListen.close()

      let transport1: TcpTransport = TcpTransport.init()
      let listenFut = await transport1.listen(ma, connHandler)

      let transport2: TcpTransport = TcpTransport.init()
      let conn = await transport2.dial(transport1.ma)

      let mplexDial = Mplex.init(conn)
      let stream = await mplexDial.newStream()
      let mplexDialFut = mplexDial.handle()
      var bigseq = newSeqOfCap[uint8](MsgSize + 1)
      for _ in 0..<MsgSize: # write one less than max size
        bigseq.add(uint8(rand(uint('A')..uint('z'))))

      ## create length prefixed libp2p frame
      var buf = initVBuffer()
      buf.writeSeq(bigseq)
      buf.finish()

      ## create mplex header
      var mplexBuf = initVBuffer()
      mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
      mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent

      await conn.write(mplexBuf.buffer)
      proc writer() {.async.} =
        for i in buf.buffer:
          await conn.write(@[i])

      await writer()

      await complete.wait(5.seconds)
      await stream.close()
      await conn.close()
      await mplexDialFut
      await allFuturesThrowing(
        transport1.close(),
        transport2.close())
      await listenFut

    waitFor(test())