Initial work on pipelines
This commit is contained in:
parent
4b147d64a0
commit
69fc4e24ee
|
@ -8,7 +8,8 @@ license = "Apache License 2.0"
|
|||
skipDirs = @["tests"]
|
||||
|
||||
requires "nim >= 0.17.0",
|
||||
"stew"
|
||||
"stew",
|
||||
"chronos"
|
||||
|
||||
task test, "Run all tests":
|
||||
exec "nim c -r --threads:off tests/all_tests"
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
const
|
||||
faststreams_async_backend {.strdefine.} = "chronos"
|
||||
|
||||
when faststreams_async_backend == "chronos":
|
||||
import chronos # import chronos/[asyncfutures2, asyncmacro2]
|
||||
export chronos # export asyncfutures2, asyncmacro2
|
||||
|
||||
template faststreamsAwait*(f: Future): untyped =
|
||||
await f
|
||||
|
||||
elif faststreams_async_backend in ["std", "asyncdispatch"]:
|
||||
import std/[asyncfutures, asyncmacro]
|
||||
export asyncfutures, asyncmacro
|
||||
|
||||
template faststreamsAwait*(awaited: Future[T]): untyped =
|
||||
# TODO revisit after https://github.com/nim-lang/Nim/pull/12085/ is merged
|
||||
let f = awaited
|
||||
yield f
|
||||
if not isNil(f.error):
|
||||
raise f.error
|
||||
f.read
|
||||
|
||||
else:
|
||||
{.fatal: "Unrecognized network backend: " & faststreams_async_backend.}
|
||||
|
||||
template raiseFaststreamsError*(errMsg: string, body: untyped) =
|
||||
try:
|
||||
body
|
||||
except CatchableError as err:
|
||||
raise newException(IOError, errMsg, err)
|
||||
|
|
@ -0,0 +1,123 @@
|
|||
import
|
||||
chronos,
|
||||
input_stream, output_stream, multisync
|
||||
|
||||
export
|
||||
chronos, fsMultiSync
|
||||
|
||||
type
|
||||
ChronosInputStream* = ref object of InputStream
|
||||
transport: StreamTransport
|
||||
allowWaitFor: bool
|
||||
|
||||
ChronosOutputStream* = ref object of OutputStream
|
||||
transport: StreamTransport
|
||||
allowWaitFor: bool
|
||||
|
||||
const
|
||||
readingErrMsg = "Failed to read from Chronos transport"
|
||||
writingErrMsg = "Failed to write to Chronos transport"
|
||||
closingErrMsg = "Failed to close Chronos transport"
|
||||
writeIncompleteErrMsg = "Failed to write all bytes to Chronos transport"
|
||||
|
||||
proc fsCloseWait(t: StreamTransport) {.async, raises: [Defect, IOError].} =
|
||||
raiseFaststreamsError closingErrMsg:
|
||||
await t.closeWait()
|
||||
|
||||
proc fsReadOnce(t: StreamTransport,
|
||||
buffer: ptr byte, bufSize: int): Future[int] {.async, raises: [Defect, IOError].} =
|
||||
raiseFaststreamsError readingErrMsg:
|
||||
return t.readOnce(pointer(buffer), bufSize)
|
||||
|
||||
# TODO: Use the Raising type here
|
||||
let ChronosInputStreamVTable = InputStreamVTable(
|
||||
readSync: proc (s: InputStream, buffer: ptr byte, bufSize: int): int
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
var cs = ChronosInputStream(s)
|
||||
doAssert cs.allowWaitFor
|
||||
raiseFaststreamsError readingErrMsg:
|
||||
return waitFor cs.transport.readOnce(pointer(buffer), bufSize)
|
||||
,
|
||||
readAsync: proc (s: InputStream, buffer: ptr byte, bufSize: int): Future[int]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
ChronosInputStream(s).transport.fsReadOnce(buffer, bufSize)
|
||||
,
|
||||
closeSync: proc (s: InputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
raiseFaststreamsError closingErrMsg:
|
||||
ChronosInputStream(s).transport.close()
|
||||
,
|
||||
closeAsync: proc (s: InputStream, cb: CloseAsyncCallback): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
ChronosInputStream(s).transport.fsCloseWait()
|
||||
)
|
||||
|
||||
func chronosInput*(s: StreamTransport,
|
||||
pageSize = output_stream.defaultPageSize,
|
||||
allowWaitFor = false): InputStreamHandle =
|
||||
InputStreamHandle(s: ChronosInputStream(
|
||||
vtable: vtableAddr ChronosInputStreamVTable,
|
||||
pageSize: pageSize,
|
||||
allowWaitFor: allowWaitFor))
|
||||
|
||||
let ChronosOutputStreamVTable = OutputStreamVTable(
|
||||
writePageSync: proc (s: OutputStream, page: openarray[byte])
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
var cs = ChronosOutputStream(s)
|
||||
doAssert cs.allowWaitFor
|
||||
let bytesWritten = raiseFaststreamsError writingErrMsg:
|
||||
waitFor cs.transport.write(unsafeAddr page[0], page.len)
|
||||
if bytesWritten != page.len:
|
||||
raise newException(IOError, writeIncompleteErrMsg)
|
||||
,
|
||||
writePageAsync: proc (s: OutputStream, buf: pointer, bufLen: int): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
var
|
||||
cs = ChronosOutputStream(s)
|
||||
retFuture = newFuture[void]("ChronosOutputStream.writePageAsync")
|
||||
writeFut: Future[int]
|
||||
|
||||
proc continuation(udata: pointer) {.gcsafe.} =
|
||||
if writeFut.error != nil:
|
||||
retFuture.fail newException(IOError, writingErrMsg, writeFut.error)
|
||||
elif writeFut.read != bufLen:
|
||||
retFuture.fail newException(IOError, writeIncompleteErrMsg)
|
||||
else:
|
||||
retFuture.complete()
|
||||
|
||||
var writeFut = cs.transport.write(unsafeAddr page[0], page.len)
|
||||
writeFut.addCallback(continuation, nil)
|
||||
|
||||
retFuture.cancelCallback = proc (udata: pointer) {.gcsafe.} =
|
||||
writeFut.removeCallback(continuation, nil)
|
||||
|
||||
return retFuture
|
||||
,
|
||||
flushSync: proc (s: OutputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
discard
|
||||
,
|
||||
flushAsync: proc (s: OutputStream): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
result = newFuture[void]("ChronosOutputStream.flushAsync")
|
||||
result.complete()
|
||||
,
|
||||
closeSync: ChronosInputStreamVTable.closeSync,
|
||||
closeAsyncProc: ChronosInputStreamVTable.closeAsyncProc
|
||||
)
|
||||
|
||||
func chronosOutput*(s: StreamTransport,
|
||||
pageSize = output_stream.defaultPageSize,
|
||||
allowWaitFor = false): OutputStreamHandle =
|
||||
var stream = ChronosOutputStream(
|
||||
vtable: vtableAddr(SnappyStreamVTable),
|
||||
pageSize: pageSize,
|
||||
minWriteSize: 1,
|
||||
maxWriteSize: high(int),
|
||||
transport: s,
|
||||
allowWaitFor: allowWaitFor)
|
||||
|
||||
stream.initWithSinglePage()
|
||||
|
||||
OutputStreamHandle(s: stream)
|
||||
|
|
@ -1,15 +1,16 @@
|
|||
import
|
||||
memfiles, options,
|
||||
stew/[ptrops, ranges/ptr_arith]
|
||||
stew/[ptrops, ranges/ptr_arith],
|
||||
async_backend
|
||||
|
||||
type
|
||||
InputStream* = ref object of RootObj
|
||||
vtable*: ptr InputStreamVTable
|
||||
head*: ptr byte
|
||||
pageSize*: int
|
||||
bufferSize: int
|
||||
bufferStart, bufferEnd: ptr byte
|
||||
bufferEndPos: int
|
||||
inputDevice*: RootRef
|
||||
|
||||
LayeredInputStream* = ref object of InputStream
|
||||
subStream*: InputStream
|
||||
|
@ -17,27 +18,21 @@ type
|
|||
InputStreamHandle* = object
|
||||
s*: InputStream
|
||||
|
||||
AsciiInputStream* = distinct InputStream
|
||||
Utf8InputStream* = distinct InputStream
|
||||
AsyncInputStream* {.borrow: `.`.} = distinct InputStream
|
||||
|
||||
ReadSyncProc* = proc (s: InputStream, buffer: ptr byte, bufSize: int): int
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
ReadAsyncCallback* = proc (s: InputStream, bytesRead: int)
|
||||
{.nimcall, gcsafe, raises: [Defect].}
|
||||
|
||||
ReadAsyncProc* = proc (s: InputStream,
|
||||
buffer: ptr byte, bufSize: int,
|
||||
cb: ReadAsyncCallback)
|
||||
ReadAsyncProc* = proc (s: InputStream, buffer: ptr byte, bufSize: int): Future[int]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
CloseSyncProc* = proc (s: InputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
CloseAsyncCallback* = proc (s: InputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
CloseAsyncProc* = proc (s: InputStream): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
CloseAsyncProc* = proc (s: InputStream)
|
||||
GetLenSyncProc* = proc (s: InputStream): int
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
InputStreamVTable* = object
|
||||
|
@ -45,12 +40,13 @@ type
|
|||
readAsync*: ReadAsyncProc
|
||||
closeSync*: CloseSyncProc
|
||||
closeAsync*: CloseAsyncProc
|
||||
hasKnownLen*: bool
|
||||
getLenSync*: GetLenSyncProc
|
||||
|
||||
FileInputStream = ref object of InputStream
|
||||
file: MemFile
|
||||
|
||||
const
|
||||
lengthUnknown* = -1
|
||||
debugHelpers = false
|
||||
nimAllocatorMetadataSize* = 0
|
||||
# TODO: Get this from Nim's allocator.
|
||||
|
@ -86,7 +82,9 @@ let FileStreamVTable = InputStreamVTable(
|
|||
except OSError as err:
|
||||
raise newException(IOError, "Failed to close file", err)
|
||||
,
|
||||
hasKnownLen: true
|
||||
getLenSync: proc (s: InputStream): int
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
distance(s.head, s.bufferEnd)
|
||||
)
|
||||
|
||||
template vtableAddr*(vtable: InputStreamVTable): ptr InputStreamVTable =
|
||||
|
@ -124,26 +122,57 @@ proc memoryInput*(mem: openarray[byte]): InputStreamHandle =
|
|||
proc memoryInput*(str: string): InputStreamHandle =
|
||||
memoryInput str.toOpenArrayByte(0, str.len - 1)
|
||||
|
||||
# TODO: Is this used, should we deprecate it?
|
||||
proc endPos*(s: InputStream): int =
|
||||
doAssert s.vtable == nil or s.vtable.hasKnownLen
|
||||
doAssert s.vtable == nil or s.vtable.getLenSync != nil
|
||||
return s.bufferEndPos
|
||||
|
||||
proc syncRead(s: InputStream): bool =
|
||||
# TODO The return type here could be Option[Natural] if Nim had
|
||||
# the Option[range] optimisation that will make it equvalent to `int`.
|
||||
proc len*(s: InputStream): int {.raises: [Defect, IOError].} =
|
||||
if s.vtable == nil:
|
||||
distance(s.head, s.bufferEnd)
|
||||
elif s.vtable.getLenSync != nil:
|
||||
s.vtable.getLenSync(s)
|
||||
else:
|
||||
lengthUnknown
|
||||
|
||||
template len*(s: AsyncInputStream): int =
|
||||
len InputStream(s)
|
||||
|
||||
proc bufferMoreDataSync(s: InputStream): bool =
|
||||
# Returns true if more data was successfully buffered
|
||||
if s.vtable == nil or s.vtable.readSync == nil:
|
||||
return true
|
||||
return false
|
||||
|
||||
let bytesRead = s.vtable.readSync(s, s.bufferStart, s.bufferSize)
|
||||
if bytesRead == 0:
|
||||
# TODO close the input device
|
||||
s.inputDevice = nil
|
||||
s.vtable = nil
|
||||
return true
|
||||
return false
|
||||
else:
|
||||
s.bufferEnd = offset(s.bufferStart, bytesRead)
|
||||
s.bufferEndPos += bytesRead
|
||||
return false
|
||||
return true
|
||||
|
||||
proc ensureBytes*(s: InputStream, n: int): bool =
|
||||
proc bufferMoreDataAsync(s: AsyncInputStream): Future[bool] {.async.} =
|
||||
# Returns true if more data was successfully buffered
|
||||
return false
|
||||
|
||||
proc readable*(s: InputStream): bool =
|
||||
if s.head != s.bufferEnd:
|
||||
true
|
||||
else:
|
||||
s.bufferMoreDataSync()
|
||||
|
||||
template readable*(sp: AsyncInputStream): bool =
|
||||
let s = sp
|
||||
if s.head != s.bufferEnd:
|
||||
true
|
||||
else:
|
||||
faststreamsAwait bufferMoreDataAsync(s)
|
||||
|
||||
proc readable*(s: InputStream, n: int): bool =
|
||||
if distance(s.head, s.bufferEnd) >= n:
|
||||
return true
|
||||
|
||||
|
@ -153,16 +182,37 @@ proc ensureBytes*(s: InputStream, n: int): bool =
|
|||
# TODO
|
||||
doAssert false, "Multi-buffer reading will be implemented later"
|
||||
|
||||
proc eof*(s: InputStream): bool =
|
||||
if s.head != s.bufferEnd:
|
||||
template readable*(sp: AsyncInputStream, n: int): bool =
|
||||
let s = sp
|
||||
|
||||
if distance(s.head, s.bufferEnd) >= n:
|
||||
return true
|
||||
|
||||
if s.vtable == nil:
|
||||
return false
|
||||
|
||||
return s.syncRead()
|
||||
# TODO
|
||||
doAssert false, "Multi-buffer reading will be implemented later"
|
||||
|
||||
template close*(s: AsyncInputStream) =
|
||||
close InputStream(s)
|
||||
|
||||
proc peek*(s: InputStream): byte {.inline.} =
|
||||
doAssert s.head != s.bufferEnd
|
||||
return s.head[]
|
||||
|
||||
template peek*(s: AsyncInputStream): byte =
|
||||
peek InputStream(s)
|
||||
|
||||
proc peekAt*(s: InputStream, pos: int): byte {.inline.} =
|
||||
# TODO implement page flipping
|
||||
let peekHead = offset(s.head, pos)
|
||||
doAssert cast[uint](peekHead) < cast[uint](s.bufferEnd)
|
||||
return peekHead[]
|
||||
|
||||
template peekAt*(s: AsyncInputStream, pos: int): byte =
|
||||
peekAt InputStream(s)
|
||||
|
||||
when debugHelpers:
|
||||
proc showPosition*(s: InputStream) =
|
||||
echo "head at ", distance(s.bufferStart, s.head), "/",
|
||||
|
@ -171,23 +221,44 @@ when debugHelpers:
|
|||
proc advance*(s: InputStream) =
|
||||
if s.head != s.bufferEnd:
|
||||
s.head = offset(s.head, 1)
|
||||
discard s.syncRead()
|
||||
else:
|
||||
discard s.bufferMoreDataSync()
|
||||
|
||||
template advance*(sp: AsyncInputStream) =
|
||||
let s = sp
|
||||
if s.head != s.bufferEnd:
|
||||
s.head = offset(s.head, 1)
|
||||
else:
|
||||
discard faststreamsAwait(bufferMoreDataAsync(s))
|
||||
|
||||
proc read*(s: InputStream): byte =
|
||||
result = s.peek()
|
||||
advance s
|
||||
|
||||
template read*(sp: AsyncInputStream): byte =
|
||||
let s = sp
|
||||
let res = s.peek()
|
||||
advance(s)
|
||||
res
|
||||
|
||||
proc checkReadAhead(s: InputStream, n: int): ptr byte =
|
||||
result = s.head
|
||||
doAssert distance(s.head, s.bufferEnd) >= n
|
||||
s.head = offset(s.head, n)
|
||||
|
||||
template readBytes*(s: InputStream, n: int): auto =
|
||||
template read*(s: InputStream, n: int): auto =
|
||||
makeOpenArray(checkReadAhead(s, n), n)
|
||||
|
||||
proc next*(s: InputStream): Option[byte] =
|
||||
if not s.eof:
|
||||
result = some s.read()
|
||||
if readable(s):
|
||||
result = some read(s)
|
||||
|
||||
template next*(sp: AsyncInputStream): Option[byte] =
|
||||
let s = sp
|
||||
if readable(s):
|
||||
some read(s)
|
||||
else:
|
||||
none byte
|
||||
|
||||
proc bufferPos(s: InputStream, pos: int): ptr byte =
|
||||
let offsetFromEnd = pos - s.bufferEndPos
|
||||
|
@ -198,6 +269,9 @@ proc bufferPos(s: InputStream, pos: int): ptr byte =
|
|||
proc pos*(s: InputStream): int {.inline.} =
|
||||
s.bufferEndPos - distance(s.head, s.bufferEnd)
|
||||
|
||||
template pos*(s: AsyncInputStream): int =
|
||||
pos InputStream(s)
|
||||
|
||||
proc firstAccessiblePos*(s: InputStream): int {.inline.} =
|
||||
s.bufferEndPos - distance(s.bufferStart, s.bufferEnd)
|
||||
|
||||
|
@ -211,24 +285,3 @@ proc rewind*(s: InputStream, delta: int) =
|
|||
proc rewindTo*(s: InputStream, pos: int) {.inline.} =
|
||||
s.head = s.bufferPos(pos)
|
||||
|
||||
template pos*(s: AsciiInputStream|Utf8InputStream): int =
|
||||
pos InputStream(s)
|
||||
|
||||
template eof*(s: AsciiInputStream|Utf8InputStream): bool =
|
||||
eof InputStream(s)
|
||||
|
||||
template close*(s: AsciiInputStream|Utf8InputStream) =
|
||||
close InputStream(s)
|
||||
|
||||
template advance*(s: AsciiInputStream) =
|
||||
advance InputStream(s)
|
||||
|
||||
template peek*(s: AsciiInputStream): char =
|
||||
char InputStream(s).peek()
|
||||
|
||||
template read*(s: var AsciiInputStream): char =
|
||||
char InputStream(s).read()
|
||||
|
||||
template next*(s: var AsciiInputStream): Option[char] =
|
||||
cast[Option[char]](InputStream(s).next())
|
||||
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
import
|
||||
stew/shims/macros,
|
||||
async_backend, input_stream, output_stream
|
||||
|
||||
macro fsMultiSync*(body: untyped) =
|
||||
# We will produce an identical copy of the annotated proc,
|
||||
# but taking async parameters and having the async pragma.
|
||||
var
|
||||
asyncProcBody = copy body
|
||||
asyncProcParams = asyncBody[3]
|
||||
|
||||
asyncProcBody.addPragma(bindSym"async")
|
||||
|
||||
# The return types becomes Future[T]
|
||||
if asyncProcParams[0].kind == nnkEmpty
|
||||
asyncProcParams[0] = newBracketExpr(ident"Future", ident"void")
|
||||
else:
|
||||
asyncProcParams[0] = newBracketExpr(ident"Future", asyncProcParams[0])
|
||||
|
||||
# We replace all stream inputs with their async counterparts
|
||||
for i in 1 ..< asyncProcParams.len:
|
||||
let paramsDef = asyncProcParams[i]
|
||||
let typ = paramsDef[^2]
|
||||
if sameType(typ, bindSym"InputStream"):
|
||||
paramsDef[^2] = bindSym "AsyncInputStream"
|
||||
elif sameType(typ, bindSym"OutputStream"):
|
||||
paramsDef[^2] = bindSym "AsyncOutputStream"
|
||||
|
||||
result = newStmtList(body, asyncBody)
|
||||
if defined(debugSupportAsync):
|
||||
echo result.repr
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
import
|
||||
deques, typetraits,
|
||||
stew/[ptrops, strings, ranges/ptr_arith]
|
||||
stew/[ptrops, strings, ranges/ptr_arith],
|
||||
async_backend
|
||||
|
||||
type
|
||||
OutputPage = object
|
||||
|
@ -23,24 +24,31 @@ type
|
|||
OutputStreamHandle* = object
|
||||
s*: OutputStream
|
||||
|
||||
WritePageProc* = proc (s: OutputStream, page: openarray[byte])
|
||||
AsyncOutputStream* {.borrow: `.`.} = distinct OutputStream
|
||||
|
||||
WritePageSyncProc* = proc (s: OutputStream, page: openarray[byte])
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
WritePageAsyncProc* = proc (s: OutputStream, buf: pointer, bufLen: int): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
FlushSyncProc* = proc (s: OutputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
FlushProc* = proc (s: OutputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
FlushAsyncProc* = proc (s: OutputStream): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
CloseSyncProc* = proc (s: OutputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
CloseAsyncCallback* = proc (s: OutputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
CloseAsyncProc* = proc (s: OutputStream)
|
||||
CloseAsyncProc* = proc (s: OutputStream): Future[void]
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].}
|
||||
|
||||
OutputStreamVTable* = object
|
||||
writePage*: WritePageProc
|
||||
flush*: FlushProc
|
||||
writePageSync*: WritePageSyncProc
|
||||
writePageAsync*: WritePageAsyncProc
|
||||
flushSync*: FlushSyncProc
|
||||
flushAsync*: FlushAsyncProc
|
||||
closeSync*: CloseSyncProc
|
||||
closeAsyncProc*: CloseAsyncProc
|
||||
|
||||
|
@ -91,6 +99,13 @@ template isExternalCursor(c: var WriteCursor): bool =
|
|||
func runway*(c: var WriteCursor): int {.inline.} =
|
||||
distance(c.head, c.bufferEnd)
|
||||
|
||||
proc prepareRunway*(s: OutputStream, length: int) =
|
||||
# TODO implement this
|
||||
discard
|
||||
|
||||
template prepareRunway*(s: AsyncOutputStream, length: int) =
|
||||
prepareRunway OutputStream(s)
|
||||
|
||||
proc flipPage(s: OutputStream) =
|
||||
s.cursor.head = cast[ptr byte](addr s.pages[s.pages.len - 1].buffer[0])
|
||||
# TODO: There is an assumption here and elsewhere that `s.pages[^1]` has
|
||||
|
@ -129,16 +144,15 @@ proc memoryOutput*(buffer: pointer, len: int): OutputStreamHandle =
|
|||
OutputStreamHandle(s: stream)
|
||||
|
||||
let FileStreamVTable = OutputStreamVTable(
|
||||
writePage: proc (s: OutputStream,
|
||||
data: openarray[byte])
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
writePageSync: proc (s: OutputStream, data: openarray[byte])
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
var file = FileOutputStream(s).file
|
||||
var written = file.writeBuffer(unsafeAddr data[0], data.len)
|
||||
if written != data.len:
|
||||
raise newException(IOError, "Failed to write OutputStream page.")
|
||||
,
|
||||
flush: proc (s: OutputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
flushSync: proc (s: OutputStream)
|
||||
{.nimcall, gcsafe, raises: [IOError, Defect].} =
|
||||
flushFile FileOutputStream(s).file
|
||||
,
|
||||
closeSync: proc (s: OutputStream)
|
||||
|
@ -175,7 +189,7 @@ proc pos*(s: OutputStream): int =
|
|||
s.endPos - s.cursor.runway
|
||||
|
||||
proc safeWritePage(s: OutputStream, data: openarray[byte]) {.inline.} =
|
||||
if data.len > 0: s.vtable.writePage(s, data)
|
||||
if data.len > 0: s.vtable.writePageSync(s, data)
|
||||
|
||||
proc writePages(s: OutputStream, skipLast = 0) =
|
||||
assert s.vtable != nil
|
||||
|
@ -205,7 +219,7 @@ proc flush*(s: OutputStream) =
|
|||
# Then we write the current page, which is probably incomplete
|
||||
s.writePartialPage s.pages[0]
|
||||
# Finally, we flush
|
||||
s.vtable.flush(s)
|
||||
s.vtable.flushSync(s)
|
||||
|
||||
proc writePendingPagesAndLeaveOne(s: OutputStream) {.inline.} =
|
||||
s.writePages
|
||||
|
@ -277,12 +291,12 @@ proc writeDataAsPages(s: OutputStream, data: ptr byte, dataLen: int) =
|
|||
|
||||
if dataLen > s.pageSize:
|
||||
if dataLen < s.maxWriteSize:
|
||||
s.vtable.writePage(s, makeOpenArray(data, dataLen))
|
||||
s.vtable.writePageSync(s, makeOpenArray(data, dataLen))
|
||||
s.endPos += dataLen
|
||||
return
|
||||
|
||||
while dataLen > s.pageSize:
|
||||
s.vtable.writePage(s, makeOpenArray(data, s.pageSize))
|
||||
s.vtable.writePageSync(s, makeOpenArray(data, s.pageSize))
|
||||
data = offset(data, s.pageSize)
|
||||
dec dataLen, s.pageSize
|
||||
s.endPos += s.pageSize
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
import
|
||||
macros,
|
||||
input_stream, output_stream
|
||||
|
||||
export
|
||||
input_stream, output_stream
|
||||
|
||||
macro executePipeline*(start: InputStream, steps: varargs[untyped]) =
|
||||
var input = start
|
||||
result = newStmtList()
|
||||
|
||||
for i in 0 .. steps.len - 2:
|
||||
var
|
||||
step = steps[i]
|
||||
outputVar = genSym(nskVar, "out")
|
||||
output = if i == steps.len - 2: steps[^1]
|
||||
else: newCall(bindSym"memoryOutput")
|
||||
|
||||
result.add quote do:
|
||||
var `outputVar` = `output`
|
||||
`step`(`input`, `outputVar`)
|
||||
|
||||
input = quote do:
|
||||
memoryInput(getOutput(`outputVar`))
|
||||
|
||||
if defined(debugMacros) or defined(debugPipelines):
|
||||
echo result.repr
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
import
|
||||
test_input_stream,
|
||||
test_output_stream
|
||||
test_output_stream,
|
||||
test_pipelines
|
||||
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
import
|
||||
../faststreams
|
||||
|
||||
template cbBase(a, b): untyped = [
|
||||
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
|
||||
'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
|
||||
'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
|
||||
'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
|
||||
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', a, b]
|
||||
|
||||
const
|
||||
cb64 = cbBase('+', '/')
|
||||
invalidChar = 255
|
||||
paddingByte = byte('=')
|
||||
|
||||
template encodeSize(size: int): int = (size * 4 div 3) + 6
|
||||
|
||||
proc base64encode*(i: InputStream, o: OutputStream) =
|
||||
var
|
||||
n: uint32
|
||||
b: uint32
|
||||
|
||||
template inputByte(exp: untyped) =
|
||||
b = uint32(i.read)
|
||||
n = exp
|
||||
|
||||
template outputChar(x: typed) =
|
||||
o.append cb64[x and 63]
|
||||
|
||||
while i.readable(3):
|
||||
inputByte(b shl 16)
|
||||
inputByte(n or b shl 8)
|
||||
inputByte(n or b shl 0)
|
||||
outputChar(n shr 18)
|
||||
outputChar(n shr 12)
|
||||
outputChar(n shr 6)
|
||||
outputChar(n shr 0)
|
||||
|
||||
if i.readable:
|
||||
inputByte(b shl 16)
|
||||
if i.readable:
|
||||
inputByte(n or b shl 8)
|
||||
outputChar(n shr 18)
|
||||
outputChar(n shr 12)
|
||||
outputChar(n shr 6)
|
||||
o.append paddingByte
|
||||
else:
|
||||
outputChar(n shr 18)
|
||||
outputChar(n shr 12)
|
||||
o.append paddingByte
|
||||
o.append paddingByte
|
||||
|
||||
proc initDecodeTable*(): array[256, char] =
|
||||
# computes a decode table at compile time
|
||||
for i in 0 ..< 256:
|
||||
let ch = char(i)
|
||||
var code = invalidChar
|
||||
if ch >= 'A' and ch <= 'Z': code = i - 0x00000041
|
||||
if ch >= 'a' and ch <= 'z': code = i - 0x00000047
|
||||
if ch >= '0' and ch <= '9': code = i + 0x00000004
|
||||
if ch == '+' or ch == '-': code = 0x0000003E
|
||||
if ch == '/' or ch == '_': code = 0x0000003F
|
||||
result[i] = char(code)
|
||||
|
||||
const
|
||||
decodeTable = initDecodeTable()
|
||||
|
||||
proc base64decode*(i: InputStream, o: OutputStream) =
|
||||
proc decodeSize(size: int): int =
|
||||
return (size * 3 div 4) + 6
|
||||
|
||||
proc raiseInvalidChar(c: byte, pos: int) {.noReturn.} =
|
||||
raise newException(ValueError,
|
||||
"Invalid base64 format character `" & char(c) & "` at location " & $pos & ".")
|
||||
|
||||
template inputChar(x: untyped) =
|
||||
let c = i.read()
|
||||
let x = int decodeTable[c]
|
||||
if x == invalidChar:
|
||||
raiseInvalidChar(c, i.pos - 1)
|
||||
|
||||
template outputChar(x: untyped) =
|
||||
o.append char(x and 255)
|
||||
|
||||
let inputLen = i.len
|
||||
if inputLen != lengthUnknown:
|
||||
o.prepareRunway decodeSize(inputLen)
|
||||
|
||||
# hot loop: read 4 characters at at time
|
||||
while i.readable(8):
|
||||
inputChar(a)
|
||||
inputChar(b)
|
||||
inputChar(c)
|
||||
inputChar(d)
|
||||
outputChar(a shl 2 or b shr 4)
|
||||
outputChar(b shl 4 or c shr 2)
|
||||
outputChar(c shl 6 or d shr 0)
|
||||
|
||||
if i.readable(4):
|
||||
inputChar(a)
|
||||
inputChar(b)
|
||||
outputChar(a shl 2 or b shr 4)
|
||||
|
||||
if i.peek == paddingByte:
|
||||
let next = i.peekAt(1)
|
||||
if next != paddingByte:
|
||||
raiseInvalidChar(next, i.pos + 1)
|
||||
else:
|
||||
inputChar(c)
|
||||
outputChar(b shl 4 or c shr 2)
|
||||
if i.peek != paddingByte:
|
||||
inputChar(d)
|
||||
outputChar(c shl 6 or d shr 0)
|
||||
elif i.readable:
|
||||
raise newException(ValueError, "The input stream has insufficient nymber of bytes for base64 decoding")
|
||||
|
|
@ -8,5 +8,5 @@ suite "input stream":
|
|||
var stream = memoryInput(input)
|
||||
|
||||
check:
|
||||
(stream.readBytes(4) == "1234".toOpenArrayByte(0, 3))
|
||||
(stream.read(4) == "1234".toOpenArrayByte(0, 3))
|
||||
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
import
|
||||
std/[unittest, strutils, base64],
|
||||
../faststreams/pipelines,
|
||||
./base64 as fsBase64
|
||||
|
||||
include system/timers
|
||||
|
||||
type
|
||||
TestTimes = object
|
||||
fsPipeline: Nanos
|
||||
fsAsyncPipeline: Nanos
|
||||
stdFunctionCalls: Nanos
|
||||
|
||||
proc upcaseAllCharacters(i: InputStream, o: OutputStream) =
|
||||
while i.readable:
|
||||
o.append toUpperAscii(char i.read())
|
||||
|
||||
template timeit(timerVar: var Nanos, code: untyped) =
|
||||
let t0 = getTicks()
|
||||
code
|
||||
timerVar = int(getTicks() - t0) div 1000000
|
||||
|
||||
suite "pipelines":
|
||||
var loremIpsum = """
|
||||
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod
|
||||
tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim
|
||||
veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex
|
||||
ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate
|
||||
velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat
|
||||
cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id
|
||||
est laborum.
|
||||
|
||||
""".repeat(100)
|
||||
|
||||
test "upper-case/base64 pipeline":
|
||||
var
|
||||
times: TestTimes
|
||||
stdRes: string
|
||||
fsRes: string
|
||||
|
||||
timeIt times.fsPipeline:
|
||||
var memOut = memoryOutput()
|
||||
executePipeline(memoryInput(loremIpsum),
|
||||
upcaseAllCharacters,
|
||||
base64encode,
|
||||
base64decode,
|
||||
memOut)
|
||||
fsRes = memOut.getOutput(string)
|
||||
|
||||
timeIt times.stdFunctionCalls:
|
||||
stdRes = base64.decode(base64.encode(toUpperAscii(loremIpsum)))
|
||||
|
||||
check fsRes == stdRes
|
||||
|
Loading…
Reference in New Issue