mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-27 12:00:44 +00:00
wip
This commit is contained in:
parent
b5e60a084a
commit
f58744fabe
@ -1,276 +1,266 @@
|
|||||||
import chronos
|
import chronos
|
||||||
|
import stream
|
||||||
type
|
|
||||||
AsyncIterable*[T] = iterator(): Future[T] {.closure.}
|
|
||||||
|
|
||||||
template toFuture*[T](v: T): Future[T] =
|
template toFuture*[T](v: T): Future[T] =
|
||||||
var fut = newFuture[T]()
|
var fut = newFuture[T]()
|
||||||
fut.complete(v)
|
fut.complete(v)
|
||||||
fut
|
fut
|
||||||
|
|
||||||
iterator items*[T](i: AsyncIterable[T]): Future[T] =
|
proc forEach*[T](iter: Source[T],
|
||||||
while true:
|
pred: proc(item: Future[T]):
|
||||||
var item = i()
|
Future[void] {.gcsafe.}) {.async.} =
|
||||||
if i.finished():
|
for i in iter:
|
||||||
break
|
await pred(i)
|
||||||
|
|
||||||
yield item
|
proc collect*[T](iter: Source[T]): Future[seq[T]] =
|
||||||
|
for i in iter:
|
||||||
|
result.add(i)
|
||||||
|
|
||||||
# proc forEach*[T](iter: AsyncIterable[T],
|
proc map*[T, S](iter: Source[T],
|
||||||
# pred: proc(item: Future[T]):
|
pred: proc(item: Future[T]): Future[S] {.gcsafe.}):
|
||||||
# Future[void] {.gcsafe.}) {.async.} =
|
Source[T] =
|
||||||
# for i in iter:
|
return iterator(): Future[S] =
|
||||||
# await pred(i)
|
for i in iter:
|
||||||
|
yield pred(i)
|
||||||
|
|
||||||
# proc collect*[T](iter: AsyncIterable[T]): Future[seq[T]] =
|
proc filter*[T](iter: Source[T],
|
||||||
# for i in iter:
|
pred: proc(item: Future[T]): Future[bool] {.gcsafe.}):
|
||||||
# result.add(i)
|
Source[T] =
|
||||||
|
return iterator(): Future[T] =
|
||||||
|
proc next(item: Future[T]): Future[T] {.async.} =
|
||||||
|
for i in iter:
|
||||||
|
if not (await pred(i)):
|
||||||
|
continue
|
||||||
|
result = await i
|
||||||
|
|
||||||
# proc map*[T, S](iter: AsyncIterable[T],
|
for i in iter:
|
||||||
# pred: proc(item: Future[T]): Future[S] {.gcsafe.}):
|
yield next(i)
|
||||||
# AsyncIterable[T] =
|
|
||||||
# return iterator(): Future[S] =
|
|
||||||
# for i in iter:
|
|
||||||
# yield pred(i)
|
|
||||||
|
|
||||||
# proc filter*[T](iter: AsyncIterable[T],
|
proc zip*[T,S](i: Source[T],
|
||||||
# pred: proc(item: Future[T]): Future[bool] {.gcsafe.}):
|
j: Source[S]):
|
||||||
# AsyncIterable[T] =
|
iterator(): Future[tuple[a: Future[T], b: Future[S]]] {.gcsafe.} =
|
||||||
# return iterator(): Future[T] =
|
## Iterates through both iterators at the same time, returning a tuple of
|
||||||
# proc next(item: Future[T]): Future[T] {.async.} =
|
## both elements as long as neither of the iterators has finished.
|
||||||
# for i in iter:
|
##
|
||||||
# if not (await pred(i)):
|
## .. code-block:: nim
|
||||||
# continue
|
## for x in zip(1..4, 20..24):
|
||||||
# result = await i
|
## echo x
|
||||||
|
return iterator(): Future[tuple[a: Future[T], b: Future[S]]] {.gcsafe.} =
|
||||||
|
while true:
|
||||||
|
let res = (i(), j())
|
||||||
|
if finished(i) or finished(j):
|
||||||
|
break
|
||||||
|
yield res.toFuture
|
||||||
|
|
||||||
# for i in iter:
|
proc slice*[T](i: Source[T],
|
||||||
# yield next(i)
|
first = 0, last = 0, step = 1): Source[T] =
|
||||||
|
## Yields every `step` item in `i` from index `first` to `last`.
|
||||||
|
##
|
||||||
|
## .. code-block:: nim
|
||||||
|
## for i in slice(0..100, 10, 20)
|
||||||
|
## echo i
|
||||||
|
var pos = 0
|
||||||
|
return iterator(): Future[T] {.gcsafe.} =
|
||||||
|
for x in i:
|
||||||
|
if pos > last:
|
||||||
|
break
|
||||||
|
elif pos >= first and (pos - first) mod step == 0:
|
||||||
|
yield x
|
||||||
|
inc pos
|
||||||
|
|
||||||
# proc zip*[T,S](i: AsyncIterable[T],
|
proc delete*[T](i: Source[T],
|
||||||
# j: AsyncIterable[S]):
|
first = 0, last = 0): Source[T] =
|
||||||
# iterator(): Future[tuple[a: Future[T], b: Future[S]]] {.gcsafe.} =
|
## Yields the items in `i` except for the ones between `first` and `last`.
|
||||||
# ## Iterates through both iterators at the same time, returning a tuple of
|
##
|
||||||
# ## both elements as long as neither of the iterators has finished.
|
## .. code-block:: nim
|
||||||
# ##
|
## for x in delete(1..10, 4, 8):
|
||||||
# ## .. code-block:: nim
|
## echo x
|
||||||
# ## for x in zip(1..4, 20..24):
|
var pos = 0
|
||||||
# ## echo x
|
return iterator(): Future[T] {.gcsafe.} =
|
||||||
# return iterator(): Future[tuple[a: Future[T], b: Future[S]]] {.gcsafe.} =
|
for x in i:
|
||||||
# while true:
|
if pos notin first..last:
|
||||||
# let res = (i(), j())
|
yield x
|
||||||
# if finished(i) or finished(j):
|
inc pos
|
||||||
# break
|
|
||||||
# yield res.toFuture
|
|
||||||
|
|
||||||
# proc slice*[T](i: AsyncIterable[T],
|
proc foldl*[T,S](i: Source[T],
|
||||||
# first = 0, last = 0, step = 1): AsyncIterable[T] =
|
f: proc(x: Future[S], y: Future[T]): Future[S] {.gcsafe.},
|
||||||
# ## Yields every `step` item in `i` from index `first` to `last`.
|
y: Future[S] | S):
|
||||||
# ##
|
Future[S] =
|
||||||
# ## .. code-block:: nim
|
## Folds the values as the iterator yields them, returning the accumulation.
|
||||||
# ## for i in slice(0..100, 10, 20)
|
##
|
||||||
# ## echo i
|
## As the initial value of the accumulation `y` is used.
|
||||||
# var pos = 0
|
##
|
||||||
# return iterator(): Future[T] {.gcsafe.} =
|
## .. code-block:: nim
|
||||||
# for x in i:
|
## echo foldl(1..10, proc(x,y: int): int = x + y, 0)
|
||||||
# if pos > last:
|
when type(y) is Future:
|
||||||
# break
|
result = await y
|
||||||
# elif pos >= first and (pos - first) mod step == 0:
|
else:
|
||||||
# yield x
|
result = y.toFuture
|
||||||
# inc pos
|
|
||||||
|
|
||||||
# proc delete*[T](i: AsyncIterable[T],
|
for x in i:
|
||||||
# first = 0, last = 0): AsyncIterable[T] =
|
result = f(result, x)
|
||||||
# ## Yields the items in `i` except for the ones between `first` and `last`.
|
|
||||||
# ##
|
|
||||||
# ## .. code-block:: nim
|
|
||||||
# ## for x in delete(1..10, 4, 8):
|
|
||||||
# ## echo x
|
|
||||||
# var pos = 0
|
|
||||||
# return iterator(): Future[T] {.gcsafe.} =
|
|
||||||
# for x in i:
|
|
||||||
# if pos notin first..last:
|
|
||||||
# yield x
|
|
||||||
# inc pos
|
|
||||||
|
|
||||||
# proc foldl*[T,S](i: AsyncIterable[T],
|
when isMainModule:
|
||||||
# f: proc(x: Future[S], y: Future[T]): Future[S] {.gcsafe.},
|
import unittest, strutils
|
||||||
# y: Future[S] | S):
|
|
||||||
# Future[S] =
|
|
||||||
# ## Folds the values as the iterator yields them, returning the accumulation.
|
|
||||||
# ##
|
|
||||||
# ## As the initial value of the accumulation `y` is used.
|
|
||||||
# ##
|
|
||||||
# ## .. code-block:: nim
|
|
||||||
# ## echo foldl(1..10, proc(x,y: int): int = x + y, 0)
|
|
||||||
# when type(y) is Future:
|
|
||||||
# result = await y
|
|
||||||
# else:
|
|
||||||
# result = y.toFuture
|
|
||||||
|
|
||||||
# for x in i:
|
suite "nimstreams":
|
||||||
# result = f(result, x)
|
test "forEach":
|
||||||
|
proc test(): Future[bool] {.async.} =
|
||||||
|
iterator stream(): Future[int] {.closure, gcsafe.} =
|
||||||
|
for i in @[1, 2, 3, 4, 5]:
|
||||||
|
yield i.toFuture
|
||||||
|
|
||||||
# when isMainModule:
|
var count = 1
|
||||||
# import unittest, strutils
|
await stream.forEach(
|
||||||
|
proc(item: Future[int]): Future[void] {.async, gcsafe.} =
|
||||||
|
check:
|
||||||
|
count == await item
|
||||||
|
count.inc())
|
||||||
|
|
||||||
# suite "nimstreams":
|
result = true
|
||||||
# test "forEach":
|
|
||||||
# proc test(): Future[bool] {.async.} =
|
|
||||||
# iterator stream(): Future[int] {.closure, gcsafe.} =
|
|
||||||
# for i in @[1, 2, 3, 4, 5]:
|
|
||||||
# yield i.toFuture
|
|
||||||
|
|
||||||
# var count = 1
|
check:
|
||||||
# await stream.forEach(
|
waitFor(test()) == true
|
||||||
# proc(item: Future[int]): Future[void] {.async, gcsafe.} =
|
|
||||||
# check:
|
|
||||||
# count == await item
|
|
||||||
# count.inc())
|
|
||||||
|
|
||||||
# result = true
|
test "map":
|
||||||
|
proc test(): Future[bool] {.async.} =
|
||||||
|
iterator stream(): Future[char] {.closure, gcsafe.} =
|
||||||
|
for i in @['a', 'b', 'c', 'd', 'e']:
|
||||||
|
yield i.toFuture
|
||||||
|
|
||||||
# check:
|
var items = @['A', 'B', 'C', 'D', 'E']
|
||||||
# waitFor(test()) == true
|
var pos = 0
|
||||||
|
await stream
|
||||||
|
.map(
|
||||||
|
proc(item: Future[char]): Future[char] {.async, gcsafe.} =
|
||||||
|
result = (await item).toUpperAscii())
|
||||||
|
.forEach(
|
||||||
|
proc(item: Future[char]): Future[void] {.async, gcsafe.} =
|
||||||
|
check:
|
||||||
|
items[pos] == await item
|
||||||
|
pos.inc())
|
||||||
|
|
||||||
# test "map":
|
result = true
|
||||||
# proc test(): Future[bool] {.async.} =
|
|
||||||
# iterator stream(): Future[char] {.closure, gcsafe.} =
|
|
||||||
# for i in @['a', 'b', 'c', 'd', 'e']:
|
|
||||||
# yield i.toFuture
|
|
||||||
|
|
||||||
# var items = @['A', 'B', 'C', 'D', 'E']
|
check:
|
||||||
# var pos = 0
|
waitFor(test()) == true
|
||||||
# await stream
|
|
||||||
# .map(
|
|
||||||
# proc(item: Future[char]): Future[char] {.async, gcsafe.} =
|
|
||||||
# result = (await item).toUpperAscii())
|
|
||||||
# .forEach(
|
|
||||||
# proc(item: Future[char]): Future[void] {.async, gcsafe.} =
|
|
||||||
# check:
|
|
||||||
# items[pos] == await item
|
|
||||||
# pos.inc())
|
|
||||||
|
|
||||||
# result = true
|
test "filter not empty":
|
||||||
|
proc test(): Future[bool] {.async.} =
|
||||||
|
iterator stream(): Future[int] {.closure, gcsafe.} =
|
||||||
|
for i in @[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
|
||||||
|
yield i.toFuture
|
||||||
|
|
||||||
# check:
|
await stream
|
||||||
# waitFor(test()) == true
|
.filter(
|
||||||
|
proc (item: Future[int]): Future[bool] {.async, gcsafe.} =
|
||||||
|
result = (await item) mod 2 == 0)
|
||||||
|
.forEach(
|
||||||
|
proc(item: Future[int]): Future[void] {.async.} =
|
||||||
|
check:
|
||||||
|
(await item) mod 2 == 0)
|
||||||
|
result = true
|
||||||
|
|
||||||
# test "filter not empty":
|
check:
|
||||||
# proc test(): Future[bool] {.async.} =
|
waitFor(test()) == true
|
||||||
# iterator stream(): Future[int] {.closure, gcsafe.} =
|
|
||||||
# for i in @[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
|
|
||||||
# yield i.toFuture
|
|
||||||
|
|
||||||
# await stream
|
test "filter empty":
|
||||||
# .filter(
|
proc test(): Future[bool] {.async.} =
|
||||||
# proc (item: Future[int]): Future[bool] {.async, gcsafe.} =
|
iterator stream(): Future[int] {.closure, gcsafe.} =
|
||||||
# result = (await item) mod 2 == 0)
|
discard
|
||||||
# .forEach(
|
|
||||||
# proc(item: Future[int]): Future[void] {.async.} =
|
|
||||||
# check:
|
|
||||||
# (await item) mod 2 == 0)
|
|
||||||
# result = true
|
|
||||||
|
|
||||||
# check:
|
await stream
|
||||||
# waitFor(test()) == true
|
.filter(
|
||||||
|
proc(item: Future[int]): Future[bool] {.async, gcsafe.} =
|
||||||
|
discard)
|
||||||
|
.forEach(
|
||||||
|
proc(item: Future[int]): Future[void] {.async.} = discard)
|
||||||
|
result = true
|
||||||
|
|
||||||
# test "filter empty":
|
check:
|
||||||
# proc test(): Future[bool] {.async.} =
|
waitFor(test()) == true
|
||||||
# iterator stream(): Future[int] {.closure, gcsafe.} =
|
|
||||||
# discard
|
|
||||||
|
|
||||||
# await stream
|
test "zip":
|
||||||
# .filter(
|
proc test(): Future[bool] {.async.} =
|
||||||
# proc(item: Future[int]): Future[bool] {.async, gcsafe.} =
|
var iterable1 = @[1, 2, 3, 4, 5]
|
||||||
# discard)
|
iterator stream1(): Future[int] {.closure, gcsafe.} =
|
||||||
# .forEach(
|
for i in iterable1:
|
||||||
# proc(item: Future[int]): Future[void] {.async.} = discard)
|
yield i.toFuture
|
||||||
# result = true
|
|
||||||
|
|
||||||
# check:
|
var iterable2 = @[6, 7, 8]
|
||||||
# waitFor(test()) == true
|
iterator stream2(): Future[int] {.closure, gcsafe.} =
|
||||||
|
for i in iterable2:
|
||||||
|
yield i.toFuture
|
||||||
|
|
||||||
# test "zip":
|
var count = 0
|
||||||
# proc test(): Future[bool] {.async.} =
|
await stream1
|
||||||
# var iterable1 = @[1, 2, 3, 4, 5]
|
.zip(stream2)
|
||||||
# iterator stream1(): Future[int] {.closure, gcsafe.} =
|
.forEach(
|
||||||
# for i in iterable1:
|
proc(item: Future[(Future[int], Future[int])]):
|
||||||
# yield i.toFuture
|
Future[void] {.async.} =
|
||||||
|
var (a, b) = await item
|
||||||
|
check:
|
||||||
|
iterable1[count] == (await a)
|
||||||
|
iterable2[count] == (await b)
|
||||||
|
count.inc())
|
||||||
|
|
||||||
# var iterable2 = @[6, 7, 8]
|
result = true
|
||||||
# iterator stream2(): Future[int] {.closure, gcsafe.} =
|
|
||||||
# for i in iterable2:
|
|
||||||
# yield i.toFuture
|
|
||||||
|
|
||||||
# var count = 0
|
check:
|
||||||
# await stream1
|
waitFor(test()) == true
|
||||||
# .zip(stream2)
|
|
||||||
# .forEach(
|
|
||||||
# proc(item: Future[(Future[int], Future[int])]):
|
|
||||||
# Future[void] {.async.} =
|
|
||||||
# var (a, b) = await item
|
|
||||||
# check:
|
|
||||||
# iterable1[count] == (await a)
|
|
||||||
# iterable2[count] == (await b)
|
|
||||||
# count.inc())
|
|
||||||
|
|
||||||
# result = true
|
test "slice":
|
||||||
|
proc test(): Future[bool] {.async.} =
|
||||||
|
iterator stream(): Future[int] {.closure, gcsafe.} =
|
||||||
|
for i in @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]:
|
||||||
|
yield i.toFuture
|
||||||
|
|
||||||
# check:
|
var count = 4
|
||||||
# waitFor(test()) == true
|
await stream
|
||||||
|
.slice(4, 8)
|
||||||
|
.forEach(
|
||||||
|
proc(item: Future[int]): Future[void] {.async.} =
|
||||||
|
check:
|
||||||
|
count == await item
|
||||||
|
count.inc)
|
||||||
|
result = true
|
||||||
|
|
||||||
# test "slice":
|
check:
|
||||||
# proc test(): Future[bool] {.async.} =
|
waitFor(test()) == true
|
||||||
# iterator stream(): Future[int] {.closure, gcsafe.} =
|
|
||||||
# for i in @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]:
|
|
||||||
# yield i.toFuture
|
|
||||||
|
|
||||||
# var count = 4
|
test "delete":
|
||||||
# await stream
|
proc test(): Future[bool] {.async.} =
|
||||||
# .slice(4, 8)
|
iterator stream(): Future[int] {.closure, gcsafe.} =
|
||||||
# .forEach(
|
for i in @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]:
|
||||||
# proc(item: Future[int]): Future[void] {.async.} =
|
yield i.toFuture
|
||||||
# check:
|
|
||||||
# count == await item
|
|
||||||
# count.inc)
|
|
||||||
# result = true
|
|
||||||
|
|
||||||
# check:
|
var count = 5
|
||||||
# waitFor(test()) == true
|
await stream
|
||||||
|
.delete(5, 9)
|
||||||
|
.forEach(
|
||||||
|
proc(item: Future[int]): Future[void] {.async.} =
|
||||||
|
check:
|
||||||
|
count != await item
|
||||||
|
count.inc)
|
||||||
|
|
||||||
# test "delete":
|
result = count == 10
|
||||||
# proc test(): Future[bool] {.async.} =
|
|
||||||
# iterator stream(): Future[int] {.closure, gcsafe.} =
|
|
||||||
# for i in @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]:
|
|
||||||
# yield i.toFuture
|
|
||||||
|
|
||||||
# var count = 5
|
check:
|
||||||
# await stream
|
waitFor(test()) == true
|
||||||
# .delete(5, 9)
|
|
||||||
# .forEach(
|
|
||||||
# proc(item: Future[int]): Future[void] {.async.} =
|
|
||||||
# check:
|
|
||||||
# count != await item
|
|
||||||
# count.inc)
|
|
||||||
|
|
||||||
# result = count == 10
|
test "foldl":
|
||||||
|
proc test(): Future[bool] {.async.} =
|
||||||
|
iterator stream(): Future[int] {.closure, gcsafe.} =
|
||||||
|
for i in @[1, 2, 3, 4, 5]:
|
||||||
|
yield i.toFuture
|
||||||
|
|
||||||
# check:
|
var count = 1
|
||||||
# waitFor(test()) == true
|
var res = await stream.foldl(
|
||||||
|
proc(x: Future[int], y: Future[int]): Future[int] {.async, gcsafe.} =
|
||||||
|
result = ((await x) + (await y)),
|
||||||
|
0)
|
||||||
|
|
||||||
# test "foldl":
|
result = true
|
||||||
# proc test(): Future[bool] {.async.} =
|
|
||||||
# iterator stream(): Future[int] {.closure, gcsafe.} =
|
|
||||||
# for i in @[1, 2, 3, 4, 5]:
|
|
||||||
# yield i.toFuture
|
|
||||||
|
|
||||||
# var count = 1
|
check:
|
||||||
# var res = await stream.foldl(
|
waitFor(test()) == true
|
||||||
# proc(x: Future[int], y: Future[int]): Future[int] {.async, gcsafe.} =
|
|
||||||
# result = ((await x) + (await y)),
|
|
||||||
# 0)
|
|
||||||
|
|
||||||
# result = true
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# waitFor(test()) == true
|
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import stream
|
import stream, asynciters
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topic = "ChronosStream"
|
topic = "ChronosStream"
|
||||||
@ -36,7 +36,7 @@ proc init*(C: type[ChronosStream],
|
|||||||
method source*(c: ChronosStream): Source[seq[byte]] =
|
method source*(c: ChronosStream): Source[seq[byte]] =
|
||||||
return iterator(): Future[seq[byte]] =
|
return iterator(): Future[seq[byte]] =
|
||||||
while not c.reader.atEof():
|
while not c.reader.atEof():
|
||||||
yield c.reader.read(c.maxChunkSize)
|
yield c.reader.read(c.maxChunkSize)
|
||||||
|
|
||||||
method sink*(c: ChronosStream): Sink[seq[byte]] =
|
method sink*(c: ChronosStream): Sink[seq[byte]] =
|
||||||
return proc(i: Source[seq[byte]]) {.async.} =
|
return proc(i: Source[seq[byte]]) {.async.} =
|
||||||
@ -44,7 +44,7 @@ method sink*(c: ChronosStream): Sink[seq[byte]] =
|
|||||||
if c.closed:
|
if c.closed:
|
||||||
break
|
break
|
||||||
|
|
||||||
# saddly `await c.writer.write((await chunk))` breaks
|
# sadly `await c.writer.write((await chunk))` breaks
|
||||||
var cchunk = await chunk
|
var cchunk = await chunk
|
||||||
await c.writer.write(cchunk)
|
await c.writer.write(cchunk)
|
||||||
|
|
||||||
|
@ -9,11 +9,11 @@
|
|||||||
|
|
||||||
import chronos, chronicles, sequtils
|
import chronos, chronicles, sequtils
|
||||||
import transport,
|
import transport,
|
||||||
../wire,
|
../streams/connection,
|
||||||
../connection,
|
|
||||||
../multiaddress,
|
../multiaddress,
|
||||||
../multicodec,
|
../multicodec,
|
||||||
../stream/chronosstream
|
../streams/chronosstream,
|
||||||
|
../wire
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topic = "TcpTransport"
|
topic = "TcpTransport"
|
||||||
@ -21,9 +21,8 @@ logScope:
|
|||||||
type TcpTransport* = ref object of Transport
|
type TcpTransport* = ref object of Transport
|
||||||
server*: StreamServer
|
server*: StreamServer
|
||||||
|
|
||||||
proc cleanup(t: Transport, conn: Connection) {.async.} =
|
# proc cleanup(t: Transport, conn: Connection) {.async.} =
|
||||||
await conn.closeEvent.wait()
|
# t.connections.keepItIf(it != conn)
|
||||||
t.connections.keepItIf(it != conn)
|
|
||||||
|
|
||||||
proc connHandler*(t: Transport,
|
proc connHandler*(t: Transport,
|
||||||
server: StreamServer,
|
server: StreamServer,
|
||||||
@ -31,18 +30,18 @@ proc connHandler*(t: Transport,
|
|||||||
initiator: bool = false):
|
initiator: bool = false):
|
||||||
Future[Connection] {.async, gcsafe.} =
|
Future[Connection] {.async, gcsafe.} =
|
||||||
trace "handling connection for", address = $client.remoteAddress
|
trace "handling connection for", address = $client.remoteAddress
|
||||||
let conn: Connection = newConnection(newChronosStream(server, client))
|
let conn: Connection = Connection.init(ChronosStream.init(server, client))
|
||||||
conn.observedAddrs = MultiAddress.init(client.remoteAddress)
|
conn.observedAddr = MultiAddress.init(client.remoteAddress)
|
||||||
if not initiator:
|
if not initiator:
|
||||||
if not isNil(t.handler):
|
if not isNil(t.handler):
|
||||||
asyncCheck t.handler(conn)
|
asyncCheck t.handler(conn)
|
||||||
|
|
||||||
t.connections.add(conn)
|
t.connections.add(conn)
|
||||||
asyncCheck t.cleanup(conn)
|
# asyncCheck t.cleanup(conn)
|
||||||
|
|
||||||
result = conn
|
result = conn
|
||||||
|
|
||||||
proc connCb(server: StreamServer,
|
proc connCallback(server: StreamServer,
|
||||||
client: StreamTransport) {.async, gcsafe.} =
|
client: StreamTransport) {.async, gcsafe.} =
|
||||||
trace "incomming connection for", address = $client.remoteAddress
|
trace "incomming connection for", address = $client.remoteAddress
|
||||||
let t: Transport = cast[Transport](server.udata)
|
let t: Transport = cast[Transport](server.udata)
|
||||||
@ -71,7 +70,7 @@ method listen*(t: TcpTransport,
|
|||||||
discard await procCall Transport(t).listen(ma, handler) # call base
|
discard await procCall Transport(t).listen(ma, handler) # call base
|
||||||
|
|
||||||
## listen on the transport
|
## listen on the transport
|
||||||
t.server = createStreamServer(t.ma, connCb, {}, t)
|
t.server = createStreamServer(t.ma, connCallback, {}, t)
|
||||||
t.server.start()
|
t.server.start()
|
||||||
|
|
||||||
# always get the resolved address in case we're bound to 0.0.0.0:0
|
# always get the resolved address in case we're bound to 0.0.0.0:0
|
||||||
|
@ -9,7 +9,7 @@
|
|||||||
|
|
||||||
import sequtils
|
import sequtils
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import ../connection,
|
import ../streams/[connection, stream],
|
||||||
../multiaddress,
|
../multiaddress,
|
||||||
../multicodec,
|
../multicodec,
|
||||||
../errors
|
../errors
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import unittest
|
import unittest
|
||||||
import chronos
|
import chronos
|
||||||
import ../libp2p/stream/[lenprefixed, stream]
|
import ../libp2p/streams/[lenprefixed, stream]
|
||||||
|
|
||||||
suite "LenPrefixed stream":
|
suite "LenPrefixed stream":
|
||||||
test "encode":
|
test "encode":
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import unittest
|
import unittest
|
||||||
import chronos
|
import chronos
|
||||||
import ../libp2p/[connection,
|
import ../libp2p/[streams/stream,
|
||||||
|
streams/connection,
|
||||||
transports/transport,
|
transports/transport,
|
||||||
transports/tcptransport,
|
transports/tcptransport,
|
||||||
multiaddress,
|
multiaddress,
|
||||||
@ -9,127 +10,147 @@ import ../libp2p/[connection,
|
|||||||
when defined(nimHasUsed): {.used.}
|
when defined(nimHasUsed): {.used.}
|
||||||
|
|
||||||
suite "TCP transport":
|
suite "TCP transport":
|
||||||
test "test listener: handle write":
|
# test "test listener: handle write":
|
||||||
proc testListener(): Future[bool] {.async, gcsafe.} =
|
# proc test(): Future[bool] {.async.} =
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
# proc connHandler(conn: Connection) {.async.} =
|
||||||
result = conn.write(cstring("Hello!"), 6)
|
# iterator source(): Future[seq[byte]] {.closure.} =
|
||||||
|
# yield cast[seq[byte]]("Hello!").toFuture
|
||||||
|
|
||||||
let transport: TcpTransport = newTransport(TcpTransport)
|
# var sink = conn.sink()
|
||||||
asyncCheck await transport.listen(ma, connHandler)
|
# await source.sink()
|
||||||
let streamTransport: StreamTransport = await connect(transport.ma)
|
# await conn.close()
|
||||||
let msg = await streamTransport.read(6)
|
|
||||||
await transport.close()
|
|
||||||
await streamTransport.closeWait()
|
|
||||||
|
|
||||||
result = cast[string](msg) == "Hello!"
|
# let transport: TcpTransport = newTransport(TcpTransport)
|
||||||
|
# var transportFut = await transport.listen(ma, connHandler)
|
||||||
|
# let streamTransport: StreamTransport = await transport.ma.connect()
|
||||||
|
# let msg = await streamTransport.read(6)
|
||||||
|
|
||||||
check:
|
# await transport.close()
|
||||||
waitFor(testListener()) == true
|
# await streamTransport.closeWait()
|
||||||
|
# await transportFut
|
||||||
|
|
||||||
|
# result = cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
|
# check:
|
||||||
|
# waitFor(test()) == true
|
||||||
|
|
||||||
test "test listener: handle read":
|
test "test listener: handle read":
|
||||||
proc testListener(): Future[bool] {.async.} =
|
proc testListener(): Future[bool] {.async.} =
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
||||||
let msg = await conn.read(6)
|
var source = conn.source()
|
||||||
check cast[string](msg) == "Hello!"
|
for item in source:
|
||||||
|
var msg = await item
|
||||||
|
|
||||||
let transport: TcpTransport = newTransport(TcpTransport)
|
check:
|
||||||
asyncCheck await transport.listen(ma, connHandler)
|
cast[string](msg) == "Hello!"
|
||||||
let streamTransport: StreamTransport = await connect(transport.ma)
|
|
||||||
|
let transport = newTransport(TcpTransport)
|
||||||
|
let transportFut = await transport.listen(ma, connHandler)
|
||||||
|
let streamTransport = await connect(transport.ma)
|
||||||
let sent = await streamTransport.write("Hello!", 6)
|
let sent = await streamTransport.write("Hello!", 6)
|
||||||
result = sent == 6
|
result = sent == 6
|
||||||
|
|
||||||
|
await transport.close()
|
||||||
|
await streamTransport.closeWait()
|
||||||
|
await transportFut
|
||||||
|
|
||||||
check:
|
check:
|
||||||
waitFor(testListener()) == true
|
waitFor(testListener()) == true
|
||||||
|
|
||||||
test "test dialer: handle write":
|
# test "test dialer handle write":
|
||||||
proc testDialer(address: TransportAddress): Future[bool] {.async.} =
|
# proc testDialer(address: TransportAddress): Future[bool] {.async.} =
|
||||||
proc serveClient(server: StreamServer,
|
# proc serveClient(server: StreamServer,
|
||||||
transp: StreamTransport) {.async, gcsafe.} =
|
# transp: StreamTransport) {.async, gcsafe.} =
|
||||||
var wstream = newAsyncStreamWriter(transp)
|
# var wstream = newAsyncStreamWriter(transp)
|
||||||
await wstream.write("Hello!")
|
# await wstream.write("Hello!")
|
||||||
await wstream.finish()
|
|
||||||
await wstream.closeWait()
|
|
||||||
await transp.closeWait()
|
|
||||||
server.stop()
|
|
||||||
server.close()
|
|
||||||
|
|
||||||
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
# await wstream.finish()
|
||||||
server.start()
|
# await wstream.closeWait()
|
||||||
|
# await transp.closeWait()
|
||||||
|
# server.stop()
|
||||||
|
# server.close()
|
||||||
|
|
||||||
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress())
|
# var server = createStreamServer(address, serveClient)
|
||||||
let transport: TcpTransport = newTransport(TcpTransport)
|
# server.start()
|
||||||
let conn = await transport.dial(ma)
|
|
||||||
let msg = await conn.read(6)
|
|
||||||
result = cast[string](msg) == "Hello!"
|
|
||||||
|
|
||||||
server.stop()
|
# let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress())
|
||||||
server.close()
|
# let transport: TcpTransport = newTransport(TcpTransport)
|
||||||
await server.join()
|
# let conn = await transport.dial(ma)
|
||||||
check waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
|
# let source = conn.source()
|
||||||
|
# for item in source:
|
||||||
|
# let msg = await item
|
||||||
|
# result = cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
test "test dialer: handle write":
|
# await conn.close()
|
||||||
proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} =
|
# server.stop()
|
||||||
proc serveClient(server: StreamServer,
|
# server.close()
|
||||||
transp: StreamTransport) {.async, gcsafe.} =
|
# await server.join()
|
||||||
var rstream = newAsyncStreamReader(transp)
|
# check:
|
||||||
let msg = await rstream.read(6)
|
# waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
|
||||||
check cast[string](msg) == "Hello!"
|
|
||||||
|
|
||||||
await rstream.closeWait()
|
# test "test dialer: handle write":
|
||||||
await transp.closeWait()
|
# proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} =
|
||||||
server.stop()
|
# proc serveClient(server: StreamServer,
|
||||||
server.close()
|
# transp: StreamTransport) {.async, gcsafe.} =
|
||||||
|
# var rstream = newAsyncStreamReader(transp)
|
||||||
|
# let msg = await rstream.read(6)
|
||||||
|
# check cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
# await rstream.closeWait()
|
||||||
server.start()
|
# await transp.closeWait()
|
||||||
|
# server.stop()
|
||||||
|
# server.close()
|
||||||
|
|
||||||
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress())
|
# var server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||||
let transport: TcpTransport = newTransport(TcpTransport)
|
# server.start()
|
||||||
let conn = await transport.dial(ma)
|
|
||||||
await conn.write(cstring("Hello!"), 6)
|
|
||||||
result = true
|
|
||||||
|
|
||||||
server.stop()
|
# let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress())
|
||||||
server.close()
|
# let transport: TcpTransport = newTransport(TcpTransport)
|
||||||
await server.join()
|
# let conn = await transport.dial(ma)
|
||||||
check waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
|
# await conn.write(cstring("Hello!"), 6)
|
||||||
|
# result = true
|
||||||
|
|
||||||
test "e2e: handle write":
|
# server.stop()
|
||||||
proc testListenerDialer(): Future[bool] {.async.} =
|
# server.close()
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
# await server.join()
|
||||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
# check waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
|
||||||
result = conn.write(cstring("Hello!"), 6)
|
|
||||||
|
|
||||||
let transport1: TcpTransport = newTransport(TcpTransport)
|
# test "e2e: handle write":
|
||||||
asyncCheck await transport1.listen(ma, connHandler)
|
# proc testListenerDialer(): Future[bool] {.async.} =
|
||||||
|
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||||
|
# proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
||||||
|
# result = conn.write(cstring("Hello!"), 6)
|
||||||
|
|
||||||
let transport2: TcpTransport = newTransport(TcpTransport)
|
# let transport1: TcpTransport = newTransport(TcpTransport)
|
||||||
let conn = await transport2.dial(transport1.ma)
|
# asyncCheck await transport1.listen(ma, connHandler)
|
||||||
let msg = await conn.read(6)
|
|
||||||
await transport1.close()
|
|
||||||
|
|
||||||
result = cast[string](msg) == "Hello!"
|
# let transport2: TcpTransport = newTransport(TcpTransport)
|
||||||
|
# let conn = await transport2.dial(transport1.ma)
|
||||||
|
# let msg = await conn.read(6)
|
||||||
|
# await transport1.close()
|
||||||
|
|
||||||
check:
|
# result = cast[string](msg) == "Hello!"
|
||||||
waitFor(testListenerDialer()) == true
|
|
||||||
|
|
||||||
test "e2e: handle read":
|
# check:
|
||||||
proc testListenerDialer(): Future[bool] {.async.} =
|
# waitFor(testListenerDialer()) == true
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
|
||||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
|
||||||
let msg = await conn.read(6)
|
|
||||||
check cast[string](msg) == "Hello!"
|
|
||||||
|
|
||||||
let transport1: TcpTransport = newTransport(TcpTransport)
|
# test "e2e: handle read":
|
||||||
asyncCheck await transport1.listen(ma, connHandler)
|
# proc testListenerDialer(): Future[bool] {.async.} =
|
||||||
|
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
|
||||||
|
# proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
||||||
|
# let msg = await conn.read(6)
|
||||||
|
# check cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
let transport2: TcpTransport = newTransport(TcpTransport)
|
# let transport1: TcpTransport = newTransport(TcpTransport)
|
||||||
let conn = await transport2.dial(transport1.ma)
|
# asyncCheck await transport1.listen(ma, connHandler)
|
||||||
await conn.write(cstring("Hello!"), 6)
|
|
||||||
await transport1.close()
|
|
||||||
result = true
|
|
||||||
|
|
||||||
check:
|
# let transport2: TcpTransport = newTransport(TcpTransport)
|
||||||
waitFor(testListenerDialer()) == true
|
# let conn = await transport2.dial(transport1.ma)
|
||||||
|
# await conn.write(cstring("Hello!"), 6)
|
||||||
|
# await transport1.close()
|
||||||
|
# result = true
|
||||||
|
|
||||||
|
# check:
|
||||||
|
# waitFor(testListenerDialer()) == true
|
||||||
|
Loading…
x
Reference in New Issue
Block a user