mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-27 12:00:44 +00:00
wip: rework with async iterators
This commit is contained in:
parent
e8b33c64fa
commit
a644a19a2d
@ -1,185 +0,0 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2019 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import oids
|
||||
import chronos, chronicles, metrics
|
||||
import peerinfo,
|
||||
multiaddress,
|
||||
stream/lpstream,
|
||||
peerinfo,
|
||||
varint,
|
||||
vbuffer
|
||||
|
||||
logScope:
|
||||
topic = "Connection"
|
||||
|
||||
const DefaultReadSize* = 1 shl 20
|
||||
|
||||
type
|
||||
Connection* = ref object of LPStream
|
||||
peerInfo*: PeerInfo
|
||||
stream*: LPStream
|
||||
observedAddrs*: Multiaddress
|
||||
|
||||
InvalidVarintException = object of LPStreamError
|
||||
InvalidVarintSizeException = object of LPStreamError
|
||||
|
||||
declareGauge libp2p_open_connection, "open Connection instances"
|
||||
|
||||
proc newInvalidVarintException*(): ref InvalidVarintException =
|
||||
newException(InvalidVarintException, "Unable to parse varint")
|
||||
|
||||
proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException =
|
||||
newException(InvalidVarintSizeException, "Wrong varint size")
|
||||
|
||||
proc bindStreamClose(conn: Connection) {.async.} =
|
||||
# bind stream's close event to connection's close
|
||||
# to ensure correct close propagation
|
||||
if not isNil(conn.stream.closeEvent):
|
||||
await conn.stream.closeEvent.wait()
|
||||
trace "wrapped stream closed, about to close conn", closed = conn.isClosed,
|
||||
peer = if not isNil(conn.peerInfo):
|
||||
conn.peerInfo.id else: ""
|
||||
if not conn.isClosed:
|
||||
trace "wrapped stream closed, closing conn", closed = conn.isClosed,
|
||||
peer = if not isNil(conn.peerInfo):
|
||||
conn.peerInfo.id else: ""
|
||||
asyncCheck conn.close()
|
||||
|
||||
proc init*[T: Connection](self: var T, stream: LPStream): T =
|
||||
## create a new Connection for the specified async reader/writer
|
||||
new self
|
||||
self.stream = stream
|
||||
self.closeEvent = newAsyncEvent()
|
||||
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||
self.oid = genOid()
|
||||
asyncCheck self.bindStreamClose()
|
||||
libp2p_open_connection.inc()
|
||||
|
||||
return self
|
||||
|
||||
proc newConnection*(stream: LPStream): Connection =
|
||||
## create a new Connection for the specified async reader/writer
|
||||
result.init(stream)
|
||||
|
||||
method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} =
|
||||
s.stream.read(n)
|
||||
|
||||
method readExactly*(s: Connection,
|
||||
pbytes: pointer,
|
||||
nbytes: int):
|
||||
Future[void] {.gcsafe.} =
|
||||
s.stream.readExactly(pbytes, nbytes)
|
||||
|
||||
method readLine*(s: Connection,
|
||||
limit = 0,
|
||||
sep = "\r\n"):
|
||||
Future[string] {.gcsafe.} =
|
||||
s.stream.readLine(limit, sep)
|
||||
|
||||
method readOnce*(s: Connection,
|
||||
pbytes: pointer,
|
||||
nbytes: int):
|
||||
Future[int] {.gcsafe.} =
|
||||
s.stream.readOnce(pbytes, nbytes)
|
||||
|
||||
method readUntil*(s: Connection,
|
||||
pbytes: pointer,
|
||||
nbytes: int,
|
||||
sep: seq[byte]):
|
||||
Future[int] {.gcsafe.} =
|
||||
s.stream.readUntil(pbytes, nbytes, sep)
|
||||
|
||||
method write*(s: Connection,
|
||||
pbytes: pointer,
|
||||
nbytes: int):
|
||||
Future[void] {.gcsafe.} =
|
||||
s.stream.write(pbytes, nbytes)
|
||||
|
||||
method write*(s: Connection,
|
||||
msg: string,
|
||||
msglen = -1):
|
||||
Future[void] {.gcsafe.} =
|
||||
s.stream.write(msg, msglen)
|
||||
|
||||
method write*(s: Connection,
|
||||
msg: seq[byte],
|
||||
msglen = -1):
|
||||
Future[void] {.gcsafe.} =
|
||||
s.stream.write(msg, msglen)
|
||||
|
||||
method closed*(s: Connection): bool =
|
||||
if isNil(s.stream):
|
||||
return false
|
||||
|
||||
result = s.stream.closed
|
||||
|
||||
method close*(s: Connection) {.async, gcsafe.} =
|
||||
if not s.closed:
|
||||
trace "about to close connection", closed = s.closed,
|
||||
peer = if not isNil(s.peerInfo):
|
||||
s.peerInfo.id else: ""
|
||||
|
||||
if not isNil(s.stream) and not s.stream.closed:
|
||||
trace "closing child stream", closed = s.closed,
|
||||
peer = if not isNil(s.peerInfo):
|
||||
s.peerInfo.id else: ""
|
||||
await s.stream.close()
|
||||
|
||||
s.closeEvent.fire()
|
||||
s.isClosed = true
|
||||
|
||||
trace "connection closed", closed = s.closed,
|
||||
peer = if not isNil(s.peerInfo):
|
||||
s.peerInfo.id else: ""
|
||||
libp2p_open_connection.dec()
|
||||
|
||||
proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
|
||||
## read lenght prefixed msg
|
||||
var
|
||||
size: uint
|
||||
length: int
|
||||
res: VarintStatus
|
||||
buff = newSeq[byte](10)
|
||||
try:
|
||||
for i in 0..<len(buff):
|
||||
await s.readExactly(addr buff[i], 1)
|
||||
res = LP.getUVarint(buff.toOpenArray(0, i), length, size)
|
||||
if res == VarintStatus.Success:
|
||||
break
|
||||
if res != VarintStatus.Success:
|
||||
raise newInvalidVarintException()
|
||||
if size.int > DefaultReadSize:
|
||||
raise newInvalidVarintSizeException()
|
||||
buff.setLen(size)
|
||||
if size > 0.uint:
|
||||
trace "reading exact bytes from stream", size = size
|
||||
await s.readExactly(addr buff[0], int(size))
|
||||
return buff
|
||||
except LPStreamIncompleteError as exc:
|
||||
trace "remote connection ended unexpectedly", exc = exc.msg
|
||||
raise exc
|
||||
except LPStreamReadError as exc:
|
||||
trace "couldn't read from stream", exc = exc.msg
|
||||
raise exc
|
||||
|
||||
proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} =
|
||||
## write lenght prefixed
|
||||
var buf = initVBuffer()
|
||||
buf.writeSeq(msg)
|
||||
buf.finish()
|
||||
s.write(buf.buffer)
|
||||
|
||||
method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcsafe.} =
|
||||
## get resolved multiaddresses for the connection
|
||||
result = c.observedAddrs
|
||||
|
||||
proc `$`*(conn: Connection): string =
|
||||
if not isNil(conn.peerInfo):
|
||||
result = $(conn.peerInfo)
|
@ -1,403 +0,0 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2019 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
## This module implements an asynchronous buffer stream
|
||||
## which emulates physical async IO.
|
||||
##
|
||||
## The stream is based on the standard library's `Deque`,
|
||||
## which is itself based on a ring buffer.
|
||||
##
|
||||
## It works by exposing a regular LPStream interface and
|
||||
## a method ``pushTo`` to push data to the internal read
|
||||
## buffer; as well as a handler that can be registrered
|
||||
## that gets triggered on every write to the stream. This
|
||||
## allows using the buffered stream as a sort of proxy,
|
||||
## which can be consumed as a regular LPStream but allows
|
||||
## injecting data for reads and intercepting writes.
|
||||
##
|
||||
## Another notable feature is that the stream is fully
|
||||
## ordered and asynchronous. Reads are queued up in order
|
||||
## and are suspended when not enough data available. This
|
||||
## allows preserving backpressure while maintaining full
|
||||
## asynchrony. Both writting to the internal buffer with
|
||||
## ``pushTo`` as well as reading with ``read*` methods,
|
||||
## will suspend until either the amount of elements in the
|
||||
## buffer goes below ``maxSize`` or more data becomes available.
|
||||
|
||||
import deques, math, oids
|
||||
import chronos, chronicles, metrics
|
||||
import ../stream/lpstream
|
||||
|
||||
const DefaultBufferSize* = 1024
|
||||
|
||||
type
|
||||
# TODO: figure out how to make this generic to avoid casts
|
||||
WriteHandler* = proc (data: seq[byte]): Future[void] {.gcsafe.}
|
||||
|
||||
BufferStream* = ref object of LPStream
|
||||
maxSize*: int # buffer's max size in bytes
|
||||
readBuf: Deque[byte] # this is a ring buffer based dequeue, this makes it perfect as the backing store here
|
||||
readReqs: Deque[Future[void]] # use dequeue to fire reads in order
|
||||
dataReadEvent: AsyncEvent
|
||||
writeHandler*: WriteHandler
|
||||
lock: AsyncLock
|
||||
isPiped: bool
|
||||
|
||||
AlreadyPipedError* = object of CatchableError
|
||||
NotWritableError* = object of CatchableError
|
||||
|
||||
declareGauge libp2p_open_bufferstream, "open BufferStream instances"
|
||||
|
||||
proc newAlreadyPipedError*(): ref Exception {.inline.} =
|
||||
result = newException(AlreadyPipedError, "stream already piped")
|
||||
|
||||
proc newNotWritableError*(): ref Exception {.inline.} =
|
||||
result = newException(NotWritableError, "stream is not writable")
|
||||
|
||||
proc requestReadBytes(s: BufferStream): Future[void] =
|
||||
## create a future that will complete when more
|
||||
## data becomes available in the read buffer
|
||||
result = newFuture[void]()
|
||||
s.readReqs.addLast(result)
|
||||
trace "requestReadBytes(): added a future to readReqs"
|
||||
|
||||
proc initBufferStream*(s: BufferStream,
|
||||
handler: WriteHandler = nil,
|
||||
size: int = DefaultBufferSize) =
|
||||
s.maxSize = if isPowerOfTwo(size): size else: nextPowerOfTwo(size)
|
||||
s.readBuf = initDeque[byte](s.maxSize)
|
||||
s.readReqs = initDeque[Future[void]]()
|
||||
s.dataReadEvent = newAsyncEvent()
|
||||
s.lock = newAsyncLock()
|
||||
s.writeHandler = handler
|
||||
s.closeEvent = newAsyncEvent()
|
||||
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||
s.oid = genOid()
|
||||
s.isClosed = false
|
||||
libp2p_open_bufferstream.inc()
|
||||
|
||||
proc newBufferStream*(handler: WriteHandler = nil,
|
||||
size: int = DefaultBufferSize): BufferStream =
|
||||
new result
|
||||
result.initBufferStream(handler, size)
|
||||
|
||||
proc popFirst*(s: BufferStream): byte =
|
||||
result = s.readBuf.popFirst()
|
||||
s.dataReadEvent.fire()
|
||||
|
||||
proc popLast*(s: BufferStream): byte =
|
||||
result = s.readBuf.popLast()
|
||||
s.dataReadEvent.fire()
|
||||
|
||||
proc shrink(s: BufferStream, fromFirst = 0, fromLast = 0) =
|
||||
s.readBuf.shrink(fromFirst, fromLast)
|
||||
s.dataReadEvent.fire()
|
||||
|
||||
proc len*(s: BufferStream): int = s.readBuf.len
|
||||
|
||||
proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} =
|
||||
## Write bytes to internal read buffer, use this to fill up the
|
||||
## buffer with data.
|
||||
##
|
||||
## This method is async and will wait until all data has been
|
||||
## written to the internal buffer; this is done so that backpressure
|
||||
## is preserved.
|
||||
##
|
||||
|
||||
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||
logScope:
|
||||
stream_oid = $s.oid
|
||||
|
||||
try:
|
||||
await s.lock.acquire()
|
||||
var index = 0
|
||||
while not s.closed():
|
||||
while index < data.len and s.readBuf.len < s.maxSize:
|
||||
s.readBuf.addLast(data[index])
|
||||
inc(index)
|
||||
trace "pushTo()", msg = "added " & $index & " bytes to readBuf"
|
||||
|
||||
# resolve the next queued read request
|
||||
if s.readReqs.len > 0:
|
||||
s.readReqs.popFirst().complete()
|
||||
trace "pushTo(): completed a readReqs future"
|
||||
|
||||
if index >= data.len:
|
||||
return
|
||||
|
||||
# if we couldn't transfer all the data to the
|
||||
# internal buf wait on a read event
|
||||
await s.dataReadEvent.wait()
|
||||
s.dataReadEvent.clear()
|
||||
finally:
|
||||
s.lock.release()
|
||||
|
||||
method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async.} =
|
||||
## Read all bytes (n <= 0) or exactly `n` bytes from buffer
|
||||
##
|
||||
## This procedure allocates buffer seq[byte] and return it as result.
|
||||
##
|
||||
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||
logScope:
|
||||
stream_oid = $s.oid
|
||||
|
||||
trace "read()", requested_bytes = n
|
||||
var size = if n > 0: n else: s.readBuf.len()
|
||||
var index = 0
|
||||
|
||||
if s.readBuf.len() == 0:
|
||||
await s.requestReadBytes()
|
||||
|
||||
while index < size:
|
||||
while s.readBuf.len() > 0 and index < size:
|
||||
result.add(s.popFirst())
|
||||
inc(index)
|
||||
trace "read()", read_bytes = index
|
||||
|
||||
if index < size:
|
||||
await s.requestReadBytes()
|
||||
|
||||
method readExactly*(s: BufferStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int):
|
||||
Future[void] {.async.} =
|
||||
## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store
|
||||
## it to ``pbytes``.
|
||||
##
|
||||
## If EOF is received and ``nbytes`` is not yet read, the procedure
|
||||
## will raise ``LPStreamIncompleteError``.
|
||||
##
|
||||
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||
logScope:
|
||||
stream_oid = $s.oid
|
||||
|
||||
var buff: seq[byte]
|
||||
try:
|
||||
buff = await s.read(nbytes)
|
||||
except LPStreamEOFError as exc:
|
||||
trace "Exception occured", exc = exc.msg
|
||||
|
||||
if nbytes > buff.len():
|
||||
raise newLPStreamIncompleteError()
|
||||
|
||||
copyMem(pbytes, addr buff[0], nbytes)
|
||||
|
||||
method readLine*(s: BufferStream,
|
||||
limit = 0,
|
||||
sep = "\r\n"):
|
||||
Future[string] {.async.} =
|
||||
## Read one line from read-only stream ``rstream``, where ``"line"`` is a
|
||||
## sequence of bytes ending with ``sep`` (default is ``"\r\n"``).
|
||||
##
|
||||
## If EOF is received, and ``sep`` was not found, the method will return the
|
||||
## partial read bytes.
|
||||
##
|
||||
## If the EOF was received and the internal buffer is empty, return an
|
||||
## empty string.
|
||||
##
|
||||
## If ``limit`` more then 0, then result string will be limited to ``limit``
|
||||
## bytes.
|
||||
##
|
||||
result = ""
|
||||
var lim = if limit <= 0: -1 else: limit
|
||||
var state = 0
|
||||
var index = 0
|
||||
|
||||
index = 0
|
||||
while index < s.readBuf.len:
|
||||
let ch = char(s.readBuf[index])
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
if state == len(sep):
|
||||
s.shrink(index + 1)
|
||||
break
|
||||
else:
|
||||
state = 0
|
||||
result.add(ch)
|
||||
if len(result) == lim:
|
||||
s.shrink(index + 1)
|
||||
break
|
||||
inc(index)
|
||||
|
||||
method readOnce*(s: BufferStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int):
|
||||
Future[int] {.async.} =
|
||||
## Perform one read operation on read-only stream ``rstream``.
|
||||
##
|
||||
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
|
||||
## internal buffer, otherwise it will wait until some bytes will be received.
|
||||
##
|
||||
if s.readBuf.len == 0:
|
||||
await s.requestReadBytes()
|
||||
|
||||
var len = if nbytes > s.readBuf.len: s.readBuf.len else: nbytes
|
||||
await s.readExactly(pbytes, len)
|
||||
result = len
|
||||
|
||||
method readUntil*(s: BufferStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int,
|
||||
sep: seq[byte]):
|
||||
Future[int] {.async.} =
|
||||
## Read data from the read-only stream ``rstream`` until separator ``sep`` is
|
||||
## found.
|
||||
##
|
||||
## On success, the data and separator will be removed from the internal
|
||||
## buffer (consumed). Returned data will include the separator at the end.
|
||||
##
|
||||
## If EOF is received, and `sep` was not found, procedure will raise
|
||||
## ``LPStreamIncompleteError``.
|
||||
##
|
||||
## If ``nbytes`` bytes has been received and `sep` was not found, procedure
|
||||
## will raise ``LPStreamLimitError``.
|
||||
##
|
||||
## Procedure returns actual number of bytes read.
|
||||
##
|
||||
var
|
||||
dest = cast[ptr UncheckedArray[byte]](pbytes)
|
||||
state = 0
|
||||
k = 0
|
||||
|
||||
let datalen = s.readBuf.len()
|
||||
if datalen == 0 and s.readBuf.len() == 0:
|
||||
raise newLPStreamIncompleteError()
|
||||
|
||||
var index = 0
|
||||
while index < datalen:
|
||||
let ch = s.readBuf[index]
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
else:
|
||||
state = 0
|
||||
if k < nbytes:
|
||||
dest[k] = ch
|
||||
inc(k)
|
||||
else:
|
||||
raise newLPStreamLimitError()
|
||||
if state == len(sep):
|
||||
break
|
||||
inc(index)
|
||||
|
||||
if state == len(sep):
|
||||
s.shrink(index + 1)
|
||||
result = k
|
||||
else:
|
||||
s.shrink(datalen)
|
||||
|
||||
method write*(s: BufferStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int): Future[void] =
|
||||
## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream
|
||||
## ``rstream``.
|
||||
##
|
||||
## Return number of bytes actually consumed (discarded).
|
||||
##
|
||||
if isNil(s.writeHandler):
|
||||
var retFuture = newFuture[void]("BufferStream.write(pointer)")
|
||||
retFuture.fail(newNotWritableError())
|
||||
return retFuture
|
||||
|
||||
var buf: seq[byte] = newSeq[byte](nbytes)
|
||||
copyMem(addr buf[0], pbytes, nbytes)
|
||||
result = s.writeHandler(buf)
|
||||
|
||||
method write*(s: BufferStream,
|
||||
msg: string,
|
||||
msglen = -1): Future[void] =
|
||||
## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``.
|
||||
##
|
||||
## String ``sbytes`` must not be zero-length.
|
||||
##
|
||||
## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream.
|
||||
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
|
||||
## stream.
|
||||
##
|
||||
if isNil(s.writeHandler):
|
||||
var retFuture = newFuture[void]("BufferStream.write(string)")
|
||||
retFuture.fail(newNotWritableError())
|
||||
return retFuture
|
||||
|
||||
var buf = ""
|
||||
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
|
||||
result = s.writeHandler(cast[seq[byte]](buf))
|
||||
|
||||
method write*(s: BufferStream,
|
||||
msg: seq[byte],
|
||||
msglen = -1): Future[void] =
|
||||
## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer
|
||||
## stream ``wstream``.
|
||||
##
|
||||
## Sequence of bytes ``sbytes`` must not be zero-length.
|
||||
##
|
||||
## If ``msglen < 0`` whole sequence ``sbytes`` will be writen to stream.
|
||||
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
|
||||
## stream.
|
||||
##
|
||||
if isNil(s.writeHandler):
|
||||
var retFuture = newFuture[void]("BufferStream.write(seq)")
|
||||
retFuture.fail(newNotWritableError())
|
||||
return retFuture
|
||||
|
||||
var buf: seq[byte]
|
||||
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
|
||||
result = s.writeHandler(buf)
|
||||
|
||||
proc pipe*(s: BufferStream,
|
||||
target: BufferStream): BufferStream =
|
||||
## pipe the write end of this stream to
|
||||
## be the source of the target stream
|
||||
##
|
||||
## Note that this only works with the LPStream
|
||||
## interface methods `read*` and `write` are
|
||||
## piped.
|
||||
##
|
||||
if s.isPiped:
|
||||
raise newAlreadyPipedError()
|
||||
|
||||
s.isPiped = true
|
||||
let oldHandler = target.writeHandler
|
||||
proc handler(data: seq[byte]) {.async, closure, gcsafe.} =
|
||||
if not isNil(oldHandler):
|
||||
await oldHandler(data)
|
||||
|
||||
# if we're piping to self,
|
||||
# then add the data to the
|
||||
# buffer directly and fire
|
||||
# the read event
|
||||
if s == target:
|
||||
for b in data:
|
||||
s.readBuf.addLast(b)
|
||||
|
||||
# notify main loop of available
|
||||
# data
|
||||
s.dataReadEvent.fire()
|
||||
else:
|
||||
await target.pushTo(data)
|
||||
|
||||
s.writeHandler = handler
|
||||
result = target
|
||||
|
||||
proc `|`*(s: BufferStream, target: BufferStream): BufferStream =
|
||||
## pipe operator to make piping less verbose
|
||||
pipe(s, target)
|
||||
|
||||
method close*(s: BufferStream) {.async.} =
|
||||
## close the stream and clear the buffer
|
||||
if not s.isClosed:
|
||||
trace "closing bufferstream"
|
||||
for r in s.readReqs:
|
||||
if not(isNil(r)) and not(r.finished()):
|
||||
r.fail(newLPStreamEOFError())
|
||||
s.dataReadEvent.fire()
|
||||
s.readBuf.clear()
|
||||
s.closeEvent.fire()
|
||||
s.isClosed = true
|
||||
libp2p_open_bufferstream.dec()
|
||||
|
@ -8,113 +8,43 @@
|
||||
## those terms.
|
||||
|
||||
import chronos, chronicles
|
||||
import lpstream
|
||||
|
||||
logScope:
|
||||
topic = "ChronosStream"
|
||||
|
||||
type ChronosStream* = ref object of LPStream
|
||||
const DefaultChunkSize* = 1 shl 20 # 1MB
|
||||
|
||||
type ChronosStream* = ref object
|
||||
reader: AsyncStreamReader
|
||||
writer: AsyncStreamWriter
|
||||
server: StreamServer
|
||||
client: StreamTransport
|
||||
maxChunkSize: int
|
||||
closed: bool
|
||||
|
||||
proc newChronosStream*(server: StreamServer,
|
||||
client: StreamTransport): ChronosStream =
|
||||
new result
|
||||
result.server = server
|
||||
result.client = client
|
||||
result.reader = newAsyncStreamReader(client)
|
||||
result.writer = newAsyncStreamWriter(client)
|
||||
result.closeEvent = newAsyncEvent()
|
||||
proc init*[T](c: type[ChronosStream],
|
||||
server: StreamServer,
|
||||
client: StreamTransport,
|
||||
maxChunkSize = DefaultChunkSize): c =
|
||||
|
||||
template withExceptions(body: untyped) =
|
||||
try:
|
||||
body
|
||||
except TransportIncompleteError:
|
||||
raise newLPStreamIncompleteError()
|
||||
except TransportLimitError:
|
||||
raise newLPStreamLimitError()
|
||||
except TransportError as exc:
|
||||
raise newLPStreamIncorrectError(exc.msg)
|
||||
except AsyncStreamIncompleteError:
|
||||
raise newLPStreamIncompleteError()
|
||||
ChronosStream(server: server,
|
||||
client: client,
|
||||
reader: newAsyncStreamReader(client),
|
||||
writer: newAsyncStreamWriter(client),
|
||||
maxChunkSize)
|
||||
|
||||
method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
|
||||
if s.reader.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
proc close*(c: ChronosStream) =
|
||||
c.closed = true
|
||||
|
||||
withExceptions:
|
||||
result = await s.reader.read(n)
|
||||
iterator source*(c: ChronosStream, size: int = c.maxChunkSize): Future[seq[byte]] =
|
||||
while not c.reader.atEof():
|
||||
yield c.reader.read(c.maxChunkSize)
|
||||
|
||||
method readExactly*(s: ChronosStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int): Future[void] {.async.} =
|
||||
if s.reader.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
proc sink*(c: ChronosStream,
|
||||
iter: iterator(): Future[seq[byte]]):
|
||||
Future[void] {.async.}=
|
||||
for chunk in iter():
|
||||
if c.closed:
|
||||
break
|
||||
|
||||
withExceptions:
|
||||
await s.reader.readExactly(pbytes, nbytes)
|
||||
|
||||
method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} =
|
||||
if s.reader.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
withExceptions:
|
||||
result = await s.reader.readLine(limit, sep)
|
||||
|
||||
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
||||
if s.reader.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
withExceptions:
|
||||
result = await s.reader.readOnce(pbytes, nbytes)
|
||||
|
||||
method readUntil*(s: ChronosStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int,
|
||||
sep: seq[byte]): Future[int] {.async.} =
|
||||
if s.reader.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
withExceptions:
|
||||
result = await s.reader.readUntil(pbytes, nbytes, sep)
|
||||
|
||||
method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =
|
||||
if s.writer.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
withExceptions:
|
||||
await s.writer.write(pbytes, nbytes)
|
||||
|
||||
method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =
|
||||
if s.writer.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
withExceptions:
|
||||
await s.writer.write(msg, msglen)
|
||||
|
||||
method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
|
||||
if s.writer.atEof:
|
||||
raise newLPStreamEOFError()
|
||||
|
||||
withExceptions:
|
||||
await s.writer.write(msg, msglen)
|
||||
|
||||
method closed*(s: ChronosStream): bool {.inline.} =
|
||||
# TODO: we might only need to check for reader's EOF
|
||||
result = s.reader.atEof()
|
||||
|
||||
method close*(s: ChronosStream) {.async.} =
|
||||
if not s.closed:
|
||||
trace "shutting chronos stream", address = $s.client.remoteAddress()
|
||||
if not s.writer.closed():
|
||||
await s.writer.closeWait()
|
||||
|
||||
if not s.reader.closed():
|
||||
await s.reader.closeWait()
|
||||
|
||||
if not s.client.closed():
|
||||
await s.client.closeWait()
|
||||
|
||||
s.closeEvent.fire()
|
||||
await c.writer.write((await chunk))
|
||||
|
106
libp2p/stream/lenprefixed.nim
Normal file
106
libp2p/stream/lenprefixed.nim
Normal file
@ -0,0 +1,106 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2020 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import chronos, chronicles
|
||||
import ringbuffer,
|
||||
../varint,
|
||||
../vbuffer
|
||||
|
||||
const
|
||||
DefaultBuffSize* = 1024
|
||||
SafeVarintSize* = 4
|
||||
|
||||
type
|
||||
LenPrefixed* = ref object
|
||||
readBuff: RingBuffer[byte]
|
||||
writeBuff: RingBuffer[byte]
|
||||
mode: Mode
|
||||
size: int
|
||||
|
||||
Mode {.pure.} = enum Decoding, Reading
|
||||
|
||||
InvalidVarintException* = object of CatchableError
|
||||
InvalidVarintSizeException* = object of CatchableError
|
||||
|
||||
proc newInvalidVarintException*(): ref InvalidVarintException =
|
||||
newException(InvalidVarintException, "Unable to parse varint")
|
||||
|
||||
proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException =
|
||||
newException(InvalidVarintSizeException, "Wrong varint size")
|
||||
|
||||
proc init*(lp: type[LenPrefixed], maxSize: int = DefaultBuffSize): lp =
|
||||
LenPrefixed(readBuff: RingBuffer[byte].init(maxSize),
|
||||
writeBuff: RingBuffer[byte].init(maxSize),
|
||||
mode: Mode.Decoding)
|
||||
|
||||
proc decodeLen(lp: LenPrefixed): int =
|
||||
var
|
||||
size: uint
|
||||
length: int
|
||||
res: VarintStatus
|
||||
buff: seq[byte]
|
||||
i: int
|
||||
while true:
|
||||
buff.add(lp.readBuff.read(1))
|
||||
res = LP.getUVarint(buff, length, size)
|
||||
i.inc
|
||||
|
||||
if res == VarintStatus.Success:
|
||||
break
|
||||
|
||||
if buff.len > SafeVarintSize:
|
||||
raise newInvalidVarintSizeException()
|
||||
|
||||
return size.int
|
||||
|
||||
proc read(lp: LenPrefixed,
|
||||
chunk: Future[seq[byte]]):
|
||||
Future[seq[byte]] {.async, gcsafe.} =
|
||||
try:
|
||||
lp.readBuff.append((await chunk))
|
||||
|
||||
while lp.readBuff.len > 0:
|
||||
case lp.mode:
|
||||
of Mode.Decoding:
|
||||
lp.size = lp.decodeLen()
|
||||
lp.mode = Mode.Reading
|
||||
else:
|
||||
result = lp.readBuff.read(lp.size)
|
||||
echo result
|
||||
lp.size -= result.len
|
||||
if lp.size == 0:
|
||||
lp.mode = Mode.Decoding
|
||||
|
||||
except CatchableError as exc:
|
||||
trace "Exception occured", exc = exc.msg
|
||||
raise exc
|
||||
|
||||
proc decode*(lp: LenPrefixed,
|
||||
i: iterator(): Future[seq[byte]]):
|
||||
iterator(): Future[seq[byte]] =
|
||||
return iterator(): Future[seq[byte]] =
|
||||
for chunk in i():
|
||||
yield lp.read(chunk)
|
||||
|
||||
proc write(lp: LenPrefixed,
|
||||
i: iterator(): Future[seq[byte]]):
|
||||
Future[seq[byte]] {.async.} =
|
||||
for chunk in i():
|
||||
lp.writeBuff.append((await chunk))
|
||||
|
||||
var buf = initVBuffer()
|
||||
buf.writeSeq(lp.writeBuff.read())
|
||||
buf.finish()
|
||||
result = buf.buffer
|
||||
|
||||
proc encode*(lp: LenPrefixed,
|
||||
i: iterator(): Future[seq[byte]]):
|
||||
iterator(): Future[seq[byte]] =
|
||||
return iterator(): Future[seq[byte]] =
|
||||
yield lp.write(i)
|
@ -1,104 +0,0 @@
|
||||
## Nim-LibP2P
|
||||
## Copyright (c) 2019 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import oids
|
||||
import chronicles, chronos
|
||||
|
||||
type
|
||||
LPStream* = ref object of RootObj
|
||||
isClosed*: bool
|
||||
closeEvent*: AsyncEvent
|
||||
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||
oid*: Oid
|
||||
|
||||
LPStreamError* = object of CatchableError
|
||||
LPStreamIncompleteError* = object of LPStreamError
|
||||
LPStreamIncorrectError* = object of Defect
|
||||
LPStreamLimitError* = object of LPStreamError
|
||||
LPStreamReadError* = object of LPStreamError
|
||||
par*: ref Exception
|
||||
LPStreamWriteError* = object of LPStreamError
|
||||
par*: ref Exception
|
||||
LPStreamEOFError* = object of LPStreamError
|
||||
|
||||
proc newLPStreamReadError*(p: ref Exception): ref Exception {.inline.} =
|
||||
var w = newException(LPStreamReadError, "Read stream failed")
|
||||
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
|
||||
w.par = p
|
||||
result = w
|
||||
|
||||
proc newLPStreamWriteError*(p: ref Exception): ref Exception {.inline.} =
|
||||
var w = newException(LPStreamWriteError, "Write stream failed")
|
||||
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
|
||||
w.par = p
|
||||
result = w
|
||||
|
||||
proc newLPStreamIncompleteError*(): ref Exception {.inline.} =
|
||||
result = newException(LPStreamIncompleteError, "Incomplete data received")
|
||||
|
||||
proc newLPStreamLimitError*(): ref Exception {.inline.} =
|
||||
result = newException(LPStreamLimitError, "Buffer limit reached")
|
||||
|
||||
proc newLPStreamIncorrectError*(m: string): ref Exception {.inline.} =
|
||||
result = newException(LPStreamIncorrectError, m)
|
||||
|
||||
proc newLPStreamEOFError*(): ref Exception {.inline.} =
|
||||
result = newException(LPStreamEOFError, "Stream EOF!")
|
||||
|
||||
method closed*(s: LPStream): bool {.base, inline.} =
|
||||
s.isClosed
|
||||
|
||||
method read*(s: LPStream,
|
||||
n = -1):
|
||||
Future[seq[byte]] {.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method readExactly*(s: LPStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int):
|
||||
Future[void] {.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method readLine*(s: LPStream,
|
||||
limit = 0,
|
||||
sep = "\r\n"):
|
||||
Future[string]
|
||||
{.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method readOnce*(s: LPStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int):
|
||||
Future[int]
|
||||
{.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method readUntil*(s: LPStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int,
|
||||
sep: seq[byte]):
|
||||
Future[int]
|
||||
{.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method write*(s: LPStream, pbytes: pointer, nbytes: int)
|
||||
{.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method write*(s: LPStream, msg: string, msglen = -1)
|
||||
{.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method write*(s: LPStream, msg: seq[byte], msglen = -1)
|
||||
{.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
||||
|
||||
method close*(s: LPStream)
|
||||
{.base, async.} =
|
||||
doAssert(false, "not implemented!")
|
233
libp2p/stream/ringbuffer.nim
Normal file
233
libp2p/stream/ringbuffer.nim
Normal file
@ -0,0 +1,233 @@
|
||||
type
|
||||
RingBuffer*[T: byte | char] = object
|
||||
buff*: seq[T]
|
||||
head: int
|
||||
tail: int
|
||||
size: int
|
||||
len*: int
|
||||
|
||||
const DefaultSize = 1024
|
||||
|
||||
## A bare bones ring buffer suited for byte oriented data.
|
||||
## The buffer uses `shallowCopy` when appending and reading
|
||||
## data to overcome Nim's copy semantics.
|
||||
##
|
||||
## This is a FIFO data structure, data is always appended to the end
|
||||
## and read from the front.
|
||||
##
|
||||
|
||||
proc init*[T](b: type[RingBuffer[T]], size = DefaultSize): b =
|
||||
## Create and initialize the ring buffer. Takes an optional
|
||||
## maximum ``size`` parameter, otherwise ``size`` will default
|
||||
## to ``DefaultSize`` which is set to 1024.
|
||||
##
|
||||
## .. code-block:: nim
|
||||
## # create a buffer with 5
|
||||
## var buff = RingBuffer[byte].init(5)
|
||||
## buff.add(@['a', 'b', 'c', 'd', 'e'])
|
||||
## var data = newSeq[char](5)
|
||||
## discard buff.read(data)
|
||||
## echo data # prints @['a', 'b', 'c', 'd', 'e']
|
||||
##
|
||||
RingBuffer[T](buff: newSeq[T](size), size: size)
|
||||
|
||||
proc append*[T](b: var RingBuffer[T], data: openArray[T]) =
|
||||
## Append data to the end of the buffer. ``data`` will be
|
||||
## ``shallowCopy``ed into the buffer to overcome Nim's copy
|
||||
## semantics for ``seq``.
|
||||
##
|
||||
## .. code-block:: nim
|
||||
## buff.append(@['a', 'b', 'b', 'c', 'd'])
|
||||
##
|
||||
if data.len + b.len > b.size:
|
||||
raise newException(CatchableError, "Buffer would overflow!")
|
||||
|
||||
for i in data:
|
||||
shallowCopy(b.buff[b.tail], i)
|
||||
if b.tail == b.size - 1:
|
||||
b.tail = 0
|
||||
else:
|
||||
b.tail.inc
|
||||
b.len.inc
|
||||
|
||||
proc read*[T](b: var RingBuffer[T],
|
||||
data: var openArray[T],
|
||||
size: int = -1): int =
|
||||
## Read up to ``size`` bytes/chars from the front of the buffer
|
||||
## into the ``data`` argument.
|
||||
##
|
||||
## Returns an int indicating the amount of bytes/chars read.
|
||||
##
|
||||
## Note that ``size`` is the maximum amount of bytes/chars to
|
||||
## read, if not enough data is available read will return what
|
||||
## it can. If ``size`` is not provided, then the ``len`` field
|
||||
## of the ``data`` argument will be used instead.
|
||||
##
|
||||
## .. code-block:: nim
|
||||
## # read 5 chars from the buffer
|
||||
## var data = newSeq[char](10)
|
||||
## assert(buff.read(data, 5) == 5)
|
||||
##
|
||||
if b.len == 0:
|
||||
return
|
||||
|
||||
if data.len == 0 or size > data.len:
|
||||
raise newException(CatchableError, "Data isn't big enough!")
|
||||
|
||||
var isize = size
|
||||
if size > b.size:
|
||||
isize = b.size
|
||||
|
||||
if size < 0 or size > b.len:
|
||||
isize = b.len
|
||||
else:
|
||||
isize = size
|
||||
|
||||
while result < isize:
|
||||
shallowCopy(data[result], b.buff[b.head])
|
||||
if b.len == 0:
|
||||
break
|
||||
|
||||
if b.head == b.size - 1:
|
||||
b.head = 0
|
||||
else:
|
||||
b.head.inc()
|
||||
b.len.dec
|
||||
result.inc
|
||||
|
||||
proc read*[T](b: var RingBuffer[T], size: int = -1): seq[T] =
|
||||
## Read up to ``size`` bytes/chars from the front of the buffer.
|
||||
##
|
||||
## Returns a `seq` with the read bytes/chars.
|
||||
##
|
||||
## Note that ``size`` is the maximum amount of bytes/chars to read,
|
||||
## if not enough data is available read will return what it can.
|
||||
## If ``size`` is not provided, the entire contents of the buffer
|
||||
## will be returned.
|
||||
##
|
||||
## .. code-block:: nim
|
||||
## # read 5 chars from the buffer
|
||||
## assert(buff.read() == @[...])
|
||||
##
|
||||
var isize = size
|
||||
if size < 0:
|
||||
isize = b.len
|
||||
|
||||
result = newSeq[T](isize)
|
||||
discard b.read(result, isize)
|
||||
|
||||
proc reset*[T](b: var RingBuffer[T]) =
|
||||
## Reset the internal state of the buffer. The
|
||||
## internal buffer itself will not be cleared,
|
||||
## but all internal pointers will be which allows
|
||||
## reusing the buffer as if new.
|
||||
b.len = 0
|
||||
b.head = 0
|
||||
b.tail = 0
|
||||
|
||||
proc clear*[T](b: var RingBuffer[T]) =
|
||||
## Reset and clear the buffer.
|
||||
b.reset()
|
||||
b.buff.setLen(0)
|
||||
|
||||
when isMainModule:
|
||||
block Basic:
|
||||
var buff = RingBuffer[char].init(10)
|
||||
var data = newSeq[char](10)
|
||||
|
||||
buff.append(@['a', 'b', 'c', 'd', 'e'])
|
||||
assert(buff.len == 5, "len should be 5")
|
||||
assert(buff.head == 0, "head should be 0")
|
||||
assert(buff.tail == 5, "tail should b4 5")
|
||||
|
||||
buff.append(@['f', 'g', 'h', 'i', 'j'])
|
||||
assert(buff.len == 10, "len should be 10")
|
||||
assert(buff.head == 0, "head should be 0")
|
||||
assert(buff.tail == 0, "tail should be 0")
|
||||
|
||||
assert(buff.read(data, 5) == 5, "should have read 5 chars")
|
||||
assert(data[0..4] == @['a', 'b', 'c', 'd', 'e'])
|
||||
assert(buff.len == 5, "len should be 5")
|
||||
assert(buff.head == 5, "head should be 5")
|
||||
assert(buff.tail == 0, "tail should be 0")
|
||||
|
||||
buff.append(@['k', 'l', 'm', 'n', 'o'])
|
||||
assert(buff.len == 10, "len should be 10")
|
||||
assert(buff.head == 5, "head should be 5")
|
||||
assert(buff.tail == 5, "tail should be 5")
|
||||
|
||||
assert(buff.read(data, 2) == 2, "should have read 2 chars")
|
||||
assert(data[0..1] == @['f', 'g'])
|
||||
assert(buff.len == 8, "len should be 8")
|
||||
assert(buff.head == 7, "head should be 7")
|
||||
assert(buff.tail == 5, "tail should be 5")
|
||||
|
||||
buff.append(@['p', 'q'])
|
||||
assert(buff.len == 10, "len should be 10")
|
||||
assert(buff.head == 7, "head should be 7")
|
||||
assert(buff.tail == 7, "tail should be 7")
|
||||
|
||||
assert(buff.read(data) == 10, "should have read 10 chars")
|
||||
assert(data == @['h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q'])
|
||||
assert(buff.len == 0, "len should be 0")
|
||||
assert(buff.head == 7, "head should be 7")
|
||||
assert(buff.tail == 7, "tail should be 7")
|
||||
|
||||
buff.append(@['x', 'y'])
|
||||
assert(buff.len == 2, "len should be 2")
|
||||
assert(buff.head == 7, "head should be 7")
|
||||
assert(buff.tail == 9, "tail should be 9")
|
||||
|
||||
assert(buff.read(data, 4) == 2, "should have read 2 chars")
|
||||
assert(data[0..1] == @['x', 'y'])
|
||||
assert(buff.len == 0, "len should be 0")
|
||||
assert(buff.head == 9, "head should be 9")
|
||||
assert(buff.tail == 9, "tail should be 9")
|
||||
|
||||
buff.append(@['a', 'b', 'c', 'd', 'e'])
|
||||
assert(buff.len == 5, "len should be 5")
|
||||
assert(buff.head == 9, "head should be 7")
|
||||
assert(buff.tail == 4, "tail should be 9")
|
||||
|
||||
assert(buff.read(5) == @['a', 'b', 'c', 'd', 'e'])
|
||||
assert(buff.len == 0, "len should be 0")
|
||||
assert(buff.head == 4, "head should be 9")
|
||||
assert(buff.tail == 4, "tail should be 9")
|
||||
|
||||
block Errors:
|
||||
var buff = RingBuffer[char].init(5)
|
||||
|
||||
try:
|
||||
buff.append(@['a', 'b', 'c', 'd', 'e', 'g'])
|
||||
assert(false, "should not allow adding pas buffer size")
|
||||
except CatchableError as exc:
|
||||
assert(true)
|
||||
|
||||
try:
|
||||
buff.append(@['a', 'b', 'c', 'd', 'e', 'g'])
|
||||
var data: seq[char]
|
||||
discard buff.read(data)
|
||||
assert(false, "should not allow passing empty container")
|
||||
except CatchableError as exc:
|
||||
assert(true)
|
||||
|
||||
try:
|
||||
buff.append(@['a', 'b', 'c', 'd', 'e', 'g'])
|
||||
var data = newSeq[char](2)
|
||||
discard buff.read(data, 5)
|
||||
assert(false, "should not allow passing size greater than container")
|
||||
except CatchableError as exc:
|
||||
assert(true)
|
||||
|
||||
block Cleanup:
|
||||
var buff = RingBuffer[char].init(5)
|
||||
|
||||
buff.reset()
|
||||
assert(buff.len == 0, "buff.len should be 0")
|
||||
assert(buff.head == 0, "buff.head should be 0")
|
||||
assert(buff.tail == 0, "buff.tail should be 0")
|
||||
|
||||
buff.clear()
|
||||
assert(buff.buff.len == 0, "buff.buff.len should be 0")
|
||||
|
||||
echo "All passed!"
|
36
tests/testlenprefixed.nim
Normal file
36
tests/testlenprefixed.nim
Normal file
@ -0,0 +1,36 @@
|
||||
import unittest
|
||||
import chronos
|
||||
import ../libp2p/stream/lenprefixed
|
||||
|
||||
suite "LenPrefixed stream":
|
||||
test "encode":
|
||||
proc test(): Future[bool] {.async.} =
|
||||
var lp = LenPrefixed.init()
|
||||
iterator stream(): Future[seq[byte]] {.closure.} =
|
||||
var fut = newFuture[seq[byte]]()
|
||||
fut.complete(cast[seq[byte]](@['a', 'b', 'c', 'd', 'e']))
|
||||
yield fut
|
||||
|
||||
var encoded = await lp.encode(stream)()
|
||||
check:
|
||||
encoded == cast[seq[byte]](@['\5', 'a', 'b', 'c', 'd', 'e'])
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(test()) == true
|
||||
|
||||
test "decode":
|
||||
proc test(): Future[bool] {.async.} =
|
||||
var lp = LenPrefixed.init()
|
||||
iterator stream(): Future[seq[byte]] {.closure.} =
|
||||
var fut = newFuture[seq[byte]]()
|
||||
fut.complete(cast[seq[byte]](@['\5', 'a', 'b', 'c', 'd', 'e']))
|
||||
yield fut
|
||||
|
||||
var decoded = await lp.decode(stream)()
|
||||
check:
|
||||
decoded == cast[seq[byte]](@['a', 'b', 'c', 'd', 'e'])
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(test()) == true
|
Loading…
x
Reference in New Issue
Block a user