# logos-storage-nim/experiments/task_runner_streams.nim
#
# 126 lines
# 3.4 KiB
# Nim

import std/os
import pkg/[chronicles, stew/byteutils, task_runner]
import pkg/[chronos/apps/http/httpcommon, chronos/apps/http/httpserver]
# Every chronicles log statement in this module carries this topic.
logScope:
  topics = "experiment"

type
  # Argument object handed to the worker's context proc; it declares no
  # fields of its own on top of `ContextArg`.
  ExperimentArg = ref object of ContextArg

# Name under which the worker is created and tasks are dispatched.
const experiment = "experiment"

# Listen address of the HTTP server. `strdefine` allows overriding the
# value at compile time, e.g. `-d:host=0.0.0.0`.
const host {.strdefine.} = "127.0.0.1"

# Value passed to the HTTP server as `maxRequestBodySize`
# (10 * 1_048_576, presumably bytes, i.e. 10 MiB -- TODO confirm unit).
# `intdefine` allows overriding it at compile time.
const maxRequestBodySize {.intdefine.} = 10 * 1_048_576

# Listen port of the HTTP server, kept as a string because it is
# concatenated with `host` to build the address. Overridable via
# `-d:port=...`.
const port {.strdefine.} = "30080"
proc experimentContext(arg: ContextArg) {.async, gcsafe, nimcall,
    raises: [Defect].} =
  ## Context proc handed to `createWorker` for the `experiment` worker.
  ##
  ## It only reinterprets `arg` as an `ExperimentArg`; the resulting value
  ## is not used, so the proc currently has no observable effect.
  discard cast[ExperimentArg](arg)
proc readFromStreamWriteToFile(rfd: int, destPath: string)
    {.task(kind=no_rts, stoppable=false).} =
  ## Task body: drains the pipe whose read end is the descriptor `rfd` and
  ## writes everything it receives to the file at `destPath`, which is
  ## opened with `fmWrite` (created or truncated).
  ##
  ## Reading ends when `pred` reports `done`, i.e. when an empty chunk is
  ## delivered or a chunk shorter than 4096 bytes is seen.
  ## NOTE(review): the 4096 threshold looks like an end-of-stream heuristic
  ## based on an assumed chunk size -- confirm against the writing side.
  ##
  ## The destination file and the pipe reader are released even when
  ## opening the file or reading from the pipe raises.
  let reader = cast[AsyncFD](rfd).fromPipe
  var destFile: File

  proc pred(data: openarray[byte]): tuple[consumed: int, done: bool]
      {.gcsafe, raises: [Defect].} =
    ## Stream predicate: writes the received chunk to `destFile` and tells
    ## the reader how many bytes were consumed and whether to stop.
    let byteCount = data.len
    debug "task pred got data", byteCount=byteCount
    if byteCount > 0:
      try:
        # `data` is written directly; no intermediate copy is needed
        # because it is consumed before this callback returns.
        discard destFile.writeBytes(data, 0, byteCount)
      except CatchableError as e:
        # Best effort: a failed write is logged and the stream keeps being
        # drained. Defects are deliberately not caught.
        error "exception raised when task wrote to file", error=e.msg
      (byteCount, byteCount < 4096)
    else:
      (0, true)

  try:
    destFile = destPath.open(fmWrite)
    await reader.readMessage(pred)
  finally:
    # Close the file first: these calls are synchronous, so the handle is
    # released even if closing the reader fails afterwards.
    if not destFile.isNil:
      destFile.flushFile
      destFile.close
    await reader.closeWait
proc scheduleStop(runner: TaskRunner, s: Duration) {.async.} =
  ## Sleeps for the duration `s` and then stops `runner`.
  await sleepAsync(s)
  await runner.stop()
proc main() {.async.} =
  ## Runs the experiment: starts a task runner with one `experiment` worker
  ## and an HTTP server on `host:port`. For every request the body is
  ## streamed through an async pipe to a `readFromStreamWriteToFile` task,
  ## which stores it under `<repo>/build/files/<last URI path segment>`.
  ## The runner is stopped after 10 seconds, which ends the poll loop.
  const destDir = currentSourcePath.parentDir.parentDir / "build" / "files"
  createDir(destDir)

  var
    experimentArg = ExperimentArg()
    runner = TaskRunner.new
    # Raw pointer copy of `runner` so the non-closure Ctrl-C hook below
    # can reach it.
    runnerPtr {.threadvar.}: pointer

  runnerPtr = cast[pointer](runner)

  proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
    ## HTTP request handler: forwards the request body into a pipe read by
    ## a worker task and answers with `Http200` once the body was forwarded.
    let
      request = r.tryGet
      # Only the last path segment is used, so the name contains no "/".
      filename = ($request.uri).split("/")[^1]
      destPath = destDir / filename
      (rfd, wfd) = createAsyncPipe()
      writer = wfd.fromPipe

    # `pred` is synchronous and cannot await, so the write futures are
    # collected here and awaited before the pipe is closed.
    var pendingWrites: seq[Future[int]]

    proc pred(data: openarray[byte]): tuple[consumed: int, done: bool]
        {.gcsafe, raises: [Defect].} =
      ## Stream predicate: queues each received chunk for writing to the
      ## pipe; an empty chunk ends the read.
      # The chunk is copied because the write completes asynchronously,
      # after `data` is no longer valid.
      let dataCopy = @data
      debug "http server pred got data", byteCount=dataCopy.len
      if dataCopy.len > 0:
        try:
          pendingWrites.add writer.write(dataCopy, dataCopy.len)
        except CatchableError as e:
          error "exception raised when http server wrote to task",
            error=e.msg, stacktrace=getStackTrace(e)
        (dataCopy.len, false)
      else:
        (0, true)

    asyncSpawn readFromStreamWriteToFile(runner, experiment, rfd.int, destPath)
    try:
      await request.getBodyReader.tryGet.readMessage(pred)
      # Let every queued write finish before closing the write end;
      # closing earlier could drop data that is still queued.
      await allFutures(pendingWrites)
      for fut in pendingWrites:
        if fut.failed:
          error "http server failed to write to task", error=fut.error.msg
    finally:
      # Release the write end even if reading the request body failed.
      await writer.closeWait
    # Await and return the response instead of discarding its future, so
    # the server receives the response object and send errors propagate.
    return await request.respond(Http200)

  let
    address = initTAddress(host & ":" & port)
    socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
    server = HttpServerRef.new(address, process,
      socketFlags = socketFlags, maxRequestBodySize = maxRequestBodySize).tryGet

  # Raw pointer copy of `server` for the Ctrl-C hook, same as `runnerPtr`.
  var serverPtr {.threadvar.}: pointer
  serverPtr = cast[pointer](server)

  proc stop() {.noconv.} =
    ## Ctrl-C hook: stops the HTTP server, then the task runner.
    waitFor cast[HttpServerRef](serverPtr).stop
    waitFor cast[TaskRunner](runnerPtr).stop

  setControlCHook(stop)

  # NOTE(review): the trailing `8` is presumably the pool size -- confirm
  # against the task_runner `createWorker` signature.
  runner.createWorker(pool, experiment, experimentContext, experimentArg, 8)
  runner.workers[experiment].worker.awaitTasks = false
  await runner.start
  server.start
  asyncSpawn runner.scheduleStop(10.seconds)
  # Drive the event loop by hand until the runner reports it has stopped.
  while runner.running.load: poll()
# Entry point: run the experiment to completion, then exit successfully.
# Only active when this file is compiled as the main module.
when isMainModule:
  waitFor(main())
  quit(QuitSuccess)