WIP: feat: "beta 2" implementation extracted from nim-status
commit 9383578c7c
parent b77db52d7b
@@ -18,3 +18,15 @@
[submodule "vendor/nim-unittest2"]
  path = vendor/nim-unittest2
  url = https://github.com/status-im/nim-unittest2.git
[submodule "vendor/nim-json-serialization"]
  path = vendor/nim-json-serialization
  url = https://github.com/status-im/nim-json-serialization.git
[submodule "vendor/nim-chronicles"]
  path = vendor/nim-chronicles
  url = https://github.com/status-im/nim-chronicles.git
[submodule "vendor/nim-faststreams"]
  path = vendor/nim-faststreams
  url = https://github.com/status-im/nim-faststreams.git
[submodule "vendor/nim-serialization"]
  path = vendor/nim-serialization
  url = https://github.com/status-im/nim-serialization.git
@@ -9,6 +9,6 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import ./task_runner/achannels, ./task_runner/sys
import ./task_runner/[macros, runner]

export achannels, sys
export macros, runner
@@ -13,7 +13,7 @@ import strutils

import chronos/[handles, transport]

import ./asyncloop, ./asyncsync
import ./chronos_plus/[asyncloop, asyncsync]

when hasThreadSupport:
  import locks
@@ -0,0 +1,323 @@
import # std libs
  std/[macros, unicode]

import # task_runner libs
  ./runner

macro task*(kind: static TaskKind, stoppable: static bool, body: untyped): untyped =
  result = newStmtList()

  const
    star = "*"
    syncPost = "Sync"
    taskArgPost = "TaskArg"
    taskPost = "Task"

  var
    exported = false
    starId = ident(star)
    taskArgTypeId = ident(taskArgPost)
    taskArgTypeDerivedId: NimNode
    taskName: string
    taskNameId: NimNode
    taskNameImplId: NimNode
    taskNameSyncId: NimNode
    taskReturnTypeId: NimNode

  if kind(body[0]) == nnkPostfix and body[0][0] == ident(star):
    exported = true
    taskName = strVal(body[0][1])
    taskNameId = ident(taskName)

  else:
    taskNameId = body[0]
    taskName = strVal(taskNameId)

  taskArgTypeDerivedId = ident(taskName.capitalize & taskArgPost)
  taskNameImplId = ident(taskName & taskPost)
  taskNameSyncId = ident(taskName & syncPost)
  taskReturnTypeId = body[3][0]

  let
    chanReturnToSenderId = ident("chanReturnToSender")
    serializedPointerTypeId = ident("ByteAddress")
    taskStoppedId = ident("taskStopped")

  # The repetitiveness of some code below could/should be cleaned up with
  # additional metaprogramming (and probably more informed use of shortcuts and
  # helpers provided by Nim's macros module); there can be a task options
  # object for which e.g. the `stoppable` field is a boolean flag, but then
  # also a helper object/table with the same fields/keys but the values are
  # tuples of the type names and field names to be added to the type derived
  # from TaskArg; the fields of the supplied/default options object can be
  # iterated over and the proper nnkIdentDefs, etc. can be built according to
  # the options values and info in the helper object/table; the same (or very
  # similar) technique could be used to allow e.g. specification of a
  # TaskRunner instance and/or worker name and/or `ptr Atomic[bool]` (for
  # stopping the task) in the options object, which would then affect whether
  # parameters for those things are included in the type signatures of the
  # constructed procs or instead baked into their bodies.
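  # For illustration only (an editor's sketch, not part of this commit), the
  # options object alluded to above might look roughly like:
  #   type TaskOptions = object
  #     kind: TaskKind
  #     stoppable: bool
  # paired with a table mapping each option to the (type name, field name)
  # tuple that must be appended to the TaskArg-derived type.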

  var
    taskArgTypeDef = newNimNode(nnkTypeDef)
    objDef = newNimNode(nnkObjectTy)
    recList = newNimNode(nnkRecList)

  taskArgTypeDef.add(taskArgTypeDerivedId)
  taskArgTypeDef.add(newEmptyNode())
  objDef.add(newEmptyNode())
  objDef.add(newNimNode(nnkOfInherit).add(taskArgTypeId))

  if kind == rts:
    var
      objParam = newNimNode(nnkIdentDefs)
      post = newNimNode(nnkPostfix)

    post.add(starId)
    post.add(chanReturnToSenderId)
    objParam.add(post)
    objParam.add(serializedPointerTypeId)
    objParam.add(newEmptyNode())
    recList.add(objParam)

  if stoppable == true:
    var
      objParam = newNimNode(nnkIdentDefs)
      post = newNimNode(nnkPostfix)

    post.add(starId)
    post.add(taskStoppedId)
    objParam.add(post)
    objParam.add(serializedPointerTypeId)
    objParam.add(newEmptyNode())
    recList.add(objParam)

  for nn in body[3]:
    if kind(nn) == nnkIdentDefs:
      var
        objParam = newNimNode(nnkIdentDefs)
        post = newNimNode(nnkPostfix)

      post.add(starId)
      post.add(nn[0])
      objParam.add(post)
      objParam.add(nn[1])
      objParam.add(nn[2])
      recList.add(objParam)

  objDef.add(recList)
  taskArgTypeDef.add(newNimNode(nnkRefTy).add(objDef))
  result.add(newNimNode(nnkTypeSection).add(taskArgTypeDef))

  let
    asyncPragmaId = ident("async")
    atomicTypeId = ident("Atomic")
    boolTypeId = ident("bool")
    futureTypeId = ident("Future")
    taskArgId = ident("taskArg")
    taskArgEncId = ident("taskArgEncoded")
    chanSendToHostId = ident("chanSendToHost")
    chanSendToWorkerId = ident("chanSendToWorker")
    stoppedId = ident("stopped")
    taskRunnerId = ident("taskRunner")
    taskRunnerTypeId = ident("TaskRunner")
    workerChannelTypeId = ident("WorkerChannel")
    workerId = ident("worker")
    workerNameId = ident("workerName")
    workerNameTypeId = ident("string")
    workerRunningId = ident("workerRunning")

  var
    atomPtr = newNimNode(nnkPtrTy)
    atomBracket = newNimNode(nnkBracketExpr)

  atomBracket.add(atomicTypeId)
  atomBracket.add(boolTypeId)
  atomPtr.add(atomBracket)

  let
    taskStoppedTypeId = atomPtr
    workerRunningTypeId = atomPtr

  var impl = newStmtList()

  impl.add quote do:
    let
      `taskArgId` = decode[`taskArgTypeDerivedId`](`taskArgEncId`)
      `chanSendToHostId` = cast[`workerChannelTypeId`](`taskArgId`.`chanSendToHostId`)

  if kind == rts:
    impl.add quote do:
      let `chanReturnToSenderId` = cast[`workerChannelTypeId`](`taskArgId`.`chanReturnToSenderId`)

  if stoppable == true:
    impl.add quote do:
      var `taskStoppedId` = cast[`taskStoppedTypeId`](`taskArgId`.`taskStoppedId`)

  impl.add quote do:
    var `workerRunningId` = cast[`workerRunningTypeId`](`taskArgId`.`workerRunningId`)

  for nn in body[3]:
    if kind(nn) == nnkIdentDefs:
      let id = nn[0]
      impl.add quote do:
        let `id` = `taskArgId`.`id`

  impl.add(body[6])

  result.add quote do:
    const `taskNameImplId`: Task = proc(`taskArgEncId`: string) {.async, gcsafe, nimcall, raises: [Defect].} =
      `impl`

  var
    taskBody = newStmtList()
    taskSyncBody = newStmtList()
    taskProcDef = newNimNode(nnkProcDef)
    taskProcSyncDef = newNimNode(nnkProcDef)
    taskProcParams = newNimNode(nnkFormalParams)
    stoppedIdentDefs = newNimNode(nnkIdentDefs)
    taskRunnerIdentDefs = newNimNode(nnkIdentDefs)
    workerNameIdentDefs = newNimNode(nnkIdentDefs)

  taskProcDef.add(taskNameId)
  taskProcDef.add(newEmptyNode())
  taskProcDef.add(body[2])
  taskProcParams.add(newEmptyNode())
  taskRunnerIdentDefs.add(taskRunnerId)
  taskRunnerIdentDefs.add(taskRunnerTypeId)
  taskRunnerIdentDefs.add(newEmptyNode())
  taskProcParams.add(taskRunnerIdentDefs)
  workerNameIdentDefs.add(workerNameId)
  workerNameIdentDefs.add(workerNameTypeId)
  workerNameIdentDefs.add(newEmptyNode())
  taskProcParams.add(workerNameIdentDefs)
  if stoppable == true:
    stoppedIdentDefs.add(stoppedId)
    stoppedIdentDefs.add(taskStoppedTypeId)
    stoppedIdentDefs.add(newEmptyNode())
    taskProcParams.add(stoppedIdentDefs)

  for nn in body[3]:
    if kind(nn) == nnkIdentDefs:
      taskProcParams.add(nn)

  taskProcDef.add(taskProcParams)
  taskProcDef.add(newNimNode(nnkPragma).add(asyncPragmaId))
  taskProcDef.add(newEmptyNode())

  copyChildrenTo(taskProcDef, taskProcSyncDef)
  taskProcSyncDef[0] = taskNameSyncId
  taskProcSyncDef[4] = newEmptyNode()

  if kind == rts:
    if kind(taskReturnTypeId) != nnkEmpty:
      var futureBracket = newNimNode(nnkBracketExpr)
      futureBracket.add(futureTypeId)
      futureBracket.add(taskReturnTypeId)
      taskProcDef[3][0] = futureBracket

      taskProcSyncDef[3][0] = taskReturnTypeId

  taskBody.add quote do:
    let
      `workerId` = taskRunner.workers[workerName].worker
      `chanSendToHostId` = `workerId`.chanRecvFromWorker
      `chanSendToWorkerId` = `workerId`.chanSendToWorker

  if kind == rts:
    taskBody.add quote do:
      let `chanReturnToSenderId` = newWorkerChannel()

  taskBody.add quote do:
    let `taskArgId` = `taskArgTypeDerivedId`(
      `chanSendToHostId`: cast[`serializedPointerTypeId`](`chanSendToHostId`),
      task: cast[`serializedPointerTypeId`](`taskNameImplId`),
      taskName: `taskName`,
      `workerRunningId`: cast[`serializedPointerTypeId`](addr taskRunner.running)
    )

  var taskArgConstructor = taskBody[if kind == rts: 2 else: 1][0][2]

  if kind == rts:
    var objField = newNimNode(nnkExprColonExpr)
    objField.add(chanReturnToSenderId)
    objField.add quote do: cast[`serializedPointerTypeId`](`chanReturnToSenderId`)
    taskArgConstructor.add(objField)

  if stoppable == true:
    var objField = newNimNode(nnkExprColonExpr)
    objField.add(taskStoppedId)
    objField.add quote do: cast[`serializedPointerTypeId`](`stoppedId`)
    taskArgConstructor.add(objField)

  for nn in body[3]:
    var objField = newNimNode(nnkExprColonExpr)
    if kind(nn) == nnkIdentDefs:
      objField.add(nn[0])
      objField.add(nn[0])
      taskArgConstructor.add(objField)

  if kind == rts:
    taskBody.add quote do:
      `chanReturnToSenderId`.open()

  copyChildrenTo(taskBody, taskSyncBody)

  taskBody.add quote do:
    asyncSpawn `chanSendToWorkerId`.send(`taskArgId`.encode.safe)

  taskSyncBody.add quote do:
    `chanSendToWorkerId`.sendSync(`taskArgId`.encode.safe)

  if kind == rts:
    if kind(taskReturnTypeId) != nnkEmpty:
      taskBody.add quote do:
        let res = decode[`taskReturnTypeId`]($(await `chanReturnToSenderId`.recv()))
        `chanReturnToSenderId`.close()
        return res

      taskSyncBody.add quote do:
        let res = decode[`taskReturnTypeId`]($`chanReturnToSenderId`.recvSync())
        `chanReturnToSenderId`.close()
        return res

    else:
      taskBody.add quote do:
        discard $(await `chanReturnToSenderId`.recv())
        `chanReturnToSenderId`.close()

      taskSyncBody.add quote do:
        discard $`chanReturnToSenderId`.recvSync()
        `chanReturnToSenderId`.close()

  taskProcDef.add(taskBody)
  taskProcSyncDef.add(taskSyncBody)

  result.add(taskProcDef)
  result.add(taskProcSyncDef)

  if exported:
    result.add quote do:
      export `taskArgTypeDerivedId`, `taskNameId`, `taskNameSyncId`, `taskNameImplId`

  # debug ----------------------------------------------------------------------
  # echo toStrLit(result)

# The approach below doesn't work because unexpected things can happen with the
# AST of `body`, at least that's what I observed; can look into a
# different approach:
# https://github.com/beef331/kashae/blob/master/src/kashae.nim#L204-L220
# (that approach was recommended in #main channel of the official Nim discord
# server when I asked about the unexpected AST things)

# macro task*(kind: TaskKind, body: untyped): untyped =
#   result = newStmtList()
#   result.add(quote do: task(`kind`, false, `body`))
#
# macro task*(stoppable: bool, body: untyped): untyped =
#   result = newStmtList()
#   result.add(quote do: task(no_rts, `stoppable`, `body`))
#
# macro task*(body: untyped): untyped =
#   result = newStmtList()
#   result.add(quote do: task(no_rts, false, `body`))
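For illustration only (an editor's sketch, not part of the diff): the AST positions used above (body[0] for the name, body[3] for the params, body[6] for the body) suggest the macro is meant to be attached to a proc definition as a pragma. A hypothetical return-to-sender task could then look like:

proc sum*(a: int, b: int): int {.task(rts, false).} =
  # runs on a worker thread; the injected `chanReturnToSender` channel is how
  # the body hands its result back to the generated caller
  let res = a + b
  asyncSpawn chanReturnToSender.send(res.encode.safe)

The names sum, a and b are placeholders; the declared return type only shapes the generated callers, so a host would invoke roughly `let r = await sum(runner, "worker-name", 2, 3)` or the blocking `sumSync(...)`, with runner and "worker-name" also placeholders.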
@@ -0,0 +1,64 @@
import # std libs
  std/[atomics, tables]

import # task_runner libs
  ./tasks, ./workers

export atomics, tables, tasks, workers

logScope:
  topics = "task_runner"

type
  WorkerTable = TableRef[string, tuple[kind: WorkerKind, worker: Worker]]

  TaskRunner* = ref object
    running*: Atomic[bool]
    workers*: WorkerTable

proc newWorkerTable*(): WorkerTable =
  newTable[string, tuple[kind: WorkerKind, worker: Worker]]()

proc new*(T: type TaskRunner, workers: WorkerTable = newWorkerTable()): T =
  # Atomic[bool] is `false` by default, no need to initialize `running`
  T(workers: workers)

proc start*(self: TaskRunner) {.async.} =
  trace "task runner starting"
  var starting: seq[Future[void]] = @[]
  for v in self.workers.values:
    let (kind, worker) = v
    case kind:
      of pool:
        starting.add cast[PoolWorker](worker).start()
      of thread:
        starting.add cast[ThreadWorker](worker).start()
  await allFutures(starting)
  trace "task runner started"
  self.running.store(true)

proc stop*(self: TaskRunner) {.async.} =
  trace "task runner stopping"
  self.running.store(false)
  var stopping: seq[Future[void]] = @[]
  for v in self.workers.values:
    let (kind, worker) = v
    case kind:
      of pool:
        stopping.add(cast[PoolWorker](worker).stop())
      of thread:
        stopping.add(cast[ThreadWorker](worker).stop())
  await allFutures(stopping)
  trace "task runner stopped"

proc createWorker*(self: TaskRunner, kind: WorkerKind, name: string,
    context: Context = emptyContext, contextArg: ContextArg = ContextArg(),
    size = DefaultPoolSize) =
  let running = cast[pointer](addr self.running)
  case kind:
    of pool:
      self.workers[name] = (kind: kind,
        worker: PoolWorker.new(name, running, context, contextArg, size))
    of thread:
      self.workers[name] = (kind: kind,
        worker: ThreadWorker.new(name, running, context, contextArg))
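For illustration only (an editor's sketch, not part of the diff): the runner API above would typically be driven along these lines, with "example" a placeholder worker name:

import task_runner

proc main() {.async.} =
  let runner = TaskRunner.new()
  runner.createWorker(thread, "example")  # or `pool` for a PoolWorker
  await runner.start()
  # invoke procs generated by the `task` macro against runner and "example" here
  await runner.stop()

waitFor main()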
@@ -1,12 +0,0 @@
type
  ThreadSafeString* = distinct cstring

proc safe*(input: string): ThreadSafeString =
  var res = cast[cstring](allocShared(input.len + 1))
  copyMem(res, input.cstring, input.len)
  res[input.len] = '\0'
  res.ThreadSafeString

proc `$`*(input: ThreadSafeString): string =
  result = $(input.cstring)
  deallocShared input.cstring
@@ -0,0 +1,48 @@
import # vendor libs
  chronicles, chronos, json_serialization, json_serialization/std/options

export chronicles, chronos, json_serialization, options

type
  ContextArg* = ref object of RootObj

  Context* = proc(arg: ContextArg): Future[void] {.gcsafe, nimcall, raises: [Defect].}

  Task* = proc(taskArgEncoded: string): Future[void] {.gcsafe, nimcall, raises: [Defect].}

  TaskKind* = enum no_rts, rts # rts := "return to sender"

  TaskArg* = ref object of RootObj
    chanSendToHost*: ByteAddress # pointer to channel for sending to host
    task*: ByteAddress # pointer to task proc
    taskName*: string
    workerRunning*: ByteAddress # pointer to taskRunner's .running Atomic[bool]

  ThreadSafeString* = distinct cstring

# there should eventually be the ability to reliably stop individual workers,
# i.e. each worker would have its own `.running` Atomic[bool] (maybe
# reconsider the naming, e.g. "workerStopped" vs. "workerRunning" to be
# consistent with "taskStopped", or switch the latter to
# "taskRunning"). Currently, a TaskRunner instance's `.running` Atomic[bool]
# serves as a "master switch" for all the workers, so it's not completely
# inaccurate for the field on TaskArg to be named `workerRunning`

const emptyContext*: Context =
  proc(arg: ContextArg) {.async, gcsafe, nimcall.} = discard

proc decode*[T](arg: string): T =
  Json.decode(arg, T, allowUnknownFields = true)

proc encode*[T](arg: T): string =
  arg.toJson(typeAnnotations = true)

proc `$`*(input: ThreadSafeString): string =
  result = $(input.cstring)
  deallocShared input.cstring

proc safe*(input: string): ThreadSafeString =
  var res = cast[cstring](allocShared(input.len + 1))
  copyMem(res, input.cstring, input.len)
  res[input.len] = '\0'
  res.ThreadSafeString
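For illustration only (an editor's sketch, not part of the diff): `encode` emits a "$type" annotation (the pool code later in this commit matches on "WorkerNotification:ObjectType") and `decode` tolerates unknown fields, so a round trip for a hypothetical `Ping` object looks roughly like:

type Ping = ref object of RootObj
  payload*: string

let encoded = Ping(payload: "hi").encode  # JSON carrying a "$type": "Ping:ObjectType" field
let decoded = decode[Ping](encoded)       # allowUnknownFields = true lets extra keys pass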
@@ -0,0 +1,4 @@
import # task_runner libs
  ./workers/[pool_worker, thread_worker, worker]

export pool_worker, thread_worker, worker
@@ -0,0 +1,314 @@
import # std libs
  std/[atomics, json, sequtils, tables]

import # task_runner libs
  ./worker

export json, sequtils

logScope:
  topics = "task_runner"

type
  PoolThreadArg = ref object of ThreadArg
    poolName: string
    poolSize: int

  PoolWorker* = ref object of Worker
    size*: int
    thread: Thread[PoolThreadArg]

  WorkerThreadArg = ref object of ThreadArg
    poolName: string
    workerId: int

  ThreadWorker = ref object of Worker
    id: int
    thread: Thread[WorkerThreadArg]

  WorkerNotification = ref object
    id: int
    notice: string

proc poolThread(arg: PoolThreadArg) {.thread.}

proc workerThread(arg: WorkerThreadArg) {.thread.}

const DefaultPoolSize* = 16

proc new*(T: type PoolWorker, name: string, running: pointer,
    context: Context = emptyContext, contextArg: ContextArg = ContextArg(),
    size: int = DefaultPoolSize, awaitTasks = true): T =
  let
    chanRecvFromWorker = newWorkerChannel()
    chanSendToWorker = newWorkerChannel()
    thread = Thread[PoolThreadArg]()

  T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker,
    chanSendToWorker: chanSendToWorker, context: context,
    contextArg: contextArg, name: name, running: running, size: size,
    thread: thread)

proc start*(self: PoolWorker) {.async.} =
  trace "pool starting", pool=self.name, poolSize=self.size
  self.chanRecvFromWorker.open()
  self.chanSendToWorker.open()
  let arg = PoolThreadArg(awaitTasks: self.awaitTasks,
    chanRecvFromHost: self.chanSendToWorker,
    chanSendToHost: self.chanRecvFromWorker, context: self.context,
    contextArg: self.contextArg, running: self.running, poolName: self.name,
    poolSize: self.size)

  createThread(self.thread, poolThread, arg)
  let notice = $(await self.chanRecvFromWorker.recv())
  trace "pool started", notice, pool=self.name, poolSize=self.size

proc stop*(self: PoolWorker) {.async.} =
  asyncSpawn self.chanSendToWorker.send("stop".safe)
  joinThread(self.thread)
  self.chanRecvFromWorker.close()
  self.chanSendToWorker.close()
  trace "pool stopped", pool=self.name, poolSize=self.size

proc new*(T: type ThreadWorker, name: string, id: int, running: pointer,
    chanRecvFromWorker: WorkerChannel, context: Context = emptyContext,
    contextArg: ContextArg = ContextArg(), awaitTasks = true): T =
  let
    chanSendToWorker = newWorkerChannel()
    thread = Thread[WorkerThreadArg]()

  T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker,
    chanSendToWorker: chanSendToWorker, context: context,
    contextArg: contextArg, name: name, running: running, id: id,
    thread: thread)

proc start*(self: ThreadWorker) {.async.} =
  self.chanSendToWorker.open()
  let arg = WorkerThreadArg(awaitTasks: self.awaitTasks,
    chanRecvFromHost: self.chanSendToWorker,
    chanSendToHost: self.chanRecvFromWorker, context: self.context,
    contextArg: self.contextArg, running: self.running, poolName: self.name,
    workerId: self.id)

  createThread(self.thread, workerThread, arg)

proc stop*(self: ThreadWorker) {.async.} =
  asyncSpawn self.chanSendToWorker.send("stop".safe)
  joinThread(self.thread)
  self.chanSendToWorker.close()
  trace "pool worker stopped", pool=self.name, workerId=self.id

proc pool(arg: PoolThreadArg) {.async, raises: [Defect].} =
  let
    chanRecvFromHostOrWorker = arg.chanRecvFromHost
    chanSendToHost = arg.chanSendToHost
    pool = arg.poolName
    poolSize = arg.poolSize

  var running = cast[ptr Atomic[bool]](arg.running)

  chanRecvFromHostOrWorker.open()
  chanSendToHost.open()

  let notice = "ready"
  trace "pool sent notification to host", notice, pool
  asyncSpawn chanSendToHost.send(notice.safe)

  var
    taskQueue: seq[string] = @[] # FIFO queue
    workersBusy = newTable[int, ThreadWorker]()
    workersIdle: seq[ThreadWorker] = @[]
    workersStarted = 0

  for i in 0..<poolSize:
    let
      workerId = i + 1
      worker = ThreadWorker.new(pool, workerId, arg.running,
        chanRecvFromHostOrWorker, arg.context, arg.contextArg, arg.awaitTasks)

    workersBusy[workerId] = worker
    trace "pool worker starting", pool, workerId
    trace "pool marked new worker as busy", pool, poolSize, workerId,
      workersStarted=workerId

    asyncSpawn worker.start()

  # when task received and number of busy threads == poolSize, then put task in
  # taskQueue

  # when task received and number of busy threads < poolSize, pop worker from
  # workersIdle, track that worker in workersBusy, and send task to that
  # worker; if taskQueue is not empty then before sending the current task it
  # should be added to the queue and replaced with oldest task in the queue

  # if "ready" or "done" received from a worker, remove worker from
  # workersBusy, and push worker into workersIdle

  while true:
    trace "pool waiting for message", pool
    var
      message = $(await chanRecvFromHostOrWorker.recv())
      shouldSendToWorker = false

    if message == "stop":
      trace "pool stopping", notice=message, pool, poolSize
      var stopping: seq[Future[void]] = @[]
      for worker in workersIdle:
        stopping.add worker.stop()
      for worker in workersBusy.values:
        stopping.add worker.stop()

      await allFutures(stopping)
      trace "pool workers all stopped", pool, poolSize
      break

    if running[].load():
      try:
        let
          parsed = parseJson(message)
          messageType = parsed{"$type"}.getStr

        case messageType
          of "WorkerNotification:ObjectType":
            try:
              let
                notification = decode[WorkerNotification](message)
                workerId = notification.id
                notice = notification.notice
                worker = workersBusy[workerId]

              if notice == "ready" or notice == "done":
                if notice == "ready":
                  trace "pool worker started", notice, pool, workerId
                  workersStarted = workersStarted + 1
                  if workersStarted == poolSize:
                    trace "pool workers all started", pool, poolSize

                workersBusy.del workerId
                workersIdle.add worker
                trace "pool marked worker as idle", notice, pool, poolSize,
                  workerId, workersBusy=workersBusy.len,
                  workersIdle=workersIdle.len

              else:
                error "pool received unknown notification from worker", notice,
                  pool, workerId

            except CatchableError as e:
              error "exception raised while handling pool worker notification",
                error=e.msg, notification=message, pool

          else: # it's a task to send to an idle worker or add to the taskQueue
            trace "pool received message", message, pool
            shouldSendToWorker = true

      except CatchableError as e:
        error "pool received unknown message", error=e.msg, message, pool

      if (not shouldSendToWorker) and taskQueue.len > 0 and
         workersBusy.len < poolSize:
        message = taskQueue[0]
        taskQueue.delete 0, 0
        trace "pool removed task from queue", pool, queued=taskQueue.len
        shouldSendToWorker = true

      elif shouldSendToWorker and taskQueue.len > 0 and
           workersBusy.len < poolSize:
        taskQueue.add message
        message = taskQueue[0]
        taskQueue.delete 0, 0
        trace "pool added task to queue and removed oldest task from queue",
          pool, queued=taskQueue.len

      elif shouldSendToWorker and workersBusy.len == poolSize:
        taskQueue.add message
        trace "pool added task to queue", pool, queued=taskQueue.len
        shouldSendToWorker = false

      if shouldSendToWorker:
        let
          worker = workersIdle[0]
          workerId = worker.id

        workersIdle.delete 0, 0
        workersBusy[workerId] = worker
        trace "pool sent task to worker", pool, workerId
        trace "pool marked worker as busy", pool, poolSize, workerId,
          workersBusy=workersBusy.len, workersIdle=workersIdle.len

        asyncSpawn worker.chanSendToWorker.send(message.safe)

  chanRecvFromHostOrWorker.close()
  chanSendToHost.close()

proc poolThread(arg: PoolThreadArg) {.thread.} =
  waitFor pool(arg)

proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} =
  let
    awaitTasks = arg.awaitTasks
    chanRecvFromHost = arg.chanRecvFromHost
    chanSendToHost = arg.chanSendToHost
    pool = arg.poolName
    workerId = arg.workerId

  var running = cast[ptr Atomic[bool]](arg.running)

  chanRecvFromHost.open()
  chanSendToHost.open()

  trace "pool worker running context", pool, workerId
  await arg.context(arg.contextArg)
  let
    notice = "ready"
    notification = WorkerNotification(id: workerId, notice: notice)

  trace "pool worker sent notification to pool", notice, pool, workerId
  asyncSpawn chanSendToHost.send(notification.encode.safe)

  while true:
    trace "pool worker waiting for message", pool, workerId
    let message = $(await chanRecvFromHost.recv())

    if message == "stop":
      trace "pool worker stopping", notice=message, pool, workerId
      break

    if running[].load():
      var
        msgerr = false
        parsed: JsonNode
        task: Task
        taskName: string

      try:
        parsed = parseJson(message)
        task = cast[Task](parsed{"task"}.getInt)
        taskName = parsed{"taskName"}.getStr

        trace "pool worker received message", message, pool, workerId
        trace "pool worker running task", pool, task=taskName, workerId

      except CatchableError as e:
        msgerr = true
        error "pool worker received unknown message", error=e.msg, message,
          pool, workerId

      if not msgerr:
        if awaitTasks:
          await task(message)
        else:
          asyncSpawn task(message)

        let
          notice = "done"
          notification = WorkerNotification(id: workerId, notice: notice)

        trace "pool worker sent notification to pool", notice, pool, workerId
        asyncSpawn chanSendToHost.send(notification.encode.safe)

  chanRecvFromHost.close()
  chanSendToHost.close()

proc workerThread(arg: WorkerThreadArg) {.thread.} =
  waitFor worker(arg)
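For illustration only (an editor's sketch, not part of the diff): seen from the host side, the pool's protocol boils down to two kinds of messages on its chanSendToWorker channel, a bare "stop" string and an encoded task argument; poolWorker and taskArg are placeholders here:

asyncSpawn poolWorker.chanSendToWorker.send("stop".safe)          # ask the pool to shut down
asyncSpawn poolWorker.chanSendToWorker.send(taskArg.encode.safe)  # dispatch or queue a task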
@@ -0,0 +1,108 @@
import # std libs
  std/[atomics, json]

import # task_runner libs
  ./worker

export json

logScope:
  topics = "task_runner"

type
  WorkerThreadArg = ref object of ThreadArg
    workerName: string

  ThreadWorker* = ref object of Worker
    thread: Thread[WorkerThreadArg]

proc workerThread(arg: WorkerThreadArg) {.thread.}

proc new*(T: type ThreadWorker, name: string, running: pointer,
    context: Context = emptyContext, contextArg: ContextArg = ContextArg(),
    awaitTasks = false): T =
  let
    chanRecvFromWorker = newWorkerChannel()
    chanSendToWorker = newWorkerChannel()
    thread = Thread[WorkerThreadArg]()

  T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker,
    chanSendToWorker: chanSendToWorker, context: context,
    contextArg: contextArg, name: name, running: running, thread: thread)

proc start*(self: ThreadWorker) {.async.} =
  trace "worker starting", worker=self.name
  self.chanRecvFromWorker.open()
  self.chanSendToWorker.open()
  let arg = WorkerThreadArg(awaitTasks: self.awaitTasks,
    chanRecvFromHost: self.chanSendToWorker,
    chanSendToHost: self.chanRecvFromWorker, context: self.context,
    contextArg: self.contextArg, running: self.running, workerName: self.name)

  createThread(self.thread, workerThread, arg)
  let notice = $(await self.chanRecvFromWorker.recv())
  trace "worker started", notice, worker=self.name

proc stop*(self: ThreadWorker) {.async.} =
  asyncSpawn self.chanSendToWorker.send("stop".safe)
  joinThread(self.thread)
  self.chanRecvFromWorker.close()
  self.chanSendToWorker.close()
  trace "worker stopped", worker=self.name

proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} =
  let
    awaitTasks = arg.awaitTasks
    chanRecvFromHost = arg.chanRecvFromHost
    chanSendToHost = arg.chanSendToHost
    worker = arg.workerName

  var running = cast[ptr Atomic[bool]](arg.running)

  chanRecvFromHost.open()
  chanSendToHost.open()

  trace "worker running context", worker
  await arg.context(arg.contextArg)
  let notice = "ready"
  trace "worker sent notification to host", notice, worker
  asyncSpawn chanSendToHost.send(notice.safe)

  while true:
    trace "worker waiting for message", worker
    let message = $(await chanRecvFromHost.recv())

    if message == "stop":
      trace "worker stopping", notice=message, worker
      break

    if running[].load():
      var
        msgerr = false
        parsed: JsonNode
        task: Task
        taskName: string

      try:
        parsed = parseJson(message)
        task = cast[Task](parsed{"task"}.getInt)
        taskName = parsed{"taskName"}.getStr

        trace "worker received message", message, worker
        trace "worker running task", task=taskName, worker

      except CatchableError as e:
        msgerr = true
        error "worker received unknown message", error=e.msg, message, worker

      if not msgerr:
        if awaitTasks:
          await task(message)
        else:
          asyncSpawn task(message)

  chanRecvFromHost.close()
  chanSendToHost.close()

proc workerThread(arg: WorkerThreadArg) {.thread.} =
  waitFor worker(arg)
@@ -0,0 +1,29 @@
import # task_runner libs
  ../achannels, ../tasks

export achannels, tasks

type
  WorkerChannel* = AsyncChannel[ThreadSafeString]

  WorkerKind* = enum pool, thread

  ThreadArg* = ref object of RootObj
    awaitTasks*: bool
    chanRecvFromHost*: WorkerChannel
    chanSendToHost*: WorkerChannel
    context*: Context
    contextArg*: ContextArg
    running*: pointer

  Worker* = ref object of RootObj
    awaitTasks*: bool
    chanRecvFromWorker*: WorkerChannel
    chanSendToWorker*: WorkerChannel
    context*: Context
    contextArg*: ContextArg
    name*: string
    running*: pointer

proc newWorkerChannel*(): WorkerChannel =
  newAsyncChannel[ThreadSafeString](-1)
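For illustration only (an editor's sketch, not part of the diff): the WorkerChannel alias above is used by the other modules in this pattern, with "ping" a placeholder payload:

let chan = newWorkerChannel()
chan.open()
asyncSpawn chan.send("ping".safe)  # ThreadSafeString carries a shared-heap copy across threads
let msg = $(await chan.recv())     # `$` copies the payload back and frees the shared buffer
chan.close()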
@@ -1 +1 @@
Subproject commit 0a7401ad466d70bab31c5d6dc82d1d584e4ebd1f
Subproject commit dc62f4fccd2d40c884009ae8f2b14bb6a86a55cf
@@ -0,0 +1 @@
Subproject commit 63ce43a86a40a4c73d1b3b8317278d47ec55a458
@@ -0,0 +1 @@
Subproject commit 5eb7fd0c90d3f03b6778688a5893fdd2715e9fe2
@@ -1 +1 @@
Subproject commit 613ad40f00ab3d0ee839f9db9c4d25e5e0248dee
Subproject commit 8b492c74b56c62bcee991a6899d413938a3accc5
@@ -0,0 +1 @@
Subproject commit 652099a95960be7790e2f4b4c925d0dd703cc9aa
@@ -0,0 +1 @@
Subproject commit 5213d397f9d85c69279961256e19a859cd32df30
@@ -1 +1 @@
Subproject commit ede0651741aa4f14f76c5560c3d2c6730757366d
Subproject commit 70680e2af2f3b0ead9ea49969731910e0898fbbe
@@ -1 +1 @@
Subproject commit e788deab3d59ff8a4fe103aeb5d82d3d82fcac7d
Subproject commit 91d4eaa4ccb4bfddf179fe2ee4247ae000e2587f
@@ -1 +1 @@
Subproject commit 2b097ec86aead0119c5e6bfff8502c3948a1ceaf
Subproject commit 12c3591fc165e88e90f5a0f1e5f5f37c69ef41f4