refactor: import task_runner from vendor and remove task_runnner modules under examples/chat

This commit is contained in:
Michael Bradley, Jr 2021-07-18 20:39:02 -05:00 committed by Michael Bradley
parent 075b205aad
commit 185edf73e1
9 changed files with 4 additions and 919 deletions

View File

@ -1,8 +1,11 @@
import # std libs
std/[sets, times]
import # vendor libs
task_runner
import # chat libs
../config, ../task_runner
../config
import # nim-status libs
../../../nim_status/accounts/public_accounts

View File

@ -1,13 +0,0 @@
# The code in ./task_runner is intended for eventual inclusion in:
# https://github.com/status-im/nim-task-runner
# The implementation in this repo is the "second nursery" for refining the
# concepts and code involved; the "first nursery" was status-desktop.
import # chat libs
./task_runner/impl, ./task_runner/macros
export impl, macros
logScope:
topics = "task_runner"

View File

@ -1,67 +0,0 @@
import # std libs
std/[atomics, tables]
import # vendor libs
chronicles, chronos, task_runner
import # chat libs
./tasks, ./workers
export atomics, chronicles, chronos, tables, task_runner, tasks, workers
logScope:
topics = "task_runner"
type
WorkerTable = TableRef[string, tuple[kind: WorkerKind, worker: Worker]]
TaskRunner* = ref object
running*: Atomic[bool]
workers*: WorkerTable
proc newWorkerTable*(): WorkerTable =
newTable[string, tuple[kind: WorkerKind, worker: Worker]]()
proc new*(T: type TaskRunner, workers: WorkerTable = newWorkerTable()): T =
# Atomic[bool] is `false` by default, no need to initialize `running`
T(workers: workers)
proc start*(self: TaskRunner) {.async.} =
trace "task runner starting"
var starting: seq[Future[void]] = @[]
for v in self.workers.values:
let (kind, worker) = v
case kind:
of pool:
starting.add cast[PoolWorker](worker).start()
of thread:
starting.add cast[ThreadWorker](worker).start()
await allFutures(starting)
trace "task runner started"
self.running.store(true)
proc stop*(self: TaskRunner) {.async.} =
trace "task runner stopping"
self.running.store(false)
var stopping: seq[Future[void]] = @[]
for v in self.workers.values:
let (kind, worker) = v
case kind:
of pool:
stopping.add(cast[PoolWorker](worker).stop())
of thread:
stopping.add(cast[ThreadWorker](worker).stop())
await allFutures(stopping)
trace "task runner stopped"
proc createWorker*(self: TaskRunner, kind: WorkerKind, name: string,
context: Context = emptyContext, contextArg: ContextArg = ContextArg(),
size = DefaultPoolSize) =
let running = cast[pointer](addr self.running)
case kind:
of pool:
self.workers[name] = (kind: kind,
worker: PoolWorker.new(name, running, context, contextArg, size))
of thread:
self.workers[name] = (kind: kind,
worker: ThreadWorker.new(name, running, context, contextArg))

View File

@ -1,329 +0,0 @@
import # std libs
std/[macros, unicode]
import # vendor libs
chronicles
import # chat libs
./impl
logScope:
topics = "task_runner"
macro task*(kind: static TaskKind, stoppable: static bool, body: untyped): untyped =
result = newStmtList()
const
star = "*"
syncPost = "Sync"
taskArgPost = "TaskArg"
taskPost = "Task"
var
exported = false
starId = ident(star)
taskArgTypeId = ident(taskArgPost)
taskArgTypeDerivedId: NimNode
taskName: string
taskNameId: NimNode
taskNameImplId: NimNode
taskNameSyncId: NimNode
taskReturnTypeId: NimNode
if kind(body[0]) == nnkPostfix and body[0][0] == ident(star):
exported = true
taskName = strVal(body[0][1])
taskNameId = ident(taskName)
else:
taskNameId = body[0]
taskName = strVal(taskNameId)
taskArgTypeDerivedId = ident(taskName.capitalize & taskArgPost)
taskNameImplId = ident(taskName & taskPost)
taskNameSyncId = ident(taskName & syncPost)
taskReturnTypeId = body[3][0]
let
chanReturnToSenderId = ident("chanReturnToSender")
serializedPointerTypeId = ident("ByteAddress")
taskStoppedId = ident("taskStopped")
# The repetitiveness of some code below could/should be cleaned up with
# additional metaprogramming (and probably more informed use of shortcuts and
# helpers provided by Nim's macros module); there can be a task options
# object for which e.g. the `stoppable` field is a boolean flag, but then
# also a helper object/table with the same fields/keys but the values are
# tuples of the type names and field names to be added to the type derived
# from TaskArg; the fields of the supplied/default options object can be
# iterated over and the proper nnkIdentDefs, etc. can be built according to
# the options values and info in the helper object/table; the same (or very
# similar) technique could be used to allow e.g. specification of a
# TaskRunner instance and/or worker name and/or `ptr Atomic[bool]` (for
# stopping the task) in the options object, which would then affect whether
# parameters for those things are included in the type signatures of the
# constructed procs or instead baked into their bodies.
var
taskArgTypeDef = newNimNode(nnkTypeDef)
objDef = newNimNode(nnkObjectTy)
recList = newNimNode(nnkRecList)
taskArgTypeDef.add(taskArgTypeDerivedId)
taskArgTypeDef.add(newEmptyNode())
objDef.add(newEmptyNode())
objDef.add(newNimNode(nnkOfInherit).add(taskArgTypeId))
if kind == rts:
var
objParam = newNimNode(nnkIdentDefs)
post = newNimNode(nnkPostfix)
post.add(starId)
post.add(chanReturnToSenderId)
objParam.add(post)
objParam.add(serializedPointerTypeId)
objParam.add(newEmptyNode())
recList.add(objParam)
if stoppable == true:
var
objParam = newNimNode(nnkIdentDefs)
post = newNimNode(nnkPostfix)
post.add(starId)
post.add(taskStoppedId)
objParam.add(post)
objParam.add(serializedPointerTypeId)
objParam.add(newEmptyNode())
recList.add(objParam)
for nn in body[3]:
if kind(nn) == nnkIdentDefs:
var
objParam = newNimNode(nnkIdentDefs)
post = newNimNode(nnkPostfix)
post.add(starId)
post.add(nn[0])
objParam.add(post)
objParam.add(nn[1])
objParam.add(nn[2])
recList.add(objParam)
objDef.add(recList)
taskArgTypeDef.add(newNimNode(nnkRefTy).add(objDef))
result.add(newNimNode(nnkTypeSection).add(taskArgTypeDef))
let
asyncPragmaId = ident("async")
atomicTypeId = ident("Atomic")
boolTypeId = ident("bool")
futureTypeId = ident("Future")
taskArgId = ident("taskArg")
taskArgEncId = ident("taskArgEncoded")
chanSendToHostId = ident("chanSendToHost")
chanSendToWorkerId = ident("chanSendToWorker")
stoppedId = ident("stopped")
taskRunnerId = ident("taskRunner")
taskRunnerTypeId = ident("TaskRunner")
workerChannelTypeId = ident("WorkerChannel")
workerId = ident("worker")
workerNameId = ident("workerName")
workerNameTypeId = ident("string")
workerRunningId = ident("workerRunning")
var
atomPtr = newNimNode(nnkPtrTy)
atomBracket = newNimNode(nnkBracketExpr)
atomBracket.add(atomicTypeId)
atomBracket.add(boolTypeId)
atomPtr.add(atomBracket)
let
taskStoppedTypeId = atomPtr
workerRunningTypeId = atomPtr
var impl = newStmtList()
impl.add quote do:
let
`taskArgId` = decode[`taskArgTypeDerivedId`](`taskArgEncId`)
`chanSendToHostId` = cast[`workerChannelTypeId`](`taskArgId`.`chanSendToHostId`)
if kind == rts:
impl.add quote do:
let `chanReturnToSenderId` = cast[`workerChannelTypeId`](`taskArgId`.`chanReturnToSenderId`)
if stoppable == true:
impl.add quote do:
var `taskStoppedId` = cast[`taskStoppedTypeId`](`taskArgId`.`taskStoppedId`)
impl.add quote do:
var `workerRunningId` = cast[`workerRunningTypeId`](`taskArgId`.`workerRunningId`)
for nn in body[3]:
if kind(nn) == nnkIdentDefs:
let id = nn[0]
impl.add quote do:
let `id` = `taskArgId`.`id`
impl.add(body[6])
result.add quote do:
const `taskNameImplId`: Task = proc(`taskArgEncId`: string) {.async, gcsafe, nimcall, raises: [Defect].} =
`impl`
var
taskBody = newStmtList()
taskSyncBody = newStmtList()
taskProcDef = newNimNode(nnkProcDef)
taskProcSyncDef = newNimNode(nnkProcDef)
taskProcParams = newNimNode(nnkFormalParams)
stoppedIdentDefs = newNimNode(nnkIdentDefs)
taskRunnerIdentDefs = newNimNode(nnkIdentDefs)
workerNameIdentDefs = newNimNode(nnkIdentDefs)
taskProcDef.add(taskNameId)
taskProcDef.add(newEmptyNode())
taskProcDef.add(body[2])
taskProcParams.add(newEmptyNode())
taskRunnerIdentDefs.add(taskRunnerId)
taskRunnerIdentDefs.add(taskRunnerTypeId)
taskRunnerIdentDefs.add(newEmptyNode())
taskProcParams.add(taskRunnerIdentDefs)
workerNameIdentDefs.add(workerNameId)
workerNameIdentDefs.add(workerNameTypeId)
workerNameIdentDefs.add(newEmptyNode())
taskProcParams.add(workerNameIdentDefs)
if stoppable == true:
stoppedIdentDefs.add(stoppedId)
stoppedIdentDefs.add(taskStoppedTypeId)
stoppedIdentDefs.add(newEmptyNode())
taskProcParams.add(stoppedIdentDefs)
for nn in body[3]:
if kind(nn) == nnkIdentDefs:
taskProcParams.add(nn)
taskProcDef.add(taskProcParams)
taskProcDef.add(newNimNode(nnkPragma).add(asyncPragmaId))
taskProcDef.add(newEmptyNode())
copyChildrenTo(taskProcDef, taskProcSyncDef)
taskProcSyncDef[0] = taskNameSyncId
taskProcSyncDef[4] = newEmptyNode()
if kind == rts:
if kind(taskReturnTypeId) != nnkEmpty:
var futureBracket = newNimNode(nnkBracketExpr)
futureBracket.add(futureTypeId)
futureBracket.add(taskReturnTypeId)
taskProcDef[3][0] = futureBracket
taskProcSyncDef[3][0] = taskReturnTypeId
taskBody.add quote do:
let
`workerId` = taskRunner.workers[workerName].worker
`chanSendToHostId` = `workerId`.chanRecvFromWorker
`chanSendToWorkerId` = `workerId`.chanSendToWorker
if kind == rts:
taskBody.add quote do:
let `chanReturnToSenderId` = newWorkerChannel()
taskBody.add quote do:
let `taskArgId` = `taskArgTypeDerivedId`(
`chanSendToHostId`: cast[`serializedPointerTypeId`](`chanSendToHostId`),
task: cast[`serializedPointerTypeId`](`taskNameImplId`),
taskName: `taskName`,
`workerRunningId`: cast[`serializedPointerTypeId`](addr taskRunner.running)
)
var taskArgConstructor = taskBody[if kind == rts: 2 else: 1][0][2]
if kind == rts:
var objField = newNimNode(nnkExprColonExpr)
objField.add(chanReturnToSenderId)
objField.add quote do: cast[`serializedPointerTypeId`](`chanReturnToSenderId`)
taskArgConstructor.add(objField)
if stoppable == true:
var objField = newNimNode(nnkExprColonExpr)
objField.add(taskStoppedId)
objField.add quote do: cast[`serializedPointerTypeId`](`stoppedId`)
taskArgConstructor.add(objField)
for nn in body[3]:
var objField = newNimNode(nnkExprColonExpr)
if kind(nn) == nnkIdentDefs:
objField.add(nn[0])
objField.add(nn[0])
taskArgConstructor.add(objField)
if kind == rts:
taskBody.add quote do:
`chanReturnToSenderId`.open()
copyChildrenTo(taskBody, taskSyncBody)
taskBody.add quote do:
asyncSpawn `chanSendToWorkerId`.send(`taskArgId`.encode.safe)
taskSyncBody.add quote do:
`chanSendToWorkerId`.sendSync(`taskArgId`.encode.safe)
if kind == rts:
if kind(taskReturnTypeId) != nnkEmpty:
taskBody.add quote do:
let res = decode[`taskReturnTypeId`]($(await `chanReturnToSenderId`.recv()))
`chanReturnToSenderId`.close()
return res
taskSyncBody.add quote do:
let res = decode[`taskReturnTypeId`]($`chanReturnToSenderId`.recvSync())
`chanReturnToSenderId`.close()
return res
else:
taskBody.add quote do:
discard $(await `chanReturnToSenderId`.recv())
`chanReturnToSenderId`.close()
taskSyncBody.add quote do:
discard $`chanReturnToSenderId`.recvSync()
`chanReturnToSenderId`.close()
taskProcDef.add(taskBody)
taskProcSyncDef.add(taskSyncBody)
result.add(taskProcDef)
result.add(taskProcSyncDef)
if exported:
result.add quote do:
export `taskArgTypeDerivedId`, `taskNameId`, `taskNameSyncId`, `taskNameImplId`
# debug ----------------------------------------------------------------------
# echo toStrLit(result)
# The approach below doesn't work because unexpected things can happen with the
# AST of `body`, at least that's that what I observed; can look into a
# different approach:
# https://github.com/beef331/kashae/blob/master/src/kashae.nim#L204-L220
# (that approach was recommended in #main channel of the official Nim discord
# server when I asked about the unexpected AST things)
# macro task*(kind: TaskKind, body: untyped): untyped =
# result = newStmtList()
# result.add(quote do: task(`kind`, false, `body`))
#
# macro task*(stoppable: bool, body: untyped): untyped =
# result = newStmtList()
# result.add(quote do: task(no_rts, `stoppable`, `body`))
#
# macro task*(body: untyped): untyped =
# result = newStmtList()
# result.add(quote do: task(no_rts, false, `body`))

View File

@ -1,39 +0,0 @@
import # vendor libs
chronicles, chronos, json_serialization, json_serialization/std/options
export chronicles, json_serialization, options
logScope:
topics = "task_runner"
type
ContextArg* = ref object of RootObj
Context* = proc(arg: ContextArg): Future[void] {.gcsafe, nimcall, raises: [Defect].}
Task* = proc(taskArgEncoded: string): Future[void] {.gcsafe, nimcall, raises: [Defect].}
TaskKind* = enum no_rts, rts # rts := "return to sender"
TaskArg* = ref object of RootObj
chanSendToHost*: ByteAddress # pointer to channel for sending to host
task*: ByteAddress # pointer to task proc
taskName*: string
workerRunning*: ByteAddress # pointer to taskRunner's .running Atomic[bool]
# there should eventually be the ability to reliably stop individual workers,
# i.e. each worker would have it's own `.running` Atomic[bool] (maybe
# reconsider the naming, e.g. "workerStopped" vs. "workerRunning" to be
# consistent with "taskStopped", or switch the latter to
# "taskRunning"). Currently, a TaskRunner instance's `.running` Atomic[bool]
# serves as a "master switch" for all the workers, so it's not completely
# inaccurate for the field on TaskArg to be named `workerRunning`
const emptyContext*: Context =
proc(arg: ContextArg) {.async, gcsafe, nimcall.} = discard
proc decode*[T](arg: string): T =
Json.decode(arg, T, allowUnknownFields = true)
proc encode*[T](arg: T): string =
arg.toJson(typeAnnotations = true)

View File

@ -1,7 +0,0 @@
import # chat libs
./workers/[pool_worker, thread_worker, worker]
export pool_worker, thread_worker, worker
logScope:
topics = "task_runner"

View File

@ -1,317 +0,0 @@
import # std libs
std/[atomics, json, sequtils, tables]
import # vendor libs
chronicles, chronos, task_runner
import # chat libs
./worker
export json
logScope:
topics = "task_runner"
type
PoolThreadArg = ref object of ThreadArg
poolName: string
poolSize: int
PoolWorker* = ref object of Worker
size*: int
thread: Thread[PoolThreadArg]
WorkerThreadArg = ref object of ThreadArg
poolName: string
workerId: int
ThreadWorker = ref object of Worker
id: int
thread: Thread[WorkerThreadArg]
WorkerNotification = ref object
id: int
notice: string
proc poolThread(arg: PoolThreadArg) {.thread.}
proc workerThread(arg: WorkerThreadArg) {.thread.}
const DefaultPoolSize* = 16
proc new*(T: type PoolWorker, name: string, running: pointer,
context: Context = emptyContext, contextArg: ContextArg = ContextArg(),
size: int = DefaultPoolSize, awaitTasks = true): T =
let
chanRecvFromWorker = newWorkerChannel()
chanSendToWorker = newWorkerChannel()
thread = Thread[PoolThreadArg]()
T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker,
chanSendToWorker: chanSendToWorker, context: context,
contextArg: contextArg, name: name, running: running, size: size,
thread: thread)
proc start*(self: PoolWorker) {.async.} =
trace "pool starting", pool=self.name, poolSize=self.size
self.chanRecvFromWorker.open()
self.chanSendToWorker.open()
let arg = PoolThreadArg(awaitTasks: self.awaitTasks,
chanRecvFromHost: self.chanSendToWorker,
chanSendToHost: self.chanRecvFromWorker, context: self.context,
contextArg: self.contextArg, running: self.running, poolName: self.name,
poolSize: self.size)
createThread(self.thread, poolThread, arg)
let notice = $(await self.chanRecvFromWorker.recv())
trace "pool started", notice, pool=self.name, poolSize=self.size
proc stop*(self: PoolWorker) {.async.} =
asyncSpawn self.chanSendToWorker.send("stop".safe)
joinThread(self.thread)
self.chanRecvFromWorker.close()
self.chanSendToWorker.close()
trace "pool stopped", pool=self.name, poolSize=self.size
proc new*(T: type ThreadWorker, name: string, id: int, running: pointer,
chanRecvFromWorker: WorkerChannel, context: Context = emptyContext,
contextArg: ContextArg = ContextArg(), awaitTasks = true): T =
let
chanSendToWorker = newWorkerChannel()
thread = Thread[WorkerThreadArg]()
T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker,
chanSendToWorker: chanSendToWorker, context: context,
contextArg: contextArg, name: name, running: running, id: id,
thread: thread)
proc start*(self: ThreadWorker) {.async.} =
self.chanSendToWorker.open()
let arg = WorkerThreadArg(awaitTasks: self.awaitTasks,
chanRecvFromHost: self.chanSendToWorker,
chanSendToHost: self.chanRecvFromWorker, context: self.context,
contextArg: self.contextArg, running: self.running, poolName: self.name,
workerId: self.id)
createThread(self.thread, workerThread, arg)
proc stop*(self: ThreadWorker) {.async.} =
asyncSpawn self.chanSendToWorker.send("stop".safe)
joinThread(self.thread)
self.chanSendToWorker.close()
trace "pool worker stopped", pool=self.name, workerId=self.id
proc pool(arg: PoolThreadArg) {.async, raises: [Defect].} =
let
chanRecvFromHostOrWorker = arg.chanRecvFromHost
chanSendToHost = arg.chanSendToHost
pool = arg.poolName
poolSize = arg.poolSize
var running = cast[ptr Atomic[bool]](arg.running)
chanRecvFromHostOrWorker.open()
chanSendToHost.open()
let notice = "ready"
trace "pool sent notification to host", notice, pool
asyncSpawn chanSendToHost.send(notice.safe)
var
taskQueue: seq[string] = @[] # FIFO queue
workersBusy = newTable[int, ThreadWorker]()
workersIdle: seq[ThreadWorker] = @[]
workersStarted = 0
for i in 0..<poolSize:
let
workerId = i + 1
worker = ThreadWorker.new(pool, workerId, arg.running,
chanRecvFromHostOrWorker, arg.context, arg.contextArg, arg.awaitTasks)
workersBusy[workerId] = worker
trace "pool worker starting", pool, workerId
trace "pool marked new worker as busy", pool, poolSize, workerId,
workersStarted=workerId
asyncSpawn worker.start()
# when task received and number of busy threads == poolSize, then put task in
# taskQueue
# when task received and number of busy threads < poolSize, pop worker from
# workersIdle, track that worker in workersBusy, and send task to that
# worker; if taskQueue is not empty then before sending the current task it
# should be added to the queue and replaced with oldest task in the queue
# if "ready" or "done" received from a worker, remove worker from
# workersBusy, and push worker into workersIdle
while true:
trace "pool waiting for message", pool
var
message = $(await chanRecvFromHostOrWorker.recv())
shouldSendToWorker = false
if message == "stop":
trace "pool stopping", notice=message, pool, poolSize
var stopping: seq[Future[void]] = @[]
for worker in workersIdle:
stopping.add worker.stop()
for worker in workersBusy.values:
stopping.add worker.stop()
await allFutures(stopping)
trace "pool workers all stopped", pool, poolSize
break
if running[].load():
try:
let
parsed = parseJson(message)
messageType = parsed{"$type"}.getStr
case messageType
of "WorkerNotification:ObjectType":
try:
let
notification = decode[WorkerNotification](message)
workerId = notification.id
notice = notification.notice
worker = workersBusy[workerId]
if notice == "ready" or notice == "done":
if notice == "ready":
trace "pool worker started", notice, pool, workerId
workersStarted = workersStarted + 1
if workersStarted == poolSize:
trace "pool workers all started", pool, poolSize
workersBusy.del workerId
workersIdle.add worker
trace "pool marked worker as idle", notice, pool, poolSize,
workerId, workersBusy=workersBusy.len,
workersIdle=workersIdle.len
else:
error "pool received unknown notification from worker", notice,
pool, workerId
except CatchableError as e:
error "exception raised while handling pool worker notification",
error=e.msg, notification=message, pool
else: # it's a task to send to an idle worker or add to the taskQueue
trace "pool received message", message, pool
shouldSendToWorker = true
except CatchableError as e:
error "pool received unknown message", error=e.msg, message, pool
if (not shouldSendToWorker) and taskQueue.len > 0 and
workersBusy.len < poolSize:
message = taskQueue[0]
taskQueue.delete 0, 0
trace "pool removed task from queue", pool, queued=taskQueue.len
shouldSendToWorker = true
elif shouldSendToWorker and taskQueue.len > 0 and
workersBusy.len < poolSize:
taskQueue.add message
message = taskQueue[0]
taskQueue.delete 0, 0
trace "pool added task to queue and removed oldest task from queue",
pool, queued=taskQueue.len
elif shouldSendToWorker and workersBusy.len == poolSize:
taskQueue.add message
trace "pool added task to queue", pool, queued=taskQueue.len
shouldSendToWorker = false
if shouldSendToWorker:
let
worker = workersIdle[0]
workerId = worker.id
workersIdle.delete 0, 0
workersBusy[workerId] = worker
trace "pool sent task to worker", pool, workerId
trace "pool marked worker as busy", pool, poolSize, workerId,
workersBusy=workersBusy.len, workersIdle=workersIdle.len
asyncSpawn worker.chanSendToWorker.send(message.safe)
chanRecvFromHostOrWorker.close()
chanSendToHost.close()
proc poolThread(arg: PoolThreadArg) {.thread.} =
waitFor pool(arg)
proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} =
let
awaitTasks = arg.awaitTasks
chanRecvFromHost = arg.chanRecvFromHost
chanSendToHost = arg.chanSendToHost
pool = arg.poolName
workerId = arg.workerId
var running = cast[ptr Atomic[bool]](arg.running)
chanRecvFromHost.open()
chanSendToHost.open()
trace "pool worker running context", pool, workerId
await arg.context(arg.contextArg)
let
notice = "ready"
notification = WorkerNotification(id: workerId, notice: notice)
trace "pool worker sent notification to pool", notice, pool, workerId
asyncSpawn chanSendToHost.send(notification.encode.safe)
while true:
trace "pool worker waiting for message", pool, workerId
let message = $(await chanRecvFromHost.recv())
if message == "stop":
trace "pool worker stopping", notice=message, pool, workerId
break
if running[].load():
var
msgerr = false
parsed: JsonNode
task: Task
taskName: string
try:
parsed = parseJson(message)
task = cast[Task](parsed{"task"}.getInt)
taskName = parsed{"taskName"}.getStr
trace "pool worker received message", message, pool, workerId
trace "pool worker running task", pool, task=taskName, workerId
except CatchableError as e:
msgerr = true
error "pool worker received unknown message", error=e.msg, message,
pool, workerId
if not msgerr:
if awaitTasks:
await task(message)
else:
asyncSpawn task(message)
let
notice = "done"
notification = WorkerNotification(id: workerId, notice: notice)
trace "pool worker sent notification to pool", notice, pool, workerId
asyncSpawn chanSendToHost.send(notification.encode.safe)
chanRecvFromHost.close()
chanSendToHost.close()
proc workerThread(arg: WorkerThreadArg) {.thread.} =
waitFor worker(arg)

View File

@ -1,111 +0,0 @@
import # std libs
std/[atomics, json]
import # vendor libs
chronicles, chronos, task_runner
import # chat libs
./worker
export json
logScope:
topics = "task_runner"
type
WorkerThreadArg = ref object of ThreadArg
workerName: string
ThreadWorker* = ref object of Worker
thread: Thread[WorkerThreadArg]
proc workerThread(arg: WorkerThreadArg) {.thread.}
proc new*(T: type ThreadWorker, name: string, running: pointer,
context: Context = emptyContext, contextArg: ContextArg = ContextArg(),
awaitTasks = false): T =
let
chanRecvFromWorker = newWorkerChannel()
chanSendToWorker = newWorkerChannel()
thread = Thread[WorkerThreadArg]()
T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker,
chanSendToWorker: chanSendToWorker, context: context,
contextArg: contextArg, name: name, running: running, thread: thread)
proc start*(self: ThreadWorker) {.async.} =
trace "worker starting", worker=self.name
self.chanRecvFromWorker.open()
self.chanSendToWorker.open()
let arg = WorkerThreadArg(awaitTasks: self.awaitTasks,
chanRecvFromHost: self.chanSendToWorker,
chanSendToHost: self.chanRecvFromWorker, context: self.context,
contextArg: self.contextArg, running: self.running, workerName: self.name)
createThread(self.thread, workerThread, arg)
let notice = $(await self.chanRecvFromWorker.recv())
trace "worker started", notice, worker=self.name
proc stop*(self: ThreadWorker) {.async.} =
asyncSpawn self.chanSendToWorker.send("stop".safe)
joinThread(self.thread)
self.chanRecvFromWorker.close()
self.chanSendToWorker.close()
trace "worker stopped", worker=self.name
proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} =
let
awaitTasks = arg.awaitTasks
chanRecvFromHost = arg.chanRecvFromHost
chanSendToHost = arg.chanSendToHost
worker = arg.workerName
var running = cast[ptr Atomic[bool]](arg.running)
chanRecvFromHost.open()
chanSendToHost.open()
trace "worker running context", worker
await arg.context(arg.contextArg)
let notice = "ready"
trace "worker sent notification to host", notice, worker
asyncSpawn chanSendToHost.send(notice.safe)
while true:
trace "worker waiting for message", worker
let message = $(await chanRecvFromHost.recv())
if message == "stop":
trace "worker stopping", notice=message, worker
break
if running[].load():
var
msgerr = false
parsed: JsonNode
task: Task
taskName: string
try:
parsed = parseJson(message)
task = cast[Task](parsed{"task"}.getInt)
taskName = parsed{"taskName"}.getStr
trace "worker received message", message, worker
trace "worker running task", task=taskName, worker
except CatchableError as e:
msgerr = true
error "worker received unknown message", error=e.msg, message, worker
if not msgerr:
if awaitTasks:
await task(message)
else:
asyncSpawn task(message)
chanRecvFromHost.close()
chanSendToHost.close()
proc workerThread(arg: WorkerThreadArg) {.thread.} =
waitFor worker(arg)

View File

@ -1,35 +0,0 @@
import # vendor libs
task_runner
import # chat libs
../tasks
export tasks, task_runner
logScope:
topics = "task_runner"
type
WorkerChannel* = AsyncChannel[ThreadSafeString]
WorkerKind* = enum pool, thread
ThreadArg* = ref object of RootObj
awaitTasks*: bool
chanRecvFromHost*: WorkerChannel
chanSendToHost*: WorkerChannel
context*: Context
contextArg*: ContextArg
running*: pointer
Worker* = ref object of RootObj
awaitTasks*: bool
chanRecvFromWorker*: WorkerChannel
chanSendToWorker*: WorkerChannel
context*: Context
contextArg*: ContextArg
name*: string
running*: pointer
proc newWorkerChannel*(): WorkerChannel =
newAsyncChannel[ThreadSafeString](-1)