nim-libp2p/tests/testbufferstream.nim
Jacek Sieka 96d4c44fec
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.

When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.

In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.

* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 08:19:13 +02:00

234 lines
5.7 KiB
Nim

import unittest
import chronos, stew/byteutils
import ../libp2p/stream/bufferstream,
../libp2p/stream/lpstream
{.used.}
suite "BufferStream":
teardown:
# echo getTracker("libp2p.bufferstream").dump()
check getTracker("libp2p.bufferstream").isLeaked() == false
test "push data to buffer":
proc testPushTo(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
var data = "12345"
await buff.pushTo(data.toBytes())
check buff.len == 5
result = true
await buff.close()
check:
waitFor(testPushTo()) == true
test "push and wait":
proc testPushTo(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
let fut0 = buff.pushTo("1234".toBytes())
let fut1 = buff.pushTo("5".toBytes())
check buff.len == 4 # the second write should not be visible yet
var data: array[1, byte]
check: 1 == await buff.readOnce(addr data[0], data.len)
check ['1'] == string.fromBytes(data)
await fut0
await fut1
check buff.len == 4
result = true
await buff.close()
check:
waitFor(testPushTo()) == true
test "read with size":
proc testRead(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
await buff.pushTo("12345".toBytes())
var data: array[3, byte]
await buff.readExactly(addr data[0], data.len)
check ['1', '2', '3'] == string.fromBytes(data)
result = true
await buff.close()
check:
waitFor(testRead()) == true
test "readExactly":
proc testReadExactly(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
await buff.pushTo("12345".toBytes())
check buff.len == 5
var data: array[2, byte]
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == ['1', '2']
result = true
await buff.close()
check:
waitFor(testReadExactly()) == true
test "readExactly raises":
proc testReadExactly(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
await buff.pushTo("123".toBytes())
var data: array[5, byte]
var readFut = buff.readExactly(addr data[0], data.len)
await buff.close()
try:
await readFut
except LPStreamEOFError:
result = true
check:
waitFor(testReadExactly()) == true
test "readOnce":
proc testReadOnce(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
var data: array[3, byte]
let readFut = buff.readOnce(addr data[0], data.len)
await buff.pushTo("123".toBytes())
check buff.len == 3
check (await readFut) == 3
check string.fromBytes(data) == ['1', '2', '3']
result = true
await buff.close()
check:
waitFor(testReadOnce()) == true
test "reads should happen in order":
proc testWritePtr(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
let w1 = buff.pushTo("Msg 1".toBytes())
let w2 = buff.pushTo("Msg 2".toBytes())
let w3 = buff.pushTo("Msg 3".toBytes())
var data: array[5, byte]
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == "Msg 1"
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == "Msg 2"
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == "Msg 3"
for f in [w1, w2, w3]: await f
let w4 = buff.pushTo("Msg 4".toBytes())
let w5 = buff.pushTo("Msg 5".toBytes())
let w6 = buff.pushTo("Msg 6".toBytes())
await buff.close()
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == "Msg 4"
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == "Msg 5"
await buff.readExactly(addr data[0], data.len)
check string.fromBytes(data) == "Msg 6"
for f in [w4, w5, w6]: await f
result = true
check:
waitFor(testWritePtr()) == true
test "small reads":
proc testWritePtr(): Future[bool] {.async.} =
let buff = newBufferStream()
check buff.len == 0
var writes: seq[Future[void]]
var str: string
for i in 0..<10:
writes.add buff.pushTo("123".toBytes())
str &= "123"
await buff.close() # all data should still be read after close
var str2: string
var data: array[2, byte]
try:
while true:
let x = await buff.readOnce(addr data[0], data.len)
str2 &= string.fromBytes(data[0..<x])
except LPStreamEOFError:
discard
for f in writes: await f
check str == str2
result = true
await buff.close()
check:
waitFor(testWritePtr()) == true
test "shouldn't get stuck on close":
proc closeTest(): Future[bool] {.async.} =
var stream = newBufferStream()
var
fut = stream.pushTo(toBytes("hello"))
fut2 = stream.pushTo(toBytes("again"))
await stream.close()
try:
await wait(fut, 100.milliseconds)
await wait(fut2, 100.milliseconds)
result = true
except AsyncTimeoutError:
result = false
await stream.close()
check:
waitFor(closeTest()) == true
test "no push after close":
proc closeTest(): Future[bool] {.async.} =
var stream = newBufferStream()
await stream.pushTo("123".toBytes())
var data: array[3, byte]
await stream.readExactly(addr data[0], data.len)
await stream.close()
try:
await stream.pushTo("123".toBytes())
except LPStreamClosedError:
result = true
check:
waitFor(closeTest()) == true