adding bufferred stream
This commit is contained in:
parent
cbf0f4f186
commit
fceea14aa5
|
@ -0,0 +1,219 @@
|
|||
## Nim-LibP2P
|
||||
## Copyright (c) 2018 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import deques, tables, sequtils, math
|
||||
import chronos
|
||||
import ../stream/lpstream
|
||||
|
||||
## I use a dequeu here because it uses a ring buffer under the hood
|
||||
## which should on average be more performant than memMoves and copies
|
||||
|
||||
type
|
||||
WriteHandler* = proc (data: seq[byte]) {.gcsafe.} # TODO: figure out how to make this generic to avoid casts
|
||||
|
||||
BufferStream* = ref object of LPStream
|
||||
maxSize*: int
|
||||
readBuf: Deque[byte] # a deque is based on a ring buffer
|
||||
readReqs: Deque[Future[int]] # use dequeue to fire reads in order
|
||||
dataReadEvent: AsyncEvent
|
||||
writeHandler: WriteHandler
|
||||
|
||||
proc requestReadBytes(s: BufferStream): Future[int] =
|
||||
## create a future that will complete when more
|
||||
## data becomes available in the read buffer
|
||||
result = newFuture[int]()
|
||||
s.readReqs.addLast(result)
|
||||
|
||||
proc newBufferStream*(handler: WriteHandler, size: int = 1024): BufferStream =
|
||||
new result
|
||||
result.maxSize = if isPowerOfTwo(size): size else: nextPowerOfTwo(size)
|
||||
result.readBuf = initDeque[byte](result.maxSize)
|
||||
result.readReqs = initDeque[Future[int]]()
|
||||
result.dataReadEvent = newAsyncEvent()
|
||||
result.writeHandler = handler
|
||||
|
||||
proc popFirst*(s: BufferStream): byte =
|
||||
result = s.readBuf.popFirst()
|
||||
s.dataReadEvent.fire()
|
||||
|
||||
proc popLast*(s: BufferStream): byte =
|
||||
result = s.readBuf.popLast()
|
||||
s.dataReadEvent.fire()
|
||||
|
||||
proc shrink(s: BufferStream, fromFirst = 0, fromLast = 0) =
|
||||
s.readBuf.shrink(fromFirst, fromLast)
|
||||
s.dataReadEvent.fire()
|
||||
|
||||
proc len*(s: BufferStream): int = s.readBuf.len
|
||||
|
||||
proc pushTo*(s: BufferStream, data: seq[byte]) {.async, gcsafe.} =
|
||||
## Write bytes to internal read buffer, use this to fill up the
|
||||
## buffer with data.
|
||||
##
|
||||
## This method is async and will wait until all data has been
|
||||
## written to the internal buffer; this is done so that backpressure
|
||||
## is preserved.
|
||||
var index = 0
|
||||
while true:
|
||||
while index < data.len and s.readBuf.len < s.maxSize:
|
||||
s.readBuf.addLast(data[index])
|
||||
inc(index)
|
||||
|
||||
# resolve the next queued read request
|
||||
if s.readReqs.len > 0:
|
||||
s.readReqs.popFirst().complete(index + 1)
|
||||
|
||||
if index >= data.len:
|
||||
break
|
||||
|
||||
# if we couldn't transfer all the data to the
|
||||
# internal buf do an async sleep and try writting
|
||||
# some more, this should preserve backpresure
|
||||
await s.dataReadEvent.wait()
|
||||
|
||||
method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async, gcsafe.} =
|
||||
## Read all bytes (n <= 0) or exactly `n` bytes from buffer
|
||||
##
|
||||
## This procedure allocates buffer seq[byte] and return it as result.
|
||||
var size = if n > 0: n else: s.readBuf.len()
|
||||
var index = 0
|
||||
while index < size:
|
||||
while s.readBuf.len() > 0 and index < size:
|
||||
result.add(s.popFirst())
|
||||
inc(index)
|
||||
|
||||
if index < size:
|
||||
discard await s.requestReadBytes()
|
||||
|
||||
method readExactly*(s: BufferStream, pbytes: pointer, nbytes: int): Future[void] {.async, gcsafe.} =
|
||||
## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store
|
||||
## it to ``pbytes``.
|
||||
##
|
||||
## If EOF is received and ``nbytes`` is not yet readed, the procedure
|
||||
## will raise ``LPStreamIncompleteError``.
|
||||
if nbytes > s.readBuf.len():
|
||||
raise newLPStreamIncompleteError()
|
||||
|
||||
let buff = await s.read(nbytes)
|
||||
copyMem(pbytes, unsafeAddr buff[0], nbytes)
|
||||
|
||||
method readLine*(s: BufferStream, limit = 0, sep = "\r\n"): Future[string] {.async, gcsafe.} =
|
||||
## Read one line from read-only stream ``rstream``, where ``"line"`` is a
|
||||
## sequence of bytes ending with ``sep`` (default is ``"\r\n"``).
|
||||
##
|
||||
## If EOF is received, and ``sep`` was not found, the method will return the
|
||||
## partial read bytes.
|
||||
##
|
||||
## If the EOF was received and the internal buffer is empty, return an
|
||||
## empty string.
|
||||
##
|
||||
## If ``limit`` more then 0, then result string will be limited to ``limit``
|
||||
## bytes.
|
||||
result = ""
|
||||
var lim = if limit <= 0: -1 else: limit
|
||||
var state = 0
|
||||
var index = 0
|
||||
|
||||
index = 0
|
||||
while index < s.readBuf.len:
|
||||
let ch = char(s.readBuf[index])
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
if state == len(sep):
|
||||
s.shrink(index + 1)
|
||||
break
|
||||
else:
|
||||
state = 0
|
||||
result.add(ch)
|
||||
if len(result) == lim:
|
||||
s.shrink(index + 1)
|
||||
break
|
||||
inc(index)
|
||||
|
||||
method readOnce*(s: BufferStream, pbytes: pointer, nbytes: int): Future[int] {.async, gcsafe.} =
|
||||
## Perform one read operation on read-only stream ``rstream``.
|
||||
##
|
||||
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
|
||||
## internal buffer, otherwise it will wait until some bytes will be received.
|
||||
if s.readBuf.len == 0:
|
||||
discard await s.requestReadBytes()
|
||||
|
||||
var len = if nbytes > s.readBuf.len: s.readBuf.len else: nbytes
|
||||
await s.readExactly(pbytes, len)
|
||||
result = len
|
||||
|
||||
method readUntil*(s: BufferStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int,
|
||||
sep: seq[byte]):
|
||||
Future[int] {.async, gcsafe.} =
|
||||
## Read data from the read-only stream ``rstream`` until separator ``sep`` is
|
||||
## found.
|
||||
##
|
||||
## On success, the data and separator will be removed from the internal
|
||||
## buffer (consumed). Returned data will include the separator at the end.
|
||||
##
|
||||
## If EOF is received, and `sep` was not found, procedure will raise
|
||||
## ``LPStreamIncompleteError``.
|
||||
##
|
||||
## If ``nbytes`` bytes has been received and `sep` was not found, procedure
|
||||
## will raise ``LPStreamLimitError``.
|
||||
##
|
||||
## Procedure returns actual number of bytes read.
|
||||
var
|
||||
dest = cast[ptr UncheckedArray[byte]](pbytes)
|
||||
state = 0
|
||||
k = 0
|
||||
|
||||
let datalen = s.readBuf.len()
|
||||
if datalen == 0 and s.readBuf.len() == 0:
|
||||
raise newLPStreamIncompleteError()
|
||||
|
||||
var index = 0
|
||||
while index < datalen:
|
||||
let ch = s.readBuf[index]
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
else:
|
||||
state = 0
|
||||
if k < nbytes:
|
||||
dest[k] = ch
|
||||
inc(k)
|
||||
else:
|
||||
raise newLPStreamLimitError()
|
||||
if state == len(sep):
|
||||
break
|
||||
inc(index)
|
||||
|
||||
if state == len(sep):
|
||||
s.shrink(index + 1)
|
||||
result = k
|
||||
else:
|
||||
s.shrink(datalen)
|
||||
|
||||
method write*(s: BufferStream, pbytes: pointer, nbytes: int) {.async, gcsafe.} =
|
||||
var buf: seq[byte] = newSeq[byte](nbytes)
|
||||
copyMem(addr buf[0], pbytes, nbytes)
|
||||
s.writeHandler(buf)
|
||||
|
||||
method write*(s: BufferStream, msg: string, msglen = -1) {.async, gcsafe.} =
|
||||
var buf = ""
|
||||
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
|
||||
s.writeHandler(cast[seq[byte]](toSeq(buf.items)))
|
||||
|
||||
method write*(s: BufferStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
|
||||
var buf: seq[byte]
|
||||
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
|
||||
s.writeHandler(buf)
|
||||
|
||||
method close*(s: BufferStream) {.async, gcsafe.} =
|
||||
for r in s.readReqs:
|
||||
r.cancel()
|
||||
s.readBuf.clear()
|
||||
s.closed = true
|
|
@ -0,0 +1,192 @@
|
|||
import unittest, deques, sequtils
|
||||
import chronos
|
||||
import ../libp2p/stream/bufferstream
|
||||
|
||||
suite "BufferStream":
|
||||
test "push data to buffer":
|
||||
proc testPushTo(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 16)
|
||||
check buff.len == 0
|
||||
var data: seq[char]
|
||||
data.add(@"12345")
|
||||
await buff.pushTo(cast[seq[byte]](data))
|
||||
check buff.len == 5
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testPushTo()) == true
|
||||
|
||||
test "push and wait":
|
||||
proc testPushTo(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 4)
|
||||
check buff.len == 0
|
||||
|
||||
let fut = buff.pushTo(cast[seq[byte]](@"12345"))
|
||||
check buff.len == 4
|
||||
check buff.popFirst() == byte(ord('1'))
|
||||
await fut
|
||||
check buff.len == 4
|
||||
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testPushTo()) == true
|
||||
|
||||
test "read":
|
||||
proc testRead(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
await buff.pushTo(cast[seq[byte]](@"12345"))
|
||||
check @"12345" == cast[string](await buff.read())
|
||||
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testRead()) == true
|
||||
|
||||
test "read with size":
|
||||
proc testRead(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
await buff.pushTo(cast[seq[byte]](@"12345"))
|
||||
let data = cast[string](await buff.read(3))
|
||||
check ['1', '2', '3'] == data
|
||||
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testRead()) == true
|
||||
|
||||
test "read and wait":
|
||||
proc testRead(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
await buff.pushTo(cast[seq[byte]](@"123"))
|
||||
check buff.len == 3
|
||||
let readFut = buff.read(5)
|
||||
await buff.pushTo(cast[seq[byte]](@"45"))
|
||||
check buff.len == 2
|
||||
|
||||
check cast[string](await readFut) == ['1', '2', '3', '4', '5']
|
||||
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testRead()) == true
|
||||
|
||||
test "readExactly":
|
||||
proc testReadExactly(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
await buff.pushTo(cast[seq[byte]](@"12345"))
|
||||
check buff.len == 5
|
||||
var data: seq[byte] = newSeq[byte](2)
|
||||
await buff.readExactly(addr data[0], 2)
|
||||
check cast[string](data) == @['1', '2']
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testReadExactly()) == true
|
||||
|
||||
test "readLine":
|
||||
proc testReadLine(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 16)
|
||||
check buff.len == 0
|
||||
|
||||
await buff.pushTo(cast[seq[byte]](@"12345\n67890"))
|
||||
check buff.len == 11
|
||||
check "12345" == await buff.readLine(0, "\n")
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testReadLine()) == true
|
||||
|
||||
test "readOnce":
|
||||
proc testReadOnce(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
var data: seq[byte] = newSeq[byte](3)
|
||||
let readFut = buff.readOnce(addr data[0], 5)
|
||||
await buff.pushTo(cast[seq[byte]](@"123"))
|
||||
check buff.len == 3
|
||||
|
||||
check (await readFut) == 3
|
||||
check cast[string](data) == @['1', '2', '3']
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testReadOnce()) == true
|
||||
|
||||
test "readUntil":
|
||||
proc testReadUntil(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) = discard
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
var data: seq[byte] = newSeq[byte](3)
|
||||
await buff.pushTo(cast[seq[byte]](@"123$45"))
|
||||
check buff.len == 6
|
||||
let readFut = buff.readUntil(addr data[0], 5, @[byte('$')])
|
||||
|
||||
check (await readFut) == 4
|
||||
check cast[string](data) == @['1', '2', '3']
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testReadUntil()) == true
|
||||
|
||||
test "write ptr":
|
||||
proc testWritePtr(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) =
|
||||
check cast[string](data) == "Hello!"
|
||||
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
var data = "Hello!"
|
||||
await buff.write(addr data[0], data.len)
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testWritePtr()) == true
|
||||
|
||||
test "write string":
|
||||
proc testWritePtr(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) =
|
||||
check cast[string](data) == "Hello!"
|
||||
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
await buff.write("Hello!", 6)
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testWritePtr()) == true
|
||||
|
||||
test "write bytes":
|
||||
proc testWritePtr(): Future[bool] {.async.} =
|
||||
proc writeHandler(data: seq[byte]) =
|
||||
check cast[string](data) == "Hello!"
|
||||
|
||||
let buff = newBufferStream(writeHandler, 10)
|
||||
check buff.len == 0
|
||||
|
||||
await buff.write(cast[seq[byte]](toSeq("Hello!".items)), 6)
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testWritePtr()) == true
|
Loading…
Reference in New Issue