[wip] stream writes to threads in task_runner threadpool

It works but there are some strange behaviors that need be sorted out
This commit is contained in:
Michael Bradley, Jr 2021-11-21 20:36:14 -06:00
parent 9a74326211
commit 7c9eb27c5c
No known key found for this signature in database
GPG Key ID: D0307DBCF21A9A58
2 changed files with 116 additions and 47 deletions

View File

@ -39,7 +39,9 @@ task testAll, "Build & run Waku v1 tests":
import os import os
const build_opts = task localstore, "Build localstore experiment":
const
build_opts =
when defined(danger) or defined(release): when defined(danger) or defined(release):
(if defined(danger): " --define:danger" else: " --define:release") & (if defined(danger): " --define:danger" else: " --define:release") &
" --define:strip" & " --define:strip" &
@ -54,24 +56,38 @@ const build_opts =
" --linetrace:on" & " --linetrace:on" &
" --stacktrace:on" " --stacktrace:on"
const common_opts = common_opts =
" --define:ssl" & " --define:ssl" &
" --threads:on" & " --threads:on" &
" --tlsEmulation:off" " --tlsEmulation:off"
const chronos_preferred = chronos_preferred =
" --path:\"" & " --path:\"" &
staticExec("nimble path chronos --silent").parentDir / staticExec("nimble path chronos --silent").parentDir /
"chronos-#export-selector-field\"" "chronos-#export-selector-field\""
task localstore, "Build localstore experiment": chronicles_log_level {.strdefine.} =
when defined(danger) or defined(release):
"INFO"
else:
"DEBUG"
host {.strdefine.} = ""
maxRequestBodySize {.strdefine.} = ""
port {.strdefine.} = ""
var commands = [ var commands = [
"nim c" & "nim c" &
build_opts & build_opts &
common_opts & common_opts &
chronos_preferred & chronos_preferred &
" --define:chronicles_log_level=" & chronicles_log_level &
(when host != "": " --define:host=" & host else: "") &
(when maxRequestBodySize != "": " --define:maxRequestBodySize=" & maxRequestBodySize else: "") &
(when port != "": " --define:port=" & port else: "") &
" experiments/localstore.nim", " experiments/localstore.nim",
"experiments/localstore" "experiments/localstore"
] ]
for command in commands: for command in commands:
exec command exec command

View File

@ -1,13 +1,18 @@
import std/os import std/os
import pkg/[chronicles, stew/byteutils, task_runner] import pkg/[chronicles, stew/byteutils, task_runner]
import pkg/[chronos/apps/http/httpcommon, chronos/apps/http/httpserver]
logScope: logScope:
topics = "localstore" topics = "localstore"
type LocalstoreArg = ref object of ContextArg type LocalstoreArg = ref object of ContextArg
const localstore = "localstore" const
host {.strdefine.} = "127.0.0.1"
localstore = "localstore"
maxRequestBodySize {.intdefine.} = 10 * 1_048_576
port {.strdefine.} = "30080"
proc localstoreContext(arg: ContextArg) {.async, gcsafe, nimcall, proc localstoreContext(arg: ContextArg) {.async, gcsafe, nimcall,
raises: [Defect].} = raises: [Defect].} =
@ -15,32 +20,35 @@ proc localstoreContext(arg: ContextArg) {.async, gcsafe, nimcall,
let contextArg = cast[LocalstoreArg](arg) let contextArg = cast[LocalstoreArg](arg)
discard discard
proc readFromStreamWriteToFile(rfd: int, destFilePath: string) proc readFromStreamWriteToFile(rfd: int, destPath: string)
{.task(kind=no_rts, stoppable=false).} = {.task(kind=no_rts, stoppable=false).} =
let task = taskArg.taskName
let reader = cast[AsyncFD](rfd).fromPipe let reader = cast[AsyncFD](rfd).fromPipe
var destFile = destFilePath.open(fmWrite) var destFile = destPath.open(fmWrite)
while workerRunning[].load: proc pred(data: openarray[byte]): tuple[consumed: int, done: bool]
let data = await reader.read(12) {.gcsafe, raises: [Defect].} =
discard destFile.writeBytes(data, 0, 12)
debug "task pred got data", byteCount=data.len
if data.len > 0:
try:
discard destFile.writeBytes(data, 0, data.len)
except Exception as e:
debug "exception raised when task wrote to file", error=e.msg
(data.len, data.len < 4096)
else:
(0, true)
await reader.readMessage(pred)
await reader.closeWait
destFile.flushFile
destFile.close destFile.close
proc runTasks(runner: TaskRunner) {.async.} =
let (rfd, wfd) = createAsyncPipe()
asyncSpawn readFromStreamWriteToFile(runner, localstore, rfd.int,
currentSourcePath.parentDir / "foo.txt")
let writer = wfd.fromPipe
while runner.running.load:
let n = await writer.write("hello there ".toBytes)
await sleepAsync 10.milliseconds
proc scheduleStop(runner: TaskRunner, s: Duration) {.async.} = proc scheduleStop(runner: TaskRunner, s: Duration) {.async.} =
await sleepAsync 10.seconds await sleepAsync s
await runner.stop await runner.stop
proc main() {.async.} = proc main() {.async.} =
@ -50,12 +58,57 @@ proc main() {.async.} =
runnerPtr {.threadvar.}: pointer runnerPtr {.threadvar.}: pointer
runnerPtr = cast[pointer](runner) runnerPtr = cast[pointer](runner)
proc stop() {.noconv.} = waitFor cast[TaskRunner](runnerPtr).stop
proc process(r: RequestFence): Future[HttpResponseRef] {.async.} =
let
request = r.tryGet
filename = ($request.uri).split("/")[^1]
destPath = currentSourcePath.parentDir / "files" / filename
(rfd, wfd) = createAsyncPipe()
writer = wfd.fromPipe
proc pred(data: openarray[byte]): tuple[consumed: int, done: bool]
{.gcsafe, raises: [Defect].} =
debug "http server pred got data", byteCount=data.len
if data.len > 0:
try:
# discard waitFor writer.write(@data, data.len)
discard writer.write(@data, data.len)
except Exception as e:
debug "exception raised when http server wrote to task",
error=e.msg, stacktrace=getStackTrace(e)
(data.len, false)
else:
(0, true)
asyncSpawn readFromStreamWriteToFile(runner, localstore, rfd.int, destPath)
await request.getBodyReader.tryGet.readMessage(pred)
await writer.closeWait
discard request.respond(Http200)
let
address = initTAddress(host & ":" & port)
socketFlags = {ServerFlags.TcpNoDelay, ServerFlags.ReuseAddr}
server = HttpServerRef.new(address, process,
socketFlags = socketFlags, maxRequestBodySize = maxRequestBodySize).tryGet
var serverPtr {.threadvar.}: pointer
serverPtr = cast[pointer](server)
proc stop() {.noconv.} =
waitFor cast[HttpServerRef](serverPtr).stop
waitFor cast[TaskRunner](runnerPtr).stop
setControlCHook(stop) setControlCHook(stop)
runner.createWorker(thread, localstore, localstoreContext, localstoreArg) runner.createWorker(pool, localstore, localstoreContext, localstoreArg, 8)
runner.workers[localstore].worker.awaitTasks = false
await runner.start await runner.start
asyncSpawn runner.runTasks server.start
asyncSpawn runner.scheduleStop(10.seconds) asyncSpawn runner.scheduleStop(10.seconds)
while runner.running.load: poll() while runner.running.load: poll()