From 185edf73e12bccd096b106c5352cdee38ec6f4c2 Mon Sep 17 00:00:00 2001 From: "Michael Bradley, Jr" Date: Sun, 18 Jul 2021 20:39:02 -0500 Subject: [PATCH] refactor: import task_runner from vendor and remove task_runnner modules under examples/chat --- examples/chat/client/common.nim | 5 +- examples/chat/task_runner.nim | 13 - examples/chat/task_runner/impl.nim | 67 ---- examples/chat/task_runner/macros.nim | 329 ------------------ examples/chat/task_runner/tasks.nim | 39 --- examples/chat/task_runner/workers.nim | 7 - .../chat/task_runner/workers/pool_worker.nim | 317 ----------------- .../task_runner/workers/thread_worker.nim | 111 ------ examples/chat/task_runner/workers/worker.nim | 35 -- 9 files changed, 4 insertions(+), 919 deletions(-) delete mode 100644 examples/chat/task_runner.nim delete mode 100644 examples/chat/task_runner/impl.nim delete mode 100644 examples/chat/task_runner/macros.nim delete mode 100644 examples/chat/task_runner/tasks.nim delete mode 100644 examples/chat/task_runner/workers.nim delete mode 100644 examples/chat/task_runner/workers/pool_worker.nim delete mode 100644 examples/chat/task_runner/workers/thread_worker.nim delete mode 100644 examples/chat/task_runner/workers/worker.nim diff --git a/examples/chat/client/common.nim b/examples/chat/client/common.nim index 72a23a5..c580c71 100644 --- a/examples/chat/client/common.nim +++ b/examples/chat/client/common.nim @@ -1,8 +1,11 @@ import # std libs std/[sets, times] +import # vendor libs + task_runner + import # chat libs - ../config, ../task_runner + ../config import # nim-status libs ../../../nim_status/accounts/public_accounts diff --git a/examples/chat/task_runner.nim b/examples/chat/task_runner.nim deleted file mode 100644 index f460960..0000000 --- a/examples/chat/task_runner.nim +++ /dev/null @@ -1,13 +0,0 @@ -# The code in ./task_runner is intended for eventual inclusion in: -# https://github.com/status-im/nim-task-runner - -# The implementation in this repo is the "second nursery" for refining the -# concepts and code involved; the "first nursery" was status-desktop. - -import # chat libs - ./task_runner/impl, ./task_runner/macros - -export impl, macros - -logScope: - topics = "task_runner" diff --git a/examples/chat/task_runner/impl.nim b/examples/chat/task_runner/impl.nim deleted file mode 100644 index e172955..0000000 --- a/examples/chat/task_runner/impl.nim +++ /dev/null @@ -1,67 +0,0 @@ -import # std libs - std/[atomics, tables] - -import # vendor libs - chronicles, chronos, task_runner - -import # chat libs - ./tasks, ./workers - -export atomics, chronicles, chronos, tables, task_runner, tasks, workers - -logScope: - topics = "task_runner" - -type - WorkerTable = TableRef[string, tuple[kind: WorkerKind, worker: Worker]] - - TaskRunner* = ref object - running*: Atomic[bool] - workers*: WorkerTable - -proc newWorkerTable*(): WorkerTable = - newTable[string, tuple[kind: WorkerKind, worker: Worker]]() - -proc new*(T: type TaskRunner, workers: WorkerTable = newWorkerTable()): T = - # Atomic[bool] is `false` by default, no need to initialize `running` - T(workers: workers) - -proc start*(self: TaskRunner) {.async.} = - trace "task runner starting" - var starting: seq[Future[void]] = @[] - for v in self.workers.values: - let (kind, worker) = v - case kind: - of pool: - starting.add cast[PoolWorker](worker).start() - of thread: - starting.add cast[ThreadWorker](worker).start() - await allFutures(starting) - trace "task runner started" - self.running.store(true) - -proc stop*(self: TaskRunner) {.async.} = - trace "task runner stopping" - self.running.store(false) - var stopping: seq[Future[void]] = @[] - for v in self.workers.values: - let (kind, worker) = v - case kind: - of pool: - stopping.add(cast[PoolWorker](worker).stop()) - of thread: - stopping.add(cast[ThreadWorker](worker).stop()) - await allFutures(stopping) - trace "task runner stopped" - -proc createWorker*(self: TaskRunner, kind: WorkerKind, name: string, - context: Context = emptyContext, contextArg: ContextArg = ContextArg(), - size = DefaultPoolSize) = - let running = cast[pointer](addr self.running) - case kind: - of pool: - self.workers[name] = (kind: kind, - worker: PoolWorker.new(name, running, context, contextArg, size)) - of thread: - self.workers[name] = (kind: kind, - worker: ThreadWorker.new(name, running, context, contextArg)) diff --git a/examples/chat/task_runner/macros.nim b/examples/chat/task_runner/macros.nim deleted file mode 100644 index 7c69941..0000000 --- a/examples/chat/task_runner/macros.nim +++ /dev/null @@ -1,329 +0,0 @@ -import # std libs - std/[macros, unicode] - -import # vendor libs - chronicles - -import # chat libs - ./impl - -logScope: - topics = "task_runner" - -macro task*(kind: static TaskKind, stoppable: static bool, body: untyped): untyped = - result = newStmtList() - - const - star = "*" - syncPost = "Sync" - taskArgPost = "TaskArg" - taskPost = "Task" - - var - exported = false - starId = ident(star) - taskArgTypeId = ident(taskArgPost) - taskArgTypeDerivedId: NimNode - taskName: string - taskNameId: NimNode - taskNameImplId: NimNode - taskNameSyncId: NimNode - taskReturnTypeId: NimNode - - if kind(body[0]) == nnkPostfix and body[0][0] == ident(star): - exported = true - taskName = strVal(body[0][1]) - taskNameId = ident(taskName) - - else: - taskNameId = body[0] - taskName = strVal(taskNameId) - - taskArgTypeDerivedId = ident(taskName.capitalize & taskArgPost) - taskNameImplId = ident(taskName & taskPost) - taskNameSyncId = ident(taskName & syncPost) - taskReturnTypeId = body[3][0] - - let - chanReturnToSenderId = ident("chanReturnToSender") - serializedPointerTypeId = ident("ByteAddress") - taskStoppedId = ident("taskStopped") - - # The repetitiveness of some code below could/should be cleaned up with - # additional metaprogramming (and probably more informed use of shortcuts and - # helpers provided by Nim's macros module); there can be a task options - # object for which e.g. the `stoppable` field is a boolean flag, but then - # also a helper object/table with the same fields/keys but the values are - # tuples of the type names and field names to be added to the type derived - # from TaskArg; the fields of the supplied/default options object can be - # iterated over and the proper nnkIdentDefs, etc. can be built according to - # the options values and info in the helper object/table; the same (or very - # similar) technique could be used to allow e.g. specification of a - # TaskRunner instance and/or worker name and/or `ptr Atomic[bool]` (for - # stopping the task) in the options object, which would then affect whether - # parameters for those things are included in the type signatures of the - # constructed procs or instead baked into their bodies. - - var - taskArgTypeDef = newNimNode(nnkTypeDef) - objDef = newNimNode(nnkObjectTy) - recList = newNimNode(nnkRecList) - - taskArgTypeDef.add(taskArgTypeDerivedId) - taskArgTypeDef.add(newEmptyNode()) - objDef.add(newEmptyNode()) - objDef.add(newNimNode(nnkOfInherit).add(taskArgTypeId)) - - if kind == rts: - var - objParam = newNimNode(nnkIdentDefs) - post = newNimNode(nnkPostfix) - - post.add(starId) - post.add(chanReturnToSenderId) - objParam.add(post) - objParam.add(serializedPointerTypeId) - objParam.add(newEmptyNode()) - recList.add(objParam) - - if stoppable == true: - var - objParam = newNimNode(nnkIdentDefs) - post = newNimNode(nnkPostfix) - - post.add(starId) - post.add(taskStoppedId) - objParam.add(post) - objParam.add(serializedPointerTypeId) - objParam.add(newEmptyNode()) - recList.add(objParam) - - for nn in body[3]: - if kind(nn) == nnkIdentDefs: - var - objParam = newNimNode(nnkIdentDefs) - post = newNimNode(nnkPostfix) - - post.add(starId) - post.add(nn[0]) - objParam.add(post) - objParam.add(nn[1]) - objParam.add(nn[2]) - recList.add(objParam) - - objDef.add(recList) - taskArgTypeDef.add(newNimNode(nnkRefTy).add(objDef)) - result.add(newNimNode(nnkTypeSection).add(taskArgTypeDef)) - - let - asyncPragmaId = ident("async") - atomicTypeId = ident("Atomic") - boolTypeId = ident("bool") - futureTypeId = ident("Future") - taskArgId = ident("taskArg") - taskArgEncId = ident("taskArgEncoded") - chanSendToHostId = ident("chanSendToHost") - chanSendToWorkerId = ident("chanSendToWorker") - stoppedId = ident("stopped") - taskRunnerId = ident("taskRunner") - taskRunnerTypeId = ident("TaskRunner") - workerChannelTypeId = ident("WorkerChannel") - workerId = ident("worker") - workerNameId = ident("workerName") - workerNameTypeId = ident("string") - workerRunningId = ident("workerRunning") - - var - atomPtr = newNimNode(nnkPtrTy) - atomBracket = newNimNode(nnkBracketExpr) - - atomBracket.add(atomicTypeId) - atomBracket.add(boolTypeId) - atomPtr.add(atomBracket) - - let - taskStoppedTypeId = atomPtr - workerRunningTypeId = atomPtr - - var impl = newStmtList() - - impl.add quote do: - let - `taskArgId` = decode[`taskArgTypeDerivedId`](`taskArgEncId`) - `chanSendToHostId` = cast[`workerChannelTypeId`](`taskArgId`.`chanSendToHostId`) - - if kind == rts: - impl.add quote do: - let `chanReturnToSenderId` = cast[`workerChannelTypeId`](`taskArgId`.`chanReturnToSenderId`) - - if stoppable == true: - impl.add quote do: - var `taskStoppedId` = cast[`taskStoppedTypeId`](`taskArgId`.`taskStoppedId`) - - impl.add quote do: - var `workerRunningId` = cast[`workerRunningTypeId`](`taskArgId`.`workerRunningId`) - - for nn in body[3]: - if kind(nn) == nnkIdentDefs: - let id = nn[0] - impl.add quote do: - let `id` = `taskArgId`.`id` - - impl.add(body[6]) - - result.add quote do: - const `taskNameImplId`: Task = proc(`taskArgEncId`: string) {.async, gcsafe, nimcall, raises: [Defect].} = - `impl` - - var - taskBody = newStmtList() - taskSyncBody = newStmtList() - taskProcDef = newNimNode(nnkProcDef) - taskProcSyncDef = newNimNode(nnkProcDef) - taskProcParams = newNimNode(nnkFormalParams) - stoppedIdentDefs = newNimNode(nnkIdentDefs) - taskRunnerIdentDefs = newNimNode(nnkIdentDefs) - workerNameIdentDefs = newNimNode(nnkIdentDefs) - - taskProcDef.add(taskNameId) - taskProcDef.add(newEmptyNode()) - taskProcDef.add(body[2]) - taskProcParams.add(newEmptyNode()) - taskRunnerIdentDefs.add(taskRunnerId) - taskRunnerIdentDefs.add(taskRunnerTypeId) - taskRunnerIdentDefs.add(newEmptyNode()) - taskProcParams.add(taskRunnerIdentDefs) - workerNameIdentDefs.add(workerNameId) - workerNameIdentDefs.add(workerNameTypeId) - workerNameIdentDefs.add(newEmptyNode()) - taskProcParams.add(workerNameIdentDefs) - if stoppable == true: - stoppedIdentDefs.add(stoppedId) - stoppedIdentDefs.add(taskStoppedTypeId) - stoppedIdentDefs.add(newEmptyNode()) - taskProcParams.add(stoppedIdentDefs) - - for nn in body[3]: - if kind(nn) == nnkIdentDefs: - taskProcParams.add(nn) - - taskProcDef.add(taskProcParams) - taskProcDef.add(newNimNode(nnkPragma).add(asyncPragmaId)) - taskProcDef.add(newEmptyNode()) - - copyChildrenTo(taskProcDef, taskProcSyncDef) - taskProcSyncDef[0] = taskNameSyncId - taskProcSyncDef[4] = newEmptyNode() - - if kind == rts: - if kind(taskReturnTypeId) != nnkEmpty: - var futureBracket = newNimNode(nnkBracketExpr) - futureBracket.add(futureTypeId) - futureBracket.add(taskReturnTypeId) - taskProcDef[3][0] = futureBracket - - taskProcSyncDef[3][0] = taskReturnTypeId - - taskBody.add quote do: - let - `workerId` = taskRunner.workers[workerName].worker - `chanSendToHostId` = `workerId`.chanRecvFromWorker - `chanSendToWorkerId` = `workerId`.chanSendToWorker - - if kind == rts: - taskBody.add quote do: - let `chanReturnToSenderId` = newWorkerChannel() - - taskBody.add quote do: - let `taskArgId` = `taskArgTypeDerivedId`( - `chanSendToHostId`: cast[`serializedPointerTypeId`](`chanSendToHostId`), - task: cast[`serializedPointerTypeId`](`taskNameImplId`), - taskName: `taskName`, - `workerRunningId`: cast[`serializedPointerTypeId`](addr taskRunner.running) - ) - - var taskArgConstructor = taskBody[if kind == rts: 2 else: 1][0][2] - - if kind == rts: - var objField = newNimNode(nnkExprColonExpr) - objField.add(chanReturnToSenderId) - objField.add quote do: cast[`serializedPointerTypeId`](`chanReturnToSenderId`) - taskArgConstructor.add(objField) - - if stoppable == true: - var objField = newNimNode(nnkExprColonExpr) - objField.add(taskStoppedId) - objField.add quote do: cast[`serializedPointerTypeId`](`stoppedId`) - taskArgConstructor.add(objField) - - for nn in body[3]: - var objField = newNimNode(nnkExprColonExpr) - if kind(nn) == nnkIdentDefs: - objField.add(nn[0]) - objField.add(nn[0]) - taskArgConstructor.add(objField) - - if kind == rts: - taskBody.add quote do: - `chanReturnToSenderId`.open() - - copyChildrenTo(taskBody, taskSyncBody) - - taskBody.add quote do: - asyncSpawn `chanSendToWorkerId`.send(`taskArgId`.encode.safe) - - taskSyncBody.add quote do: - `chanSendToWorkerId`.sendSync(`taskArgId`.encode.safe) - - if kind == rts: - if kind(taskReturnTypeId) != nnkEmpty: - taskBody.add quote do: - let res = decode[`taskReturnTypeId`]($(await `chanReturnToSenderId`.recv())) - `chanReturnToSenderId`.close() - return res - - taskSyncBody.add quote do: - let res = decode[`taskReturnTypeId`]($`chanReturnToSenderId`.recvSync()) - `chanReturnToSenderId`.close() - return res - - else: - taskBody.add quote do: - discard $(await `chanReturnToSenderId`.recv()) - `chanReturnToSenderId`.close() - - taskSyncBody.add quote do: - discard $`chanReturnToSenderId`.recvSync() - `chanReturnToSenderId`.close() - - taskProcDef.add(taskBody) - taskProcSyncDef.add(taskSyncBody) - - result.add(taskProcDef) - result.add(taskProcSyncDef) - - if exported: - result.add quote do: - export `taskArgTypeDerivedId`, `taskNameId`, `taskNameSyncId`, `taskNameImplId` - - # debug ---------------------------------------------------------------------- - # echo toStrLit(result) - -# The approach below doesn't work because unexpected things can happen with the -# AST of `body`, at least that's that what I observed; can look into a -# different approach: -# https://github.com/beef331/kashae/blob/master/src/kashae.nim#L204-L220 -# (that approach was recommended in #main channel of the official Nim discord -# server when I asked about the unexpected AST things) - -# macro task*(kind: TaskKind, body: untyped): untyped = -# result = newStmtList() -# result.add(quote do: task(`kind`, false, `body`)) -# -# macro task*(stoppable: bool, body: untyped): untyped = -# result = newStmtList() -# result.add(quote do: task(no_rts, `stoppable`, `body`)) -# -# macro task*(body: untyped): untyped = -# result = newStmtList() -# result.add(quote do: task(no_rts, false, `body`)) diff --git a/examples/chat/task_runner/tasks.nim b/examples/chat/task_runner/tasks.nim deleted file mode 100644 index 5476cf6..0000000 --- a/examples/chat/task_runner/tasks.nim +++ /dev/null @@ -1,39 +0,0 @@ -import # vendor libs - chronicles, chronos, json_serialization, json_serialization/std/options - -export chronicles, json_serialization, options - -logScope: - topics = "task_runner" - -type - ContextArg* = ref object of RootObj - - Context* = proc(arg: ContextArg): Future[void] {.gcsafe, nimcall, raises: [Defect].} - - Task* = proc(taskArgEncoded: string): Future[void] {.gcsafe, nimcall, raises: [Defect].} - - TaskKind* = enum no_rts, rts # rts := "return to sender" - - TaskArg* = ref object of RootObj - chanSendToHost*: ByteAddress # pointer to channel for sending to host - task*: ByteAddress # pointer to task proc - taskName*: string - workerRunning*: ByteAddress # pointer to taskRunner's .running Atomic[bool] - -# there should eventually be the ability to reliably stop individual workers, -# i.e. each worker would have it's own `.running` Atomic[bool] (maybe -# reconsider the naming, e.g. "workerStopped" vs. "workerRunning" to be -# consistent with "taskStopped", or switch the latter to -# "taskRunning"). Currently, a TaskRunner instance's `.running` Atomic[bool] -# serves as a "master switch" for all the workers, so it's not completely -# inaccurate for the field on TaskArg to be named `workerRunning` - -const emptyContext*: Context = - proc(arg: ContextArg) {.async, gcsafe, nimcall.} = discard - -proc decode*[T](arg: string): T = - Json.decode(arg, T, allowUnknownFields = true) - -proc encode*[T](arg: T): string = - arg.toJson(typeAnnotations = true) diff --git a/examples/chat/task_runner/workers.nim b/examples/chat/task_runner/workers.nim deleted file mode 100644 index 1bb8ef0..0000000 --- a/examples/chat/task_runner/workers.nim +++ /dev/null @@ -1,7 +0,0 @@ -import # chat libs - ./workers/[pool_worker, thread_worker, worker] - -export pool_worker, thread_worker, worker - -logScope: - topics = "task_runner" diff --git a/examples/chat/task_runner/workers/pool_worker.nim b/examples/chat/task_runner/workers/pool_worker.nim deleted file mode 100644 index a71f3dd..0000000 --- a/examples/chat/task_runner/workers/pool_worker.nim +++ /dev/null @@ -1,317 +0,0 @@ -import # std libs - std/[atomics, json, sequtils, tables] - -import # vendor libs - chronicles, chronos, task_runner - -import # chat libs - ./worker - -export json - -logScope: - topics = "task_runner" - -type - PoolThreadArg = ref object of ThreadArg - poolName: string - poolSize: int - - PoolWorker* = ref object of Worker - size*: int - thread: Thread[PoolThreadArg] - - WorkerThreadArg = ref object of ThreadArg - poolName: string - workerId: int - - ThreadWorker = ref object of Worker - id: int - thread: Thread[WorkerThreadArg] - - WorkerNotification = ref object - id: int - notice: string - -proc poolThread(arg: PoolThreadArg) {.thread.} - -proc workerThread(arg: WorkerThreadArg) {.thread.} - -const DefaultPoolSize* = 16 - -proc new*(T: type PoolWorker, name: string, running: pointer, - context: Context = emptyContext, contextArg: ContextArg = ContextArg(), - size: int = DefaultPoolSize, awaitTasks = true): T = - let - chanRecvFromWorker = newWorkerChannel() - chanSendToWorker = newWorkerChannel() - thread = Thread[PoolThreadArg]() - - T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker, - chanSendToWorker: chanSendToWorker, context: context, - contextArg: contextArg, name: name, running: running, size: size, - thread: thread) - -proc start*(self: PoolWorker) {.async.} = - trace "pool starting", pool=self.name, poolSize=self.size - self.chanRecvFromWorker.open() - self.chanSendToWorker.open() - let arg = PoolThreadArg(awaitTasks: self.awaitTasks, - chanRecvFromHost: self.chanSendToWorker, - chanSendToHost: self.chanRecvFromWorker, context: self.context, - contextArg: self.contextArg, running: self.running, poolName: self.name, - poolSize: self.size) - - createThread(self.thread, poolThread, arg) - let notice = $(await self.chanRecvFromWorker.recv()) - trace "pool started", notice, pool=self.name, poolSize=self.size - -proc stop*(self: PoolWorker) {.async.} = - asyncSpawn self.chanSendToWorker.send("stop".safe) - joinThread(self.thread) - self.chanRecvFromWorker.close() - self.chanSendToWorker.close() - trace "pool stopped", pool=self.name, poolSize=self.size - -proc new*(T: type ThreadWorker, name: string, id: int, running: pointer, - chanRecvFromWorker: WorkerChannel, context: Context = emptyContext, - contextArg: ContextArg = ContextArg(), awaitTasks = true): T = - let - chanSendToWorker = newWorkerChannel() - thread = Thread[WorkerThreadArg]() - - T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker, - chanSendToWorker: chanSendToWorker, context: context, - contextArg: contextArg, name: name, running: running, id: id, - thread: thread) - -proc start*(self: ThreadWorker) {.async.} = - self.chanSendToWorker.open() - let arg = WorkerThreadArg(awaitTasks: self.awaitTasks, - chanRecvFromHost: self.chanSendToWorker, - chanSendToHost: self.chanRecvFromWorker, context: self.context, - contextArg: self.contextArg, running: self.running, poolName: self.name, - workerId: self.id) - - createThread(self.thread, workerThread, arg) - -proc stop*(self: ThreadWorker) {.async.} = - asyncSpawn self.chanSendToWorker.send("stop".safe) - joinThread(self.thread) - self.chanSendToWorker.close() - trace "pool worker stopped", pool=self.name, workerId=self.id - -proc pool(arg: PoolThreadArg) {.async, raises: [Defect].} = - let - chanRecvFromHostOrWorker = arg.chanRecvFromHost - chanSendToHost = arg.chanSendToHost - pool = arg.poolName - poolSize = arg.poolSize - - var running = cast[ptr Atomic[bool]](arg.running) - - chanRecvFromHostOrWorker.open() - chanSendToHost.open() - - let notice = "ready" - trace "pool sent notification to host", notice, pool - asyncSpawn chanSendToHost.send(notice.safe) - - var - taskQueue: seq[string] = @[] # FIFO queue - workersBusy = newTable[int, ThreadWorker]() - workersIdle: seq[ThreadWorker] = @[] - workersStarted = 0 - - for i in 0.. 0 and - workersBusy.len < poolSize: - message = taskQueue[0] - taskQueue.delete 0, 0 - trace "pool removed task from queue", pool, queued=taskQueue.len - shouldSendToWorker = true - - elif shouldSendToWorker and taskQueue.len > 0 and - workersBusy.len < poolSize: - taskQueue.add message - message = taskQueue[0] - taskQueue.delete 0, 0 - trace "pool added task to queue and removed oldest task from queue", - pool, queued=taskQueue.len - - elif shouldSendToWorker and workersBusy.len == poolSize: - taskQueue.add message - trace "pool added task to queue", pool, queued=taskQueue.len - shouldSendToWorker = false - - if shouldSendToWorker: - let - worker = workersIdle[0] - workerId = worker.id - - workersIdle.delete 0, 0 - workersBusy[workerId] = worker - trace "pool sent task to worker", pool, workerId - trace "pool marked worker as busy", pool, poolSize, workerId, - workersBusy=workersBusy.len, workersIdle=workersIdle.len - - asyncSpawn worker.chanSendToWorker.send(message.safe) - - chanRecvFromHostOrWorker.close() - chanSendToHost.close() - -proc poolThread(arg: PoolThreadArg) {.thread.} = - waitFor pool(arg) - -proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} = - let - awaitTasks = arg.awaitTasks - chanRecvFromHost = arg.chanRecvFromHost - chanSendToHost = arg.chanSendToHost - pool = arg.poolName - workerId = arg.workerId - - var running = cast[ptr Atomic[bool]](arg.running) - - chanRecvFromHost.open() - chanSendToHost.open() - - trace "pool worker running context", pool, workerId - await arg.context(arg.contextArg) - let - notice = "ready" - notification = WorkerNotification(id: workerId, notice: notice) - - trace "pool worker sent notification to pool", notice, pool, workerId - asyncSpawn chanSendToHost.send(notification.encode.safe) - - while true: - trace "pool worker waiting for message", pool, workerId - let message = $(await chanRecvFromHost.recv()) - - if message == "stop": - trace "pool worker stopping", notice=message, pool, workerId - break - - if running[].load(): - var - msgerr = false - parsed: JsonNode - task: Task - taskName: string - - try: - parsed = parseJson(message) - task = cast[Task](parsed{"task"}.getInt) - taskName = parsed{"taskName"}.getStr - - trace "pool worker received message", message, pool, workerId - trace "pool worker running task", pool, task=taskName, workerId - - except CatchableError as e: - msgerr = true - error "pool worker received unknown message", error=e.msg, message, - pool, workerId - - if not msgerr: - if awaitTasks: - await task(message) - else: - asyncSpawn task(message) - - let - notice = "done" - notification = WorkerNotification(id: workerId, notice: notice) - - trace "pool worker sent notification to pool", notice, pool, workerId - asyncSpawn chanSendToHost.send(notification.encode.safe) - - chanRecvFromHost.close() - chanSendToHost.close() - -proc workerThread(arg: WorkerThreadArg) {.thread.} = - waitFor worker(arg) diff --git a/examples/chat/task_runner/workers/thread_worker.nim b/examples/chat/task_runner/workers/thread_worker.nim deleted file mode 100644 index 9396af5..0000000 --- a/examples/chat/task_runner/workers/thread_worker.nim +++ /dev/null @@ -1,111 +0,0 @@ -import # std libs - std/[atomics, json] - -import # vendor libs - chronicles, chronos, task_runner - -import # chat libs - ./worker - -export json - -logScope: - topics = "task_runner" - -type - WorkerThreadArg = ref object of ThreadArg - workerName: string - - ThreadWorker* = ref object of Worker - thread: Thread[WorkerThreadArg] - -proc workerThread(arg: WorkerThreadArg) {.thread.} - -proc new*(T: type ThreadWorker, name: string, running: pointer, - context: Context = emptyContext, contextArg: ContextArg = ContextArg(), - awaitTasks = false): T = - let - chanRecvFromWorker = newWorkerChannel() - chanSendToWorker = newWorkerChannel() - thread = Thread[WorkerThreadArg]() - - T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker, - chanSendToWorker: chanSendToWorker, context: context, - contextArg: contextArg, name: name, running: running, thread: thread) - -proc start*(self: ThreadWorker) {.async.} = - trace "worker starting", worker=self.name - self.chanRecvFromWorker.open() - self.chanSendToWorker.open() - let arg = WorkerThreadArg(awaitTasks: self.awaitTasks, - chanRecvFromHost: self.chanSendToWorker, - chanSendToHost: self.chanRecvFromWorker, context: self.context, - contextArg: self.contextArg, running: self.running, workerName: self.name) - - createThread(self.thread, workerThread, arg) - let notice = $(await self.chanRecvFromWorker.recv()) - trace "worker started", notice, worker=self.name - -proc stop*(self: ThreadWorker) {.async.} = - asyncSpawn self.chanSendToWorker.send("stop".safe) - joinThread(self.thread) - self.chanRecvFromWorker.close() - self.chanSendToWorker.close() - trace "worker stopped", worker=self.name - -proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} = - let - awaitTasks = arg.awaitTasks - chanRecvFromHost = arg.chanRecvFromHost - chanSendToHost = arg.chanSendToHost - worker = arg.workerName - - var running = cast[ptr Atomic[bool]](arg.running) - - chanRecvFromHost.open() - chanSendToHost.open() - - trace "worker running context", worker - await arg.context(arg.contextArg) - let notice = "ready" - trace "worker sent notification to host", notice, worker - asyncSpawn chanSendToHost.send(notice.safe) - - while true: - trace "worker waiting for message", worker - let message = $(await chanRecvFromHost.recv()) - - if message == "stop": - trace "worker stopping", notice=message, worker - break - - if running[].load(): - var - msgerr = false - parsed: JsonNode - task: Task - taskName: string - - try: - parsed = parseJson(message) - task = cast[Task](parsed{"task"}.getInt) - taskName = parsed{"taskName"}.getStr - - trace "worker received message", message, worker - trace "worker running task", task=taskName, worker - - except CatchableError as e: - msgerr = true - error "worker received unknown message", error=e.msg, message, worker - - if not msgerr: - if awaitTasks: - await task(message) - else: - asyncSpawn task(message) - - chanRecvFromHost.close() - chanSendToHost.close() - -proc workerThread(arg: WorkerThreadArg) {.thread.} = - waitFor worker(arg) diff --git a/examples/chat/task_runner/workers/worker.nim b/examples/chat/task_runner/workers/worker.nim deleted file mode 100644 index d2bbc31..0000000 --- a/examples/chat/task_runner/workers/worker.nim +++ /dev/null @@ -1,35 +0,0 @@ -import # vendor libs - task_runner - -import # chat libs - ../tasks - -export tasks, task_runner - -logScope: - topics = "task_runner" - -type - WorkerChannel* = AsyncChannel[ThreadSafeString] - - WorkerKind* = enum pool, thread - - ThreadArg* = ref object of RootObj - awaitTasks*: bool - chanRecvFromHost*: WorkerChannel - chanSendToHost*: WorkerChannel - context*: Context - contextArg*: ContextArg - running*: pointer - - Worker* = ref object of RootObj - awaitTasks*: bool - chanRecvFromWorker*: WorkerChannel - chanSendToWorker*: WorkerChannel - context*: Context - contextArg*: ContextArg - name*: string - running*: pointer - -proc newWorkerChannel*(): WorkerChannel = - newAsyncChannel[ThreadSafeString](-1)