342 lines
11 KiB
Nim

import
"."/[inputs, outputs, async_backend]
export
inputs, outputs, async_backend
when fsAsyncSupport:
import
std/macros,
./buffers
type
Pipe* = ref object
# TODO: Make these stream handles
input*: AsyncInputStream
output*: AsyncOutputStream
buffers*: PageBuffers
template enterWait(fut: var Future, context: static string) =
let wait = newFuture[void](context)
fut = wait
try: fsAwait wait
finally: fut = nil
template awake(fp: Future) =
let f = fp
if f != nil and not finished(f):
complete f
proc pipeRead(s: LayeredInputStream,
dst: pointer, dstLen: Natural): Future[Natural] {.async.} =
let buffers = s.buffers
if buffers.eofReached: return 0
var
bytesInBuffersAtStart = buffers.totalBufferedBytes
minBytesExpected = max(1, dstLen)
bytesInBuffersNow = bytesInBuffersAtStart
while bytesInBuffersNow < minBytesExpected:
awake buffers.waitingWriter
buffers.waitingReader.enterWait "waiting for writer to buffer more data"
bytesInBuffersNow = buffers.totalBufferedBytes
if buffers.eofReached:
return bytesInBuffersNow - bytesInBuffersAtStart
if dst != nil:
let drained {.used.} = drainBuffersInto(s, cast[ptr byte](dst), dstLen)
fsAssert drained == dstLen
awake buffers.waitingWriter
return bytesInBuffersNow - bytesInBuffersAtStart
proc pipeWrite(s: LayeredOutputStream, src: pointer, srcLen: Natural) {.async.} =
let buffers = s.buffers
while buffers.canAcceptWrite(srcLen) == false:
buffers.waitingWriter.enterWait "waiting for reader to drain the buffers"
if src != nil:
buffers.appendUnbufferedWrite(src, srcLen)
awake buffers.waitingReader
describeBuffers "pipeWrite", buffers
template completedFuture(name: static string): untyped =
let fut = newFuture[void](name)
complete fut
fut
let pipeInputVTable = InputStreamVTable(
readSync: proc (s: InputStream, dst: pointer, dstLen: Natural): Natural
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Failed to read from pipe":
let ls = LayeredInputStream(s)
fsAssert ls.allowWaitFor
return waitFor pipeRead(ls, dst, dstLen)
,
readAsync: proc (s: InputStream, dst: pointer, dstLen: Natural): Future[Natural]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Unexpected error from the async macro":
let ls = LayeredInputStream(s)
return pipeRead(ls, dst, dstLen)
,
getLenSync: proc (s: InputStream): Option[Natural]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
let source = LayeredInputStream(s).source
if source != nil:
return source.len
,
closeSync: proc (s: InputStream)
{.nimcall, gcsafe, raises: [IOError, Defect].} =
let source = LayeredInputStream(s).source
if source != nil:
close source
,
closeAsync: proc (s: InputStream): Future[void]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Unexpected error from the async macro":
let source = LayeredInputStream(s).source
if source != nil:
return closeAsync(Async source)
else:
return completedFuture("pipeInput.closeAsync")
)
let pipeOutputVTable = OutputStreamVTable(
writeSync: proc (s: OutputStream, src: pointer, srcLen: Natural)
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Failed to write all bytes to pipe":
var ls = LayeredOutputStream(s)
fsAssert ls.allowWaitFor
waitFor pipeWrite(ls, src, srcLen)
,
writeAsync: proc (s: OutputStream, src: pointer, srcLen: Natural): Future[void]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
# TODO: The async macro is raising exceptions even when
# merely forwarding a future:
fsTranslateErrors "Unexpected error from the async macro":
return pipeWrite(LayeredOutputStream s, src, srcLen)
,
flushSync: proc (s: OutputStream)
{.nimcall, gcsafe, raises: [IOError, Defect].} =
let destination = LayeredOutputStream(s).destination
if destination != nil:
flush destination
,
flushAsync: proc (s: OutputStream): Future[void]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
fsTranslateErrors "Unexpected error from the async macro":
let destination = LayeredOutputStream(s).destination
if destination != nil:
return flushAsync(Async destination)
else:
return completedFuture("pipeOutput.flushAsync")
,
closeSync: proc (s: OutputStream)
{.nimcall, gcsafe, raises: [IOError, Defect].} =
s.buffers.eofReached = true
fsTranslateErrors "Unexpected error from Future.complete":
awake s.buffers.waitingReader
let destination = LayeredOutputStream(s).destination
if destination != nil:
close destination
,
closeAsync: proc (s: OutputStream): Future[void]
{.nimcall, gcsafe, raises: [IOError, Defect].} =
s.buffers.eofReached = true
fsTranslateErrors "Unexpected error from Future.complete":
awake s.buffers.waitingReader
fsTranslateErrors "Unexpected error from the async macro":
let destination = LayeredOutputStream(s).destination
if destination != nil:
return closeAsync(Async destination)
else:
return completedFuture("pipeOutput.closeAsync")
)
func pipeInput*(source: InputStream,
pageSize = defaultPageSize,
allowWaitFor = false): AsyncInputStream =
fsAssert pageSize > 0
AsyncInputStream LayeredInputStream(
vtable: vtableAddr pipeInputVTable,
buffers: initPageBuffers pageSize,
allowWaitFor: allowWaitFor,
source: source)
func pipeInput*(buffers: PageBuffers,
allowWaitFor = false,
source: InputStream = nil): AsyncInputStream =
var spanEndPos = Natural 0
var span = if buffers.len == 0: default(PageSpan)
else: buffers.obtainReadableSpan(spanEndPos)
AsyncInputStream LayeredInputStream(
vtable: vtableAddr pipeInputVTable,
buffers: buffers,
span: span,
spanEndPos: span.len,
allowWaitFor: allowWaitFor,
source: source)
proc pipeOutput*(destination: OutputStream,
pageSize = defaultPageSize,
maxBufferedBytes = defaultPageSize * 4,
allowWaitFor = false): AsyncOutputStream =
fsAssert pageSize > 0
var
buffers = initPageBuffers pageSize
span = buffers.getWritableSpan()
AsyncOutputStream LayeredOutputStream(
vtable: vtableAddr pipeOutputVTable,
buffers: buffers,
span: span,
spanEndPos: span.len,
allowWaitFor: allowWaitFor,
destination: destination)
proc pipeOutput*(buffers: PageBuffers,
allowWaitFor = false,
destination: OutputStream = nil): AsyncOutputStream =
var span = buffers.getWritableSpan()
AsyncOutputStream LayeredOutputStream(
vtable: vtableAddr pipeOutputVTable,
buffers: buffers,
span: span,
# TODO What if the buffers are partially populated?
# Should we adjust the spanEndPos? This would
# need the old buffers.totalBytesWritten var.
spanEndPos: span.len,
allowWaitFor: allowWaitFor,
destination: destination)
func asyncPipe*(pageSize = defaultPageSize,
maxBufferedBytes = defaultPageSize * 4): Pipe =
fsAssert pageSize > 0
Pipe(buffers: initPageBuffers(pageSize, maxBufferedBytes))
func initReader*(pipe: Pipe): AsyncInputStream =
result = pipeInput(pipe.buffers)
pipe.input = result
func initWriter*(pipe: Pipe): AsyncOutputStream =
result = pipeOutput(pipe.buffers)
pipe.output = result
proc exchangeBuffersAfterPipilineStep(input: InputStream, output: OutputStream) =
let formerInputBuffers = input.buffers
let formerOutputBuffers = output.getBuffers
input.resetBuffers formerOutputBuffers
output.recycleBuffers formerInputBuffers
macro executePipeline*(start: InputStream, steps: varargs[untyped]): untyped =
result = newTree(nnkStmtListExpr)
var
inputVal = start
outputVal = newCall(bindSym"memoryOutput")
inputVar = genSym(nskVar, "input")
outputVar = genSym(nskVar, "output")
step0 = steps[0]
result.add quote do:
var
`inputVar` = `inputVal`
`outputVar` = OutputStream `outputVal`
`step0`(`inputVar`, `outputVar`)
if steps.len > 2:
let step1 = steps[1]
result.add quote do:
let formerInputBuffers = `inputVar`.buffers
`inputVar` = memoryInput(getBuffers `outputVar`)
recycleBuffers(`outputVar`, formerInputBuffers)
`step1`(`inputVar`, `outputVar`)
for i in 2 .. steps.len - 2:
let step = steps[i]
result.add quote do:
exchangeBuffersAfterPipilineStep(`inputVar`, `outputVar`)
`step`(`inputVar`, `outputVar`)
var closingCall = steps[^1]
closingCall.insert(1, outputVar)
result.add closingCall
if defined(debugMacros) or defined(debugPipelines):
echo result.repr
macro executePipeline*(start: AsyncInputStream, steps: varargs[untyped]): untyped =
var
stream = ident "stream"
pipelineSteps = ident "pipelineSteps"
pipelineBody = newTree(nnkStmtList)
step0 = steps[0]
stepOutput = genSym(nskVar, "pipe")
pipelineBody.add quote do:
var `pipelineSteps` = newSeq[Future[void]]()
var `stepOutput` = asyncPipe()
add `pipelineSteps`, `step0`(`stream`, initWriter(`stepOutput`))
var
stepInput = stepOutput
for i in 1 .. steps.len - 2:
var step = steps[i]
stepOutput = genSym(nskVar, "pipe")
pipelineBody.add quote do:
var `stepOutput` = asyncPipe()
add `pipelineSteps`, `step`(initReader(`stepInput`), initWriter(`stepOutput`))
stepInput = stepOutput
var RetTypeExpr = copy steps[^1]
RetTypeExpr.insert(1, newCall("default", ident"AsyncInputStream"))
var closingCall = steps[^1]
closingCall.insert(1, newCall(bindSym"initReader", stepInput))
pipelineBody.add quote do:
fsAwait allFutures(`pipelineSteps`)
`closingCall`
result = quote do:
type UserOpRetType = type(`RetTypeExpr`)
when UserOpRetType is Future:
type RetType = type(default(UserOpRetType).read)
else:
type RetType = UserOpRetType
proc pipelineProc(`stream`: AsyncInputStream): Future[RetType] {.async.} =
when UserOpRetType is Future:
var f = `pipelineBody`
return fsAwait(f)
else:
return `pipelineBody`
pipelineProc(`start`)
when defined(debugMacros):
echo result.repr