import unittest
import chronos, stew/byteutils
import ../libp2p/stream/bufferstream,
       ../libp2p/stream/lpstream

{.used.}

# Exercises pushData, readOnce, readExactly and close on a BufferStream.
suite "BufferStream":
  teardown:
    # echo getTracker(BufferStreamTrackerName).dump()
    # Runs after each test: the BufferStream tracker must report no leak.
    check getTracker(BufferStreamTrackerName).isLeaked() == false

  test "push data to buffer":
    # A single five-byte push is expected to be reflected by `len`.
    proc pushFiveBytes(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      await buff.pushData("12345".toBytes())
      check buff.len == 5

      result = true
      await buff.close()

    check:
      waitFor(pushFiveBytes()) == true

  test "push and wait":
    # Two pushes are started without being awaited; one byte is read
    # before either push future is awaited.
    proc pushTwiceThenRead(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      let firstPush = buff.pushData("1234".toBytes())
      let secondPush = buff.pushData("5".toBytes())
      check buff.len == 4 # the second write should not be visible yet

      var firstByte: array[1, byte]
      check:
        1 == await buff.readOnce(addr firstByte[0], firstByte.len)
      check ['1'] == string.fromBytes(firstByte)

      await firstPush
      await secondPush
      check buff.len == 4

      result = true
      await buff.close()

    check:
      waitFor(pushTwiceThenRead()) == true

  test "read with size":
    # Five bytes are pushed; an exact read of three must yield "123".
    proc readThreeOfFive(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      await buff.pushData("12345".toBytes())

      var head: array[3, byte]
      await buff.readExactly(addr head[0], head.len)
      check ['1', '2', '3'] == string.fromBytes(head)

      result = true
      await buff.close()

    check:
      waitFor(readThreeOfFive()) == true

  test "readExactly":
    # Five bytes are pushed; an exact read of two must yield "12".
    proc readTwoOfFive(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      await buff.pushData("12345".toBytes())
      check buff.len == 5

      var head: array[2, byte]
      await buff.readExactly(addr head[0], head.len)
      check string.fromBytes(head) == ['1', '2']

      result = true
      await buff.close()

    check:
      waitFor(readTwoOfFive()) == true

  test "readExactly raises":
    # Only 3 of the 5 requested bytes are pushed and the stream is closed
    # before the read future is awaited; the test passes only when that
    # await raises LPStreamIncompleteError.
    proc readPastClose(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      await buff.pushData("123".toBytes())

      var wanted: array[5, byte]
      let pendingRead = buff.readExactly(addr wanted[0], wanted.len)
      await buff.close()

      try:
        await pendingRead
      except LPStreamIncompleteError:
        result = true

    check:
      waitFor(readPastClose()) == true

  test "readOnce":
    # The read is started before any data is pushed and is awaited only
    # after a three-byte push.
    proc readBeforePush(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      var chunk: array[3, byte]
      let pendingRead = buff.readOnce(addr chunk[0], chunk.len)
      await buff.pushData("123".toBytes())
      check buff.len == 3

      check (await pendingRead) == 3
      check string.fromBytes(chunk) == ['1', '2', '3']

      result = true
      await buff.close()

    check:
      waitFor(readBeforePush()) == true

  test "reads should happen in order":
    # Messages must come back in push order, both for a batch read on an
    # open stream and for a batch pushed right before close().
    proc readInPushOrder(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      # Pushes are started in order and awaited only after the reads.
      let firstBatch = [
        buff.pushData("Msg 1".toBytes()),
        buff.pushData("Msg 2".toBytes()),
        buff.pushData("Msg 3".toBytes())
      ]

      var msg: array[5, byte]
      for expected in ["Msg 1", "Msg 2", "Msg 3"]:
        await buff.readExactly(addr msg[0], msg.len)
        check string.fromBytes(msg) == expected

      for pending in firstBatch:
        await pending

      let secondBatch = [
        buff.pushData("Msg 4".toBytes()),
        buff.pushData("Msg 5".toBytes()),
        buff.pushData("Msg 6".toBytes())
      ]

      await buff.close()

      for expected in ["Msg 4", "Msg 5", "Msg 6"]:
        await buff.readExactly(addr msg[0], msg.len)
        check string.fromBytes(msg) == expected

      for pending in secondBatch:
        await pending

      result = true

    check:
      waitFor(readInPushOrder()) == true

  test "small reads":
    proc testWritePtr(): Future[bool] {.async.} =
      let buff = newBufferStream()
      check buff.len == 0

      var writes: seq[Future[void]]
      var str: string
      for i in 0..<10:
        writes.add buff.pushData("123".toBytes())
        str &= "123"
      await buff.close() # all data should still be read after close

      var str2: string
      var data: array[2, byte]
      try:
        while true:
          let x = await buff.readOnce(addr data[0], data.len)
          str2 &=
string.fromBytes(data[0..