mirror of
https://github.com/vacp2p/nim-libp2p-experimental.git
synced 2025-01-16 05:14:36 +00:00
49a12e619d
* channel close race and deadlock fixes * remove send lock, write chunks in one go * push some of half-closed implementation to BufferStream * fix some hangs where LPChannel readers and writers would not always wake up * simplify lazy channels * fix close happening more than once in some orderings * reenable connection tracking tests * close channels first on mplex close such that consumers can read bytes A notable difference is that BufferedStream is no longer considered EOF until someone has actually read the EOF marker. * docs, simplification
234 lines
5.8 KiB
Nim
234 lines
5.8 KiB
Nim
import unittest
|
|
import chronos, stew/byteutils
|
|
import ../libp2p/stream/bufferstream,
|
|
../libp2p/stream/lpstream
|
|
|
|
{.used.}
|
|
|
|
suite "BufferStream":
|
|
teardown:
  # Runs after each test: fails the test when the "libp2p.bufferstream"
  # tracker reports a leak. To inspect the tracker while debugging, run:
  #   echo getTracker("libp2p.bufferstream").dump()
  check not getTracker("libp2p.bufferstream").isLeaked()
|
|
|
|
test "push data to buffer":
  # Pushes five bytes into a fresh stream and checks that `len` goes from
  # 0 to 5.
  proc testpushData(): Future[bool] {.async.} =
    let buff = newBufferStream()
    check buff.len == 0

    # The payload is never mutated, so it is passed as a literal instead of
    # being held in a `var`.
    await buff.pushData("12345".toBytes())
    check buff.len == 5
    result = true

    await buff.close()

  check:
    waitFor(testpushData()) == true
|
|
|
|
test "push and wait":
  # Starts two pushes without awaiting them, reads one byte, then awaits both
  # pushes and re-checks the buffered length.
  proc testpushData(): Future[bool] {.async.} =
    let stream = newBufferStream()
    check stream.len == 0

    let
      firstPush = stream.pushData("1234".toBytes())
      secondPush = stream.pushData("5".toBytes())
    check stream.len == 4 # the second write should not be visible yet

    var oneByte: array[1, byte]
    let nread = await stream.readOnce(addr oneByte[0], oneByte.len)
    check 1 == nread

    check ['1'] == string.fromBytes(oneByte)
    await firstPush
    await secondPush
    # Expected: 3 unread bytes of the first write plus the 1-byte second write.
    check stream.len == 4

    result = true

    await stream.close()

  check waitFor(testpushData()) == true
|
|
|
|
test "read with size":
  # Pushes five bytes and reads exactly three of them back.
  proc testRead(): Future[bool] {.async.} =
    let stream = newBufferStream()
    check stream.len == 0

    await stream.pushData("12345".toBytes())

    # The destination is smaller than the pushed payload; only the leading
    # three bytes are expected in it.
    var head: array[3, byte]
    await stream.readExactly(addr head[0], head.len)
    check ['1', '2', '3'] == string.fromBytes(head)

    result = true

    await stream.close()

  check waitFor(testRead()) == true
|
|
|
|
test "readExactly":
  # Pushes five bytes, verifies the buffered length, then reads exactly two.
  proc testReadExactly(): Future[bool] {.async.} =
    let stream = newBufferStream()
    check stream.len == 0

    await stream.pushData("12345".toBytes())
    check stream.len == 5

    var firstTwo: array[2, byte]
    await stream.readExactly(addr firstTwo[0], firstTwo.len)
    check string.fromBytes(firstTwo) == ['1', '2']

    result = true

    await stream.close()

  check waitFor(testReadExactly()) == true
|
|
|
|
test "readExactly raises":
  # Requests more bytes than were pushed, closes the stream, and expects the
  # pending read to fail with LPStreamIncompleteError.
  proc testReadExactly(): Future[bool] {.async.} =
    let buff = newBufferStream()
    check buff.len == 0

    await buff.pushData("123".toBytes())
    var data: array[5, byte]
    # Five bytes are requested while only three were pushed; the future is
    # kept (as an immutable binding) so the stream can be closed before the
    # read is awaited.
    let readFut = buff.readExactly(addr data[0], data.len)
    await buff.close()

    # `result` stays false unless the read raises LPStreamIncompleteError;
    # any other exception propagates to the caller.
    try:
      await readFut
    except LPStreamIncompleteError:
      result = true

  check:
    waitFor(testReadExactly()) == true
|
|
|
|
test "readOnce":
  # Starts a read on an empty stream, then pushes three bytes and checks that
  # the read completes with them.
  proc testReadOnce(): Future[bool] {.async.} =
    let stream = newBufferStream()
    check stream.len == 0

    var sink: array[3, byte]
    # The read is issued before any data has been pushed.
    let pendingRead = stream.readOnce(addr sink[0], sink.len)
    await stream.pushData("123".toBytes())
    check stream.len == 3

    let nread = await pendingRead
    check nread == 3
    check string.fromBytes(sink) == ['1', '2', '3']

    result = true

    await stream.close()

  check waitFor(testReadOnce()) == true
|
|
|
|
test "reads should happen in order":
  # Queues messages in two batches of three and checks that they are read
  # back in the order they were pushed; the second batch is read after the
  # stream has been closed.
  proc testWritePtr(): Future[bool] {.async.} =
    let stream = newBufferStream()
    check stream.len == 0

    # Array elements are evaluated left to right, so the pushes are issued in
    # message order.
    let firstBatch = [
      stream.pushData("Msg 1".toBytes()),
      stream.pushData("Msg 2".toBytes()),
      stream.pushData("Msg 3".toBytes())
    ]

    var msg: array[5, byte]
    for expected in ["Msg 1", "Msg 2", "Msg 3"]:
      await stream.readExactly(addr msg[0], msg.len)
      check string.fromBytes(msg) == expected

    for pending in firstBatch: await pending

    let secondBatch = [
      stream.pushData("Msg 4".toBytes()),
      stream.pushData("Msg 5".toBytes()),
      stream.pushData("Msg 6".toBytes())
    ]

    # Close before reading: the messages queued above are still expected to
    # be readable.
    await stream.close()

    for expected in ["Msg 4", "Msg 5", "Msg 6"]:
      await stream.readExactly(addr msg[0], msg.len)
      check string.fromBytes(msg) == expected

    for pending in secondBatch: await pending

    result = true

  check waitFor(testWritePtr()) == true
|
|
|
|
test "small reads":
  # Queues ten 3-byte pushes, closes the stream, then drains it through a
  # 2-byte buffer and compares what was read with what was pushed.
  proc testWritePtr(): Future[bool] {.async.} =
    let stream = newBufferStream()
    check stream.len == 0

    var
      pending: seq[Future[void]]
      expected: string
    for i in 0..<10:
      pending.add stream.pushData("123".toBytes())
      expected.add "123"
    await stream.close() # all data should still be read after close

    var
      received: string
      chunk: array[2, byte]
    # Read until the stream signals EOF by raising LPStreamEOFError.
    try:
      while true:
        let n = await stream.readOnce(addr chunk[0], chunk.len)
        received.add string.fromBytes(chunk[0..<n])
    except LPStreamEOFError:
      discard

    for w in pending: await w

    check expected == received

    result = true

    await stream.close()

  check waitFor(testWritePtr()) == true
|
|
|
|
test "shouldn't get stuck on close":
  # Starts two pushes that are never read, closes the stream, and checks
  # that both push futures finish within 100 ms each.
  proc closeTest(): Future[bool] {.async.} =
    # None of these bindings is reassigned, so they are declared with `let`.
    let stream = newBufferStream()
    let
      fut = stream.pushData(toBytes("hello"))
      fut2 = stream.pushData(toBytes("again"))
    await stream.close()
    # A timeout on either future means a push is still pending after close.
    try:
      await wait(fut, 100.milliseconds)
      await wait(fut2, 100.milliseconds)
      result = true
    except AsyncTimeoutError:
      result = false

    # NOTE(review): close is called a second time here, as in the original;
    # presumably a repeated close is expected to be harmless - confirm.
    await stream.close()

  check:
    waitFor(closeTest()) == true
|
|
|
|
test "no push after close":
  # Pushes and reads three bytes, closes the stream, and expects a further
  # push to fail with LPStreamClosedError.
  proc closeTest(): Future[bool] {.async.} =
    # The stream binding is never reassigned, so it is declared with `let`.
    let stream = newBufferStream()
    await stream.pushData("123".toBytes())
    var data: array[3, byte]
    await stream.readExactly(addr data[0], data.len)
    await stream.close()

    # `result` stays false unless the push raises LPStreamClosedError.
    try:
      await stream.pushData("123".toBytes())
    except LPStreamClosedError:
      result = true

  check:
    waitFor(closeTest()) == true
|