2020-04-17 15:52:55 -06:00
|
|
|
import unittest
|
2020-04-19 18:38:05 -06:00
|
|
|
import chronos, chronicles, stew/byteutils
|
|
|
|
import multiaddress,
|
|
|
|
transports/[transport, tcptransport],
|
|
|
|
streams/[stream,
|
|
|
|
pushable,
|
|
|
|
chronosstream,
|
|
|
|
connection,
|
|
|
|
pushable,
|
|
|
|
utils]
|
2020-04-17 15:52:55 -06:00
|
|
|
|
|
|
|
suite "streams":
|
2020-04-19 18:38:05 -06:00
|
|
|
# test "interuption":
|
|
|
|
# proc test() {.async.} =
|
|
|
|
# iterator source(): Future[int] {.closure.} =
|
|
|
|
# for i in 0..5:
|
|
|
|
# if i == 4:
|
|
|
|
# break
|
|
|
|
|
|
|
|
# yield i.toFuture
|
|
|
|
|
|
|
|
# proc sink(i: Source[int]): Future[void] {.async.} =
|
|
|
|
# var count = 0
|
|
|
|
# for item in i:
|
|
|
|
# echo await item
|
|
|
|
# count.inc
|
|
|
|
|
|
|
|
# check: count == 4
|
|
|
|
|
|
|
|
# await source.sink()
|
|
|
|
|
|
|
|
# waitFor(test())
|
|
|
|
|
|
|
|
# test "error propagation":
|
|
|
|
# proc test() {.async.} =
|
|
|
|
# iterator source(): Future[int] {.closure.} =
|
|
|
|
# for i in 0..5:
|
|
|
|
# if i == 4:
|
|
|
|
# raise newException(CatchableError, "Uh-Oh!")
|
|
|
|
|
|
|
|
# yield i.toFuture
|
|
|
|
|
|
|
|
# proc sink(i: Source[int]): Future[void] {.async.} =
|
|
|
|
# var count = 0
|
|
|
|
# for item in i:
|
|
|
|
# echo await item
|
|
|
|
# count.inc
|
|
|
|
|
|
|
|
# check: count == 4
|
|
|
|
|
|
|
|
# try:
|
|
|
|
# await source.sink()
|
|
|
|
# except CatchableError as exc:
|
|
|
|
# check: true
|
|
|
|
|
|
|
|
# waitFor(test())
|
|
|
|
|
|
|
|
# test "error propagation with pipeline":
|
|
|
|
# proc test() {.async.} =
|
|
|
|
# iterator source(): Future[int] {.closure.} =
|
|
|
|
# defer:
|
|
|
|
# echo "exiting source"
|
|
|
|
|
|
|
|
# for i in 0..5:
|
|
|
|
# if i == 4:
|
|
|
|
# raise newException(CatchableError, "Uh-Oh!")
|
|
|
|
|
|
|
|
# yield i.toFuture
|
|
|
|
|
|
|
|
# proc sink(i: Source[int]): Future[void] {.async.} =
|
|
|
|
# defer:
|
|
|
|
# echo "exiting sink"
|
|
|
|
|
|
|
|
# var count = 0
|
|
|
|
# for item in i:
|
|
|
|
# echo await item
|
|
|
|
# count.inc
|
|
|
|
|
|
|
|
# check: count == 4
|
|
|
|
|
|
|
|
# proc mul2(f: Future[int]): Future[int] {.async.} =
|
|
|
|
# result = (await f) * 2
|
|
|
|
|
|
|
|
# proc through1(i: Source[int]): Source[int] {.gcsafe.} =
|
|
|
|
# return iterator(): Future[int] {.closure.} =
|
|
|
|
# defer:
|
|
|
|
# echo "exiting through1"
|
|
|
|
|
|
|
|
# for item in i:
|
|
|
|
# yield mul2(item)
|
|
|
|
|
|
|
|
# proc through2(i: Source[int]): Source[int] {.gcsafe.} =
|
|
|
|
# return iterator(): Future[int] {.closure.} =
|
|
|
|
# defer:
|
|
|
|
# echo "exiting through2"
|
|
|
|
|
|
|
|
# for item in i:
|
|
|
|
# yield mul2(item)
|
|
|
|
# echo "exiting through2"
|
|
|
|
|
|
|
|
# try:
|
|
|
|
# await source.
|
|
|
|
# through1.
|
|
|
|
# through2.
|
|
|
|
# sink
|
|
|
|
# except CatchableError as exc:
|
|
|
|
# check: true
|
|
|
|
|
|
|
|
# waitFor(test())
|
|
|
|
|
|
|
|
# test "resource destruction":
|
|
|
|
# proc test() {.async.} =
|
|
|
|
# type
|
|
|
|
# MyType = object of RootObj
|
|
|
|
# inner: string
|
|
|
|
|
|
|
|
# MyTypeRef = ref MyType
|
|
|
|
|
|
|
|
# proc `=destroy`(t: var MyType) =
|
|
|
|
# echo "destroyed!"
|
|
|
|
|
|
|
|
# iterator source(): Future[MyTypeRef] {.closure.} =
|
|
|
|
# for i in 0..5:
|
|
|
|
# yield MyTypeRef().toFuture
|
|
|
|
# if i == 4:
|
|
|
|
# raise newException(CatchableError, "Uh-Oh!")
|
|
|
|
|
|
|
|
# proc sink(i: Source[MyTypeRef]): Future[void] {.async.} =
|
|
|
|
# var count = 0
|
|
|
|
# for item in i:
|
|
|
|
# discard await item
|
|
|
|
# count.inc
|
|
|
|
|
|
|
|
# check: count == 4
|
|
|
|
|
|
|
|
# proc through1(i: Source[MyTypeRef]): Source[MyTypeRef] {.gcsafe.} =
|
|
|
|
# return iterator(): Future[MyTypeRef] {.closure.} =
|
|
|
|
# for item in i:
|
|
|
|
# yield item
|
|
|
|
|
|
|
|
# proc through2(i: Source[MyTypeRef]): Source[MyTypeRef] {.gcsafe.} =
|
|
|
|
# return iterator(): Future[MyTypeRef] {.closure.} =
|
|
|
|
# for item in i:
|
|
|
|
# yield item
|
|
|
|
|
|
|
|
# try:
|
|
|
|
# await source.
|
|
|
|
# through1.
|
|
|
|
# through2.
|
|
|
|
# sink
|
|
|
|
# except CatchableError as exc:
|
|
|
|
# check: true
|
|
|
|
|
|
|
|
# await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
# waitFor(test())
|
|
|
|
|
|
|
|
# test "pushable should not terminate prematurely":
|
|
|
|
# proc test() {.async.} =
|
|
|
|
# var pushable: Pushable[string] = Pushable[string].init(eofTag = "\0")
|
|
|
|
|
|
|
|
# var dest: Sink[string] = proc (i: Source[string]): Future[void] {.async.} =
|
|
|
|
# for item in i:
|
|
|
|
# echo await item
|
|
|
|
|
|
|
|
# var sink = pipe(pushable, dest)
|
|
|
|
|
|
|
|
# await pushable.push("item 1")
|
|
|
|
# # await pushable.push("item 2")
|
|
|
|
# # await pushable.push("item 3")
|
|
|
|
# await pushable.close()
|
|
|
|
|
|
|
|
# await sink
|
|
|
|
|
|
|
|
# waitFor(test())
|
|
|
|
|
|
|
|
test "pushable should not terminate prematurely":
|
2020-04-17 15:52:55 -06:00
|
|
|
proc test() {.async.} =
|
2020-04-19 18:38:05 -06:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
|
|
|
|
|
|
|
var finished = newFuture[void]()
|
|
|
|
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
|
|
|
var source = conn.source()
|
|
|
|
for i in source:
|
|
|
|
var msg = await i
|
2020-04-21 16:01:01 -06:00
|
|
|
if msg.len <= 0:
|
|
|
|
break
|
2020-04-19 18:38:05 -06:00
|
|
|
echo "STREAM ", string.fromBytes(msg)
|
|
|
|
finished.complete()
|
|
|
|
|
|
|
|
let transport1: TcpTransport = newTransport(TcpTransport)
|
|
|
|
let transportFut = await transport1.listen(ma, connHandler)
|
2020-04-17 15:52:55 -06:00
|
|
|
|
2020-04-19 18:38:05 -06:00
|
|
|
let transport2: TcpTransport = newTransport(TcpTransport)
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
2020-04-17 15:52:55 -06:00
|
|
|
|
2020-04-21 16:01:01 -06:00
|
|
|
# iterator source(): Future[seq[byte]] {.closure.} =
|
|
|
|
# var count = 0
|
|
|
|
# proc src(): Future[seq[byte]] =
|
|
|
|
# result = (("item " & $count).toBytes()).toFuture
|
|
|
|
|
|
|
|
# while true:
|
|
|
|
# var res = src()
|
|
|
|
# echo "HERE 1"
|
|
|
|
# if count > 2:
|
|
|
|
# break
|
|
|
|
# count.inc
|
|
|
|
|
|
|
|
# echo "HERE 2"
|
|
|
|
# yield res
|
|
|
|
# echo "HERE 3"
|
|
|
|
# echo "DONE!"
|
|
|
|
|
|
|
|
# var sink = conn.sink()(source)
|
|
|
|
|
2020-04-19 18:38:05 -06:00
|
|
|
var pushable = BytePushable.init()
|
|
|
|
var sink = pipe(pushable, conn)
|
2020-04-17 15:52:55 -06:00
|
|
|
|
2020-04-19 18:38:05 -06:00
|
|
|
await pushable.push("item 1".toBytes())
|
|
|
|
await pushable.push("item 2".toBytes())
|
|
|
|
await pushable.push("item 3".toBytes())
|
|
|
|
await pushable.close()
|
2020-04-17 15:52:55 -06:00
|
|
|
|
2020-04-19 18:38:05 -06:00
|
|
|
await finished
|
|
|
|
await transport1.close()
|
|
|
|
await transportFut
|
|
|
|
await sink
|
2020-04-17 15:52:55 -06:00
|
|
|
|
|
|
|
waitFor(test())
|