nim-libp2p/tests/teststreams.nim

209 lines
5.4 KiB
Nim
Raw Normal View History

2020-04-17 15:52:55 -06:00
import unittest
2020-04-19 18:38:05 -06:00
import chronos, chronicles, stew/byteutils
import multiaddress,
transports/[transport, tcptransport],
streams/[stream,
pushable,
chronosstream,
connection,
pushable,
utils]
2020-04-17 15:52:55 -06:00
suite "streams":
  # NOTE(review): the tests below are disabled and kept for reference only.
  # Their indentation was reconstructed from a flattened copy of the file;
  # verify the nesting before re-enabling any of them.

  # test "interruption":
  #   proc test() {.async.} =
  #     iterator source(): Future[int] {.closure.} =
  #       for i in 0..5:
  #         if i == 4:
  #           break
  #         yield i.toFuture
  #
  #     proc sink(i: Source[int]): Future[void] {.async.} =
  #       var count = 0
  #       for item in i:
  #         echo await item
  #         count.inc
  #       check: count == 4
  #
  #     await source.sink()
  #
  #   waitFor(test())

  # test "error propagation":
  #   proc test() {.async.} =
  #     iterator source(): Future[int] {.closure.} =
  #       for i in 0..5:
  #         if i == 4:
  #           raise newException(CatchableError, "Uh-Oh!")
  #         yield i.toFuture
  #
  #     proc sink(i: Source[int]): Future[void] {.async.} =
  #       var count = 0
  #       for item in i:
  #         echo await item
  #         count.inc
  #       check: count == 4
  #
  #     try:
  #       await source.sink()
  #     except CatchableError as exc:
  #       check: true
  #
  #   waitFor(test())

  # test "error propagation with pipeline":
  #   proc test() {.async.} =
  #     iterator source(): Future[int] {.closure.} =
  #       defer:
  #         echo "exiting source"
  #       for i in 0..5:
  #         if i == 4:
  #           raise newException(CatchableError, "Uh-Oh!")
  #         yield i.toFuture
  #
  #     proc sink(i: Source[int]): Future[void] {.async.} =
  #       defer:
  #         echo "exiting sink"
  #       var count = 0
  #       for item in i:
  #         echo await item
  #         count.inc
  #       check: count == 4
  #
  #     proc mul2(f: Future[int]): Future[int] {.async.} =
  #       result = (await f) * 2
  #
  #     proc through1(i: Source[int]): Source[int] {.gcsafe.} =
  #       return iterator(): Future[int] {.closure.} =
  #         defer:
  #           echo "exiting through1"
  #         for item in i:
  #           yield mul2(item)
  #
  #     proc through2(i: Source[int]): Source[int] {.gcsafe.} =
  #       return iterator(): Future[int] {.closure.} =
  #         defer:
  #           echo "exiting through2"
  #         for item in i:
  #           yield mul2(item)
  #         echo "exiting through2"
  #
  #     try:
  #       await source.
  #         through1.
  #         through2.
  #         sink
  #     except CatchableError as exc:
  #       check: true
  #
  #   waitFor(test())

  # test "resource destruction":
  #   proc test() {.async.} =
  #     type
  #       MyType = object of RootObj
  #         inner: string
  #       MyTypeRef = ref MyType
  #
  #     proc `=destroy`(t: var MyType) =
  #       echo "destroyed!"
  #
  #     iterator source(): Future[MyTypeRef] {.closure.} =
  #       for i in 0..5:
  #         yield MyTypeRef().toFuture
  #         if i == 4:
  #           raise newException(CatchableError, "Uh-Oh!")
  #
  #     proc sink(i: Source[MyTypeRef]): Future[void] {.async.} =
  #       var count = 0
  #       for item in i:
  #         discard await item
  #         count.inc
  #       check: count == 4
  #
  #     proc through1(i: Source[MyTypeRef]): Source[MyTypeRef] {.gcsafe.} =
  #       return iterator(): Future[MyTypeRef] {.closure.} =
  #         for item in i:
  #           yield item
  #
  #     proc through2(i: Source[MyTypeRef]): Source[MyTypeRef] {.gcsafe.} =
  #       return iterator(): Future[MyTypeRef] {.closure.} =
  #         for item in i:
  #           yield item
  #
  #     try:
  #       await source.
  #         through1.
  #         through2.
  #         sink
  #     except CatchableError as exc:
  #       check: true
  #
  #     await sleepAsync(500.millis)
  #
  #   waitFor(test())

  # test "pushable should not terminate prematurely":
  #   proc test() {.async.} =
  #     var pushable: Pushable[string] = Pushable[string].init(eofTag = "\0")
  #     var dest: Sink[string] = proc (i: Source[string]): Future[void] {.async.} =
  #       for item in i:
  #         echo await item
  #
  #     var sink = pipe(pushable, dest)
  #
  #     await pushable.push("item 1")
  #     # await pushable.push("item 2")
  #     # await pushable.push("item 3")
  #     await pushable.close()
  #
  #     await sink
  #
  #   waitFor(test())

  test "pushable should not terminate prematurely":
    proc test() {.async.} =
      ## Pipes a `BytePushable` into a dialed TCP connection, pushes three
      ## byte chunks, closes the pushable and then waits for the listening
      ## side's handler to finish its read loop.
      ##
      ## NOTE(review): the test only echoes what the handler receives; it
      ## makes no `check` on the content or the number of chunks.
      let ma: MultiAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0")

      # Completed by `connHandler` after its loop over the source ends.
      let finished = newFuture[void]()

      proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
        ## Reads every item produced by `conn.source()`, prints it, and then
        ## signals `finished`.
        var source = conn.source()
        for i in source:
          let msg = await i
          echo "STREAM ", string.fromBytes(msg)

        finished.complete()

      # Listening side: `listen` is awaited and hands back a further future
      # that is awaited again after the transport has been closed.
      let transport1: TcpTransport = newTransport(TcpTransport)
      let transportFut = await transport1.listen(ma, connHandler)

      # Dialing side: connect to the address reported by the listener.
      let transport2: TcpTransport = newTransport(TcpTransport)
      let conn = await transport2.dial(transport1.ma)

      # Everything pushed into `pushable` is piped into the connection.
      var pushable = BytePushable.init()
      let sink = pipe(pushable, conn)

      await pushable.push("item 1".toBytes())
      await pushable.push("item 2".toBytes())
      await pushable.push("item 3".toBytes())
      await pushable.close()

      # Wait for the handler before tearing the listener down.
      # NOTE(review): `conn` and `transport2` are never closed here --
      # confirm whether that is intentional.
      await finished
      await transport1.close()
      await transportFut
      await sink

    waitFor(test())