From 7c9eb27c5cbd1f5a5084b3210261c510e4a2aa36 Mon Sep 17 00:00:00 2001 From: "Michael Bradley, Jr" Date: Sun, 21 Nov 2021 20:36:14 -0600 Subject: [PATCH] [wip] stream writes to threads in task_runner threadpool It works but there are some strange behaviors that need be sorted out --- dagger.nimble | 66 ++++++++++++++++---------- experiments/localstore.nim | 97 +++++++++++++++++++++++++++++--------- 2 files changed, 116 insertions(+), 47 deletions(-) diff --git a/dagger.nimble b/dagger.nimble index b88ea98a..9a4e8d9b 100644 --- a/dagger.nimble +++ b/dagger.nimble @@ -39,39 +39,55 @@ task testAll, "Build & run Waku v1 tests": import os -const build_opts = - when defined(danger) or defined(release): - (if defined(danger): " --define:danger" else: " --define:release") & - " --define:strip" & - " --hints:off" & - " --opt:size" & - " --passC:-flto" & - " --passL:-flto" - else: - " --debugger:native" & - " --define:chronicles_line_numbers" & - " --define:debug" & - " --linetrace:on" & - " --stacktrace:on" - -const common_opts = - " --define:ssl" & - " --threads:on" & - " --tlsEmulation:off" - -const chronos_preferred = - " --path:\"" & - staticExec("nimble path chronos --silent").parentDir / - "chronos-#export-selector-field\"" - task localstore, "Build localstore experiment": + const + build_opts = + when defined(danger) or defined(release): + (if defined(danger): " --define:danger" else: " --define:release") & + " --define:strip" & + " --hints:off" & + " --opt:size" & + " --passC:-flto" & + " --passL:-flto" + else: + " --debugger:native" & + " --define:chronicles_line_numbers" & + " --define:debug" & + " --linetrace:on" & + " --stacktrace:on" + + common_opts = + " --define:ssl" & + " --threads:on" & + " --tlsEmulation:off" + + chronos_preferred = + " --path:\"" & + staticExec("nimble path chronos --silent").parentDir / + "chronos-#export-selector-field\"" + + chronicles_log_level {.strdefine.} = + when defined(danger) or defined(release): + "INFO" + else: + "DEBUG" + + host {.strdefine.} = "" + maxRequestBodySize {.strdefine.} = "" + port {.strdefine.} = "" + var commands = [ "nim c" & build_opts & common_opts & chronos_preferred & + " --define:chronicles_log_level=" & chronicles_log_level & + (when host != "": " --define:host=" & host else: "") & + (when maxRequestBodySize != "": " --define:maxRequestBodySize=" & maxRequestBodySize else: "") & + (when port != "": " --define:port=" & port else: "") & " experiments/localstore.nim", "experiments/localstore" ] + for command in commands: exec command diff --git a/experiments/localstore.nim b/experiments/localstore.nim index b15f8f73..ad34ed00 100644 --- a/experiments/localstore.nim +++ b/experiments/localstore.nim @@ -1,13 +1,18 @@ import std/os import pkg/[chronicles, stew/byteutils, task_runner] +import pkg/[chronos/apps/http/httpcommon, chronos/apps/http/httpserver] logScope: topics = "localstore" type LocalstoreArg = ref object of ContextArg -const localstore = "localstore" +const + host {.strdefine.} = "127.0.0.1" + localstore = "localstore" + maxRequestBodySize {.intdefine.} = 10 * 1_048_576 + port {.strdefine.} = "30080" proc localstoreContext(arg: ContextArg) {.async, gcsafe, nimcall, raises: [Defect].} = @@ -15,32 +20,35 @@ proc localstoreContext(arg: ContextArg) {.async, gcsafe, nimcall, let contextArg = cast[LocalstoreArg](arg) discard -proc readFromStreamWriteToFile(rfd: int, destFilePath: string) +proc readFromStreamWriteToFile(rfd: int, destPath: string) {.task(kind=no_rts, stoppable=false).} = - let task = taskArg.taskName - let reader = cast[AsyncFD](rfd).fromPipe - var destFile = destFilePath.open(fmWrite) + var destFile = destPath.open(fmWrite) - while workerRunning[].load: - let data = await reader.read(12) - discard destFile.writeBytes(data, 0, 12) + proc pred(data: openarray[byte]): tuple[consumed: int, done: bool] + {.gcsafe, raises: [Defect].} = + debug "task pred got data", byteCount=data.len + + if data.len > 0: + try: + discard destFile.writeBytes(data, 0, data.len) + except Exception as e: + debug "exception raised when task wrote to file", error=e.msg + + (data.len, data.len < 4096) + + else: + (0, true) + + await reader.readMessage(pred) + await reader.closeWait + destFile.flushFile destFile.close -proc runTasks(runner: TaskRunner) {.async.} = - let (rfd, wfd) = createAsyncPipe() - asyncSpawn readFromStreamWriteToFile(runner, localstore, rfd.int, - currentSourcePath.parentDir / "foo.txt") - - let writer = wfd.fromPipe - while runner.running.load: - let n = await writer.write("hello there ".toBytes) - await sleepAsync 10.milliseconds - proc scheduleStop(runner: TaskRunner, s: Duration) {.async.} = - await sleepAsync 10.seconds + await sleepAsync s await runner.stop proc main() {.async.} = @@ -50,12 +58,57 @@ proc main() {.async.} = runnerPtr {.threadvar.}: pointer runnerPtr = cast[pointer](runner) - proc stop() {.noconv.} = waitFor cast[TaskRunner](runnerPtr).stop + + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + let + request = r.tryGet + filename = ($request.uri).split("/")[^1] + destPath = currentSourcePath.parentDir / "files" / filename + (rfd, wfd) = createAsyncPipe() + writer = wfd.fromPipe + + proc pred(data: openarray[byte]): tuple[consumed: int, done: bool] + {.gcsafe, raises: [Defect].} = + + debug "http server pred got data", byteCount=data.len + + if data.len > 0: + try: + # discard waitFor writer.write(@data, data.len) + discard writer.write(@data, data.len) + except Exception as e: + debug "exception raised when http server wrote to task", + error=e.msg, stacktrace=getStackTrace(e) + + (data.len, false) + + else: + (0, true) + + asyncSpawn readFromStreamWriteToFile(runner, localstore, rfd.int, destPath) + await request.getBodyReader.tryGet.readMessage(pred) + await writer.closeWait + discard request.respond(Http200) + + let + address = initTAddress(host & ":" & port) + socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr} + server = HttpServerRef.new(address, process, + socketFlags = socketFlags, maxRequestBodySize = maxRequestBodySize).tryGet + + var serverPtr {.threadvar.}: pointer + serverPtr = cast[pointer](server) + + proc stop() {.noconv.} = + waitFor cast[HttpServerRef](serverPtr).stop + waitFor cast[TaskRunner](runnerPtr).stop + setControlCHook(stop) - runner.createWorker(thread, localstore, localstoreContext, localstoreArg) + runner.createWorker(pool, localstore, localstoreContext, localstoreArg, 8) + runner.workers[localstore].worker.awaitTasks = false await runner.start - asyncSpawn runner.runTasks + server.start asyncSpawn runner.scheduleStop(10.seconds) while runner.running.load: poll()