[wip] simple working experiment using a task_runner task in combo with async pipe/stream

This commit is contained in:
Michael Bradley, Jr 2021-11-20 21:12:27 -06:00
parent 797ea84c9f
commit 9a74326211
No known key found for this signature in database
GPG Key ID: D0307DBCF21A9A58
1 changed files with 63 additions and 3 deletions

View File

@ -1,5 +1,65 @@
import pkg/task_runner
import std/os
var runner = TaskRunner.new()
import pkg/[chronicles, stew/byteutils, task_runner]
echo "Hello from localstore experiment #1"
logScope:
topics = "localstore"
type LocalstoreArg = ref object of ContextArg
const localstore = "localstore"
proc localstoreContext(arg: ContextArg) {.async, gcsafe, nimcall,
raises: [Defect].} =
let contextArg = cast[LocalstoreArg](arg)
discard
proc readFromStreamWriteToFile(rfd: int, destFilePath: string)
{.task(kind=no_rts, stoppable=false).} =
let task = taskArg.taskName
let reader = cast[AsyncFD](rfd).fromPipe
var destFile = destFilePath.open(fmWrite)
while workerRunning[].load:
let data = await reader.read(12)
discard destFile.writeBytes(data, 0, 12)
destFile.close
proc runTasks(runner: TaskRunner) {.async.} =
let (rfd, wfd) = createAsyncPipe()
asyncSpawn readFromStreamWriteToFile(runner, localstore, rfd.int,
currentSourcePath.parentDir / "foo.txt")
let writer = wfd.fromPipe
while runner.running.load:
let n = await writer.write("hello there ".toBytes)
await sleepAsync 10.milliseconds
proc scheduleStop(runner: TaskRunner, s: Duration) {.async.} =
await sleepAsync 10.seconds
await runner.stop
proc main() {.async.} =
var
localstoreArg = LocalstoreArg()
runner = TaskRunner.new
runnerPtr {.threadvar.}: pointer
runnerPtr = cast[pointer](runner)
proc stop() {.noconv.} = waitFor cast[TaskRunner](runnerPtr).stop
setControlCHook(stop)
runner.createWorker(thread, localstore, localstoreContext, localstoreArg)
await runner.start
asyncSpawn runner.runTasks
asyncSpawn runner.scheduleStop(10.seconds)
while runner.running.load: poll()
when isMainModule:
waitFor main()
quit QuitSuccess