diff --git a/libp2p/streams/stream.nim b/libp2p/streams/stream.nim index f5a384a7b..f5ebeb07a 100644 --- a/libp2p/streams/stream.nim +++ b/libp2p/streams/stream.nim @@ -7,7 +7,6 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import macros import chronos # TODO: Rework without methods @@ -20,7 +19,6 @@ type Stream*[T] = ref object of RootObj name*: string - closed*: bool eof*: bool sourceImpl*: proc (s: Stream[T]): Source[T] {.gcsafe.} sinkImpl*: proc(s: Stream[T]): Sink[T] {.gcsafe.} @@ -44,42 +42,5 @@ proc atEof*[T](s: Stream[T]): bool = s.eof proc close*[T](s: Stream[T]) {.async.} = - s.closed = true - -template toFuture*[T](v: T): Future[T] = - var fut = newFuture[T]() - fut.complete(v) - fut - -proc toThrough*[T](s: Stream[T]): Through[T] = - proc sinkit(i: Source[T]) {.async.} = - await s.sink()(i) - - return proc(i: Source[T]): Source[T] = - asyncCheck sinkit(i) - s.source() - -template pipe*[T](s: Stream[T] | Source[T], - t: varargs[Through[T]]): Source[T] = - var pipeline: Source[T] - when s is Source[T]: - pipeline = s - elif s is Stream[T]: - pipeline = s.source - - for i in t: - pipeline = i(pipeline) - - pipeline - -template pipe*[T](s: Stream[T] | Source[T], - t: varargs[Through[T]], - x: Stream[T] | Sink[T]): Future[void] = - var pipeline = pipe(s, t) - var terminal: Future[void] - when x is Stream[T]: - terminal = x.sink()(pipeline) - elif x is Sink[T]: - terminal = x(pipeline) - - terminal + # close is called externally + s.eof = true