diff --git a/.gitmodules b/.gitmodules index 8e55a47..1d820cf 100644 --- a/.gitmodules +++ b/.gitmodules @@ -18,3 +18,15 @@ [submodule "vendor/nim-unittest2"] path = vendor/nim-unittest2 url = https://github.com/status-im/nim-unittest2.git +[submodule "vendor/nim-json-serialization"] + path = vendor/nim-json-serialization + url = https://github.com/status-im/nim-json-serialization.git +[submodule "vendor/nim-chronicles"] + path = vendor/nim-chronicles + url = https://github.com/status-im/nim-chronicles.git +[submodule "vendor/nim-faststreams"] + path = vendor/nim-faststreams + url = https://github.com/status-im/nim-faststreams.git +[submodule "vendor/nim-serialization"] + path = vendor/nim-serialization + url = https://github.com/status-im/nim-serialization.git diff --git a/task_runner.nim b/task_runner.nim index 4008a29..e2d7139 100644 --- a/task_runner.nim +++ b/task_runner.nim @@ -9,6 +9,6 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import ./task_runner/achannels, ./task_runner/sys +import ./task_runner/[macros, runner] -export achannels, sys +export macros, runner diff --git a/task_runner/achannels.nim b/task_runner/achannels.nim index be987d8..7a355ec 100644 --- a/task_runner/achannels.nim +++ b/task_runner/achannels.nim @@ -13,7 +13,7 @@ import strutils import chronos/[handles, transport] -import ./asyncloop, ./asyncsync +import ./chronos_plus/[asyncloop, asyncsync] when hasThreadSupport: import locks diff --git a/task_runner/asyncloop.nim b/task_runner/chronos_plus/asyncloop.nim similarity index 100% rename from task_runner/asyncloop.nim rename to task_runner/chronos_plus/asyncloop.nim diff --git a/task_runner/asyncsync.nim b/task_runner/chronos_plus/asyncsync.nim similarity index 100% rename from task_runner/asyncsync.nim rename to task_runner/chronos_plus/asyncsync.nim diff --git a/task_runner/osapi.nim b/task_runner/chronos_plus/osapi.nim similarity index 100% rename from task_runner/osapi.nim rename to task_runner/chronos_plus/osapi.nim diff --git a/task_runner/macros.nim b/task_runner/macros.nim new file mode 100644 index 0000000..b566e51 --- /dev/null +++ b/task_runner/macros.nim @@ -0,0 +1,323 @@ +import # std libs + std/[macros, unicode] + +import # task_runner libs + ./runner + +macro task*(kind: static TaskKind, stoppable: static bool, body: untyped): untyped = + result = newStmtList() + + const + star = "*" + syncPost = "Sync" + taskArgPost = "TaskArg" + taskPost = "Task" + + var + exported = false + starId = ident(star) + taskArgTypeId = ident(taskArgPost) + taskArgTypeDerivedId: NimNode + taskName: string + taskNameId: NimNode + taskNameImplId: NimNode + taskNameSyncId: NimNode + taskReturnTypeId: NimNode + + if kind(body[0]) == nnkPostfix and body[0][0] == ident(star): + exported = true + taskName = strVal(body[0][1]) + taskNameId = ident(taskName) + + else: + taskNameId = body[0] + taskName = strVal(taskNameId) + + taskArgTypeDerivedId = ident(taskName.capitalize & taskArgPost) + taskNameImplId = ident(taskName & taskPost) + taskNameSyncId = ident(taskName & syncPost) + taskReturnTypeId = body[3][0] + + let + chanReturnToSenderId = ident("chanReturnToSender") + serializedPointerTypeId = ident("ByteAddress") + taskStoppedId = ident("taskStopped") + + # The repetitiveness of some code below could/should be cleaned up with + # additional metaprogramming (and probably more informed use of shortcuts and + # helpers provided by Nim's macros module); there can be a task options + # object for which e.g. the `stoppable` field is a boolean flag, but then + # also a helper object/table with the same fields/keys but the values are + # tuples of the type names and field names to be added to the type derived + # from TaskArg; the fields of the supplied/default options object can be + # iterated over and the proper nnkIdentDefs, etc. can be built according to + # the options values and info in the helper object/table; the same (or very + # similar) technique could be used to allow e.g. specification of a + # TaskRunner instance and/or worker name and/or `ptr Atomic[bool]` (for + # stopping the task) in the options object, which would then affect whether + # parameters for those things are included in the type signatures of the + # constructed procs or instead baked into their bodies. + + var + taskArgTypeDef = newNimNode(nnkTypeDef) + objDef = newNimNode(nnkObjectTy) + recList = newNimNode(nnkRecList) + + taskArgTypeDef.add(taskArgTypeDerivedId) + taskArgTypeDef.add(newEmptyNode()) + objDef.add(newEmptyNode()) + objDef.add(newNimNode(nnkOfInherit).add(taskArgTypeId)) + + if kind == rts: + var + objParam = newNimNode(nnkIdentDefs) + post = newNimNode(nnkPostfix) + + post.add(starId) + post.add(chanReturnToSenderId) + objParam.add(post) + objParam.add(serializedPointerTypeId) + objParam.add(newEmptyNode()) + recList.add(objParam) + + if stoppable == true: + var + objParam = newNimNode(nnkIdentDefs) + post = newNimNode(nnkPostfix) + + post.add(starId) + post.add(taskStoppedId) + objParam.add(post) + objParam.add(serializedPointerTypeId) + objParam.add(newEmptyNode()) + recList.add(objParam) + + for nn in body[3]: + if kind(nn) == nnkIdentDefs: + var + objParam = newNimNode(nnkIdentDefs) + post = newNimNode(nnkPostfix) + + post.add(starId) + post.add(nn[0]) + objParam.add(post) + objParam.add(nn[1]) + objParam.add(nn[2]) + recList.add(objParam) + + objDef.add(recList) + taskArgTypeDef.add(newNimNode(nnkRefTy).add(objDef)) + result.add(newNimNode(nnkTypeSection).add(taskArgTypeDef)) + + let + asyncPragmaId = ident("async") + atomicTypeId = ident("Atomic") + boolTypeId = ident("bool") + futureTypeId = ident("Future") + taskArgId = ident("taskArg") + taskArgEncId = ident("taskArgEncoded") + chanSendToHostId = ident("chanSendToHost") + chanSendToWorkerId = ident("chanSendToWorker") + stoppedId = ident("stopped") + taskRunnerId = ident("taskRunner") + taskRunnerTypeId = ident("TaskRunner") + workerChannelTypeId = ident("WorkerChannel") + workerId = ident("worker") + workerNameId = ident("workerName") + workerNameTypeId = ident("string") + workerRunningId = ident("workerRunning") + + var + atomPtr = newNimNode(nnkPtrTy) + atomBracket = newNimNode(nnkBracketExpr) + + atomBracket.add(atomicTypeId) + atomBracket.add(boolTypeId) + atomPtr.add(atomBracket) + + let + taskStoppedTypeId = atomPtr + workerRunningTypeId = atomPtr + + var impl = newStmtList() + + impl.add quote do: + let + `taskArgId` = decode[`taskArgTypeDerivedId`](`taskArgEncId`) + `chanSendToHostId` = cast[`workerChannelTypeId`](`taskArgId`.`chanSendToHostId`) + + if kind == rts: + impl.add quote do: + let `chanReturnToSenderId` = cast[`workerChannelTypeId`](`taskArgId`.`chanReturnToSenderId`) + + if stoppable == true: + impl.add quote do: + var `taskStoppedId` = cast[`taskStoppedTypeId`](`taskArgId`.`taskStoppedId`) + + impl.add quote do: + var `workerRunningId` = cast[`workerRunningTypeId`](`taskArgId`.`workerRunningId`) + + for nn in body[3]: + if kind(nn) == nnkIdentDefs: + let id = nn[0] + impl.add quote do: + let `id` = `taskArgId`.`id` + + impl.add(body[6]) + + result.add quote do: + const `taskNameImplId`: Task = proc(`taskArgEncId`: string) {.async, gcsafe, nimcall, raises: [Defect].} = + `impl` + + var + taskBody = newStmtList() + taskSyncBody = newStmtList() + taskProcDef = newNimNode(nnkProcDef) + taskProcSyncDef = newNimNode(nnkProcDef) + taskProcParams = newNimNode(nnkFormalParams) + stoppedIdentDefs = newNimNode(nnkIdentDefs) + taskRunnerIdentDefs = newNimNode(nnkIdentDefs) + workerNameIdentDefs = newNimNode(nnkIdentDefs) + + taskProcDef.add(taskNameId) + taskProcDef.add(newEmptyNode()) + taskProcDef.add(body[2]) + taskProcParams.add(newEmptyNode()) + taskRunnerIdentDefs.add(taskRunnerId) + taskRunnerIdentDefs.add(taskRunnerTypeId) + taskRunnerIdentDefs.add(newEmptyNode()) + taskProcParams.add(taskRunnerIdentDefs) + workerNameIdentDefs.add(workerNameId) + workerNameIdentDefs.add(workerNameTypeId) + workerNameIdentDefs.add(newEmptyNode()) + taskProcParams.add(workerNameIdentDefs) + if stoppable == true: + stoppedIdentDefs.add(stoppedId) + stoppedIdentDefs.add(taskStoppedTypeId) + stoppedIdentDefs.add(newEmptyNode()) + taskProcParams.add(stoppedIdentDefs) + + for nn in body[3]: + if kind(nn) == nnkIdentDefs: + taskProcParams.add(nn) + + taskProcDef.add(taskProcParams) + taskProcDef.add(newNimNode(nnkPragma).add(asyncPragmaId)) + taskProcDef.add(newEmptyNode()) + + copyChildrenTo(taskProcDef, taskProcSyncDef) + taskProcSyncDef[0] = taskNameSyncId + taskProcSyncDef[4] = newEmptyNode() + + if kind == rts: + if kind(taskReturnTypeId) != nnkEmpty: + var futureBracket = newNimNode(nnkBracketExpr) + futureBracket.add(futureTypeId) + futureBracket.add(taskReturnTypeId) + taskProcDef[3][0] = futureBracket + + taskProcSyncDef[3][0] = taskReturnTypeId + + taskBody.add quote do: + let + `workerId` = taskRunner.workers[workerName].worker + `chanSendToHostId` = `workerId`.chanRecvFromWorker + `chanSendToWorkerId` = `workerId`.chanSendToWorker + + if kind == rts: + taskBody.add quote do: + let `chanReturnToSenderId` = newWorkerChannel() + + taskBody.add quote do: + let `taskArgId` = `taskArgTypeDerivedId`( + `chanSendToHostId`: cast[`serializedPointerTypeId`](`chanSendToHostId`), + task: cast[`serializedPointerTypeId`](`taskNameImplId`), + taskName: `taskName`, + `workerRunningId`: cast[`serializedPointerTypeId`](addr taskRunner.running) + ) + + var taskArgConstructor = taskBody[if kind == rts: 2 else: 1][0][2] + + if kind == rts: + var objField = newNimNode(nnkExprColonExpr) + objField.add(chanReturnToSenderId) + objField.add quote do: cast[`serializedPointerTypeId`](`chanReturnToSenderId`) + taskArgConstructor.add(objField) + + if stoppable == true: + var objField = newNimNode(nnkExprColonExpr) + objField.add(taskStoppedId) + objField.add quote do: cast[`serializedPointerTypeId`](`stoppedId`) + taskArgConstructor.add(objField) + + for nn in body[3]: + var objField = newNimNode(nnkExprColonExpr) + if kind(nn) == nnkIdentDefs: + objField.add(nn[0]) + objField.add(nn[0]) + taskArgConstructor.add(objField) + + if kind == rts: + taskBody.add quote do: + `chanReturnToSenderId`.open() + + copyChildrenTo(taskBody, taskSyncBody) + + taskBody.add quote do: + asyncSpawn `chanSendToWorkerId`.send(`taskArgId`.encode.safe) + + taskSyncBody.add quote do: + `chanSendToWorkerId`.sendSync(`taskArgId`.encode.safe) + + if kind == rts: + if kind(taskReturnTypeId) != nnkEmpty: + taskBody.add quote do: + let res = decode[`taskReturnTypeId`]($(await `chanReturnToSenderId`.recv())) + `chanReturnToSenderId`.close() + return res + + taskSyncBody.add quote do: + let res = decode[`taskReturnTypeId`]($`chanReturnToSenderId`.recvSync()) + `chanReturnToSenderId`.close() + return res + + else: + taskBody.add quote do: + discard $(await `chanReturnToSenderId`.recv()) + `chanReturnToSenderId`.close() + + taskSyncBody.add quote do: + discard $`chanReturnToSenderId`.recvSync() + `chanReturnToSenderId`.close() + + taskProcDef.add(taskBody) + taskProcSyncDef.add(taskSyncBody) + + result.add(taskProcDef) + result.add(taskProcSyncDef) + + if exported: + result.add quote do: + export `taskArgTypeDerivedId`, `taskNameId`, `taskNameSyncId`, `taskNameImplId` + + # debug ---------------------------------------------------------------------- + # echo toStrLit(result) + +# The approach below doesn't work because unexpected things can happen with the +# AST of `body`, at least that's that what I observed; can look into a +# different approach: +# https://github.com/beef331/kashae/blob/master/src/kashae.nim#L204-L220 +# (that approach was recommended in #main channel of the official Nim discord +# server when I asked about the unexpected AST things) + +# macro task*(kind: TaskKind, body: untyped): untyped = +# result = newStmtList() +# result.add(quote do: task(`kind`, false, `body`)) +# +# macro task*(stoppable: bool, body: untyped): untyped = +# result = newStmtList() +# result.add(quote do: task(no_rts, `stoppable`, `body`)) +# +# macro task*(body: untyped): untyped = +# result = newStmtList() +# result.add(quote do: task(no_rts, false, `body`)) diff --git a/task_runner/runner.nim b/task_runner/runner.nim new file mode 100644 index 0000000..54ba5c8 --- /dev/null +++ b/task_runner/runner.nim @@ -0,0 +1,64 @@ +import # std libs + std/[atomics, tables] + +import # task_runner libs + ./tasks, ./workers + +export atomics, tables, tasks, workers + +logScope: + topics = "task_runner" + +type + WorkerTable = TableRef[string, tuple[kind: WorkerKind, worker: Worker]] + + TaskRunner* = ref object + running*: Atomic[bool] + workers*: WorkerTable + +proc newWorkerTable*(): WorkerTable = + newTable[string, tuple[kind: WorkerKind, worker: Worker]]() + +proc new*(T: type TaskRunner, workers: WorkerTable = newWorkerTable()): T = + # Atomic[bool] is `false` by default, no need to initialize `running` + T(workers: workers) + +proc start*(self: TaskRunner) {.async.} = + trace "task runner starting" + var starting: seq[Future[void]] = @[] + for v in self.workers.values: + let (kind, worker) = v + case kind: + of pool: + starting.add cast[PoolWorker](worker).start() + of thread: + starting.add cast[ThreadWorker](worker).start() + await allFutures(starting) + trace "task runner started" + self.running.store(true) + +proc stop*(self: TaskRunner) {.async.} = + trace "task runner stopping" + self.running.store(false) + var stopping: seq[Future[void]] = @[] + for v in self.workers.values: + let (kind, worker) = v + case kind: + of pool: + stopping.add(cast[PoolWorker](worker).stop()) + of thread: + stopping.add(cast[ThreadWorker](worker).stop()) + await allFutures(stopping) + trace "task runner stopped" + +proc createWorker*(self: TaskRunner, kind: WorkerKind, name: string, + context: Context = emptyContext, contextArg: ContextArg = ContextArg(), + size = DefaultPoolSize) = + let running = cast[pointer](addr self.running) + case kind: + of pool: + self.workers[name] = (kind: kind, + worker: PoolWorker.new(name, running, context, contextArg, size)) + of thread: + self.workers[name] = (kind: kind, + worker: ThreadWorker.new(name, running, context, contextArg)) diff --git a/task_runner/sys.nim b/task_runner/sys.nim deleted file mode 100644 index c60b5bc..0000000 --- a/task_runner/sys.nim +++ /dev/null @@ -1,12 +0,0 @@ -type - ThreadSafeString* = distinct cstring - -proc safe*(input: string): ThreadSafeString = - var res = cast[cstring](allocShared(input.len + 1)) - copyMem(res, input.cstring, input.len) - res[input.len] = '\0' - res.ThreadSafeString - -proc `$`*(input: ThreadSafeString): string = - result = $(input.cstring) - deallocShared input.cstring diff --git a/task_runner/tasks.nim b/task_runner/tasks.nim new file mode 100644 index 0000000..1f79737 --- /dev/null +++ b/task_runner/tasks.nim @@ -0,0 +1,48 @@ +import # vendor libs + chronicles, chronos, json_serialization, json_serialization/std/options + +export chronicles, chronos, json_serialization, options + +type + ContextArg* = ref object of RootObj + + Context* = proc(arg: ContextArg): Future[void] {.gcsafe, nimcall, raises: [Defect].} + + Task* = proc(taskArgEncoded: string): Future[void] {.gcsafe, nimcall, raises: [Defect].} + + TaskKind* = enum no_rts, rts # rts := "return to sender" + + TaskArg* = ref object of RootObj + chanSendToHost*: ByteAddress # pointer to channel for sending to host + task*: ByteAddress # pointer to task proc + taskName*: string + workerRunning*: ByteAddress # pointer to taskRunner's .running Atomic[bool] + + ThreadSafeString* = distinct cstring + +# there should eventually be the ability to reliably stop individual workers, +# i.e. each worker would have it's own `.running` Atomic[bool] (maybe +# reconsider the naming, e.g. "workerStopped" vs. "workerRunning" to be +# consistent with "taskStopped", or switch the latter to +# "taskRunning"). Currently, a TaskRunner instance's `.running` Atomic[bool] +# serves as a "master switch" for all the workers, so it's not completely +# inaccurate for the field on TaskArg to be named `workerRunning` + +const emptyContext*: Context = + proc(arg: ContextArg) {.async, gcsafe, nimcall.} = discard + +proc decode*[T](arg: string): T = + Json.decode(arg, T, allowUnknownFields = true) + +proc encode*[T](arg: T): string = + arg.toJson(typeAnnotations = true) + +proc `$`*(input: ThreadSafeString): string = + result = $(input.cstring) + deallocShared input.cstring + +proc safe*(input: string): ThreadSafeString = + var res = cast[cstring](allocShared(input.len + 1)) + copyMem(res, input.cstring, input.len) + res[input.len] = '\0' + res.ThreadSafeString diff --git a/task_runner/workers.nim b/task_runner/workers.nim new file mode 100644 index 0000000..75e7107 --- /dev/null +++ b/task_runner/workers.nim @@ -0,0 +1,4 @@ +import # task_runner libs + ./workers/[pool_worker, thread_worker, worker] + +export pool_worker, thread_worker, worker diff --git a/task_runner/workers/pool_worker.nim b/task_runner/workers/pool_worker.nim new file mode 100644 index 0000000..f5dcc39 --- /dev/null +++ b/task_runner/workers/pool_worker.nim @@ -0,0 +1,314 @@ +import # std libs + std/[atomics, json, sequtils, tables] + +import # task_runner libs + ./worker + +export json, sequtils + +logScope: + topics = "task_runner" + +type + PoolThreadArg = ref object of ThreadArg + poolName: string + poolSize: int + + PoolWorker* = ref object of Worker + size*: int + thread: Thread[PoolThreadArg] + + WorkerThreadArg = ref object of ThreadArg + poolName: string + workerId: int + + ThreadWorker = ref object of Worker + id: int + thread: Thread[WorkerThreadArg] + + WorkerNotification = ref object + id: int + notice: string + +proc poolThread(arg: PoolThreadArg) {.thread.} + +proc workerThread(arg: WorkerThreadArg) {.thread.} + +const DefaultPoolSize* = 16 + +proc new*(T: type PoolWorker, name: string, running: pointer, + context: Context = emptyContext, contextArg: ContextArg = ContextArg(), + size: int = DefaultPoolSize, awaitTasks = true): T = + let + chanRecvFromWorker = newWorkerChannel() + chanSendToWorker = newWorkerChannel() + thread = Thread[PoolThreadArg]() + + T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker, + chanSendToWorker: chanSendToWorker, context: context, + contextArg: contextArg, name: name, running: running, size: size, + thread: thread) + +proc start*(self: PoolWorker) {.async.} = + trace "pool starting", pool=self.name, poolSize=self.size + self.chanRecvFromWorker.open() + self.chanSendToWorker.open() + let arg = PoolThreadArg(awaitTasks: self.awaitTasks, + chanRecvFromHost: self.chanSendToWorker, + chanSendToHost: self.chanRecvFromWorker, context: self.context, + contextArg: self.contextArg, running: self.running, poolName: self.name, + poolSize: self.size) + + createThread(self.thread, poolThread, arg) + let notice = $(await self.chanRecvFromWorker.recv()) + trace "pool started", notice, pool=self.name, poolSize=self.size + +proc stop*(self: PoolWorker) {.async.} = + asyncSpawn self.chanSendToWorker.send("stop".safe) + joinThread(self.thread) + self.chanRecvFromWorker.close() + self.chanSendToWorker.close() + trace "pool stopped", pool=self.name, poolSize=self.size + +proc new*(T: type ThreadWorker, name: string, id: int, running: pointer, + chanRecvFromWorker: WorkerChannel, context: Context = emptyContext, + contextArg: ContextArg = ContextArg(), awaitTasks = true): T = + let + chanSendToWorker = newWorkerChannel() + thread = Thread[WorkerThreadArg]() + + T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker, + chanSendToWorker: chanSendToWorker, context: context, + contextArg: contextArg, name: name, running: running, id: id, + thread: thread) + +proc start*(self: ThreadWorker) {.async.} = + self.chanSendToWorker.open() + let arg = WorkerThreadArg(awaitTasks: self.awaitTasks, + chanRecvFromHost: self.chanSendToWorker, + chanSendToHost: self.chanRecvFromWorker, context: self.context, + contextArg: self.contextArg, running: self.running, poolName: self.name, + workerId: self.id) + + createThread(self.thread, workerThread, arg) + +proc stop*(self: ThreadWorker) {.async.} = + asyncSpawn self.chanSendToWorker.send("stop".safe) + joinThread(self.thread) + self.chanSendToWorker.close() + trace "pool worker stopped", pool=self.name, workerId=self.id + +proc pool(arg: PoolThreadArg) {.async, raises: [Defect].} = + let + chanRecvFromHostOrWorker = arg.chanRecvFromHost + chanSendToHost = arg.chanSendToHost + pool = arg.poolName + poolSize = arg.poolSize + + var running = cast[ptr Atomic[bool]](arg.running) + + chanRecvFromHostOrWorker.open() + chanSendToHost.open() + + let notice = "ready" + trace "pool sent notification to host", notice, pool + asyncSpawn chanSendToHost.send(notice.safe) + + var + taskQueue: seq[string] = @[] # FIFO queue + workersBusy = newTable[int, ThreadWorker]() + workersIdle: seq[ThreadWorker] = @[] + workersStarted = 0 + + for i in 0.. 0 and + workersBusy.len < poolSize: + message = taskQueue[0] + taskQueue.delete 0, 0 + trace "pool removed task from queue", pool, queued=taskQueue.len + shouldSendToWorker = true + + elif shouldSendToWorker and taskQueue.len > 0 and + workersBusy.len < poolSize: + taskQueue.add message + message = taskQueue[0] + taskQueue.delete 0, 0 + trace "pool added task to queue and removed oldest task from queue", + pool, queued=taskQueue.len + + elif shouldSendToWorker and workersBusy.len == poolSize: + taskQueue.add message + trace "pool added task to queue", pool, queued=taskQueue.len + shouldSendToWorker = false + + if shouldSendToWorker: + let + worker = workersIdle[0] + workerId = worker.id + + workersIdle.delete 0, 0 + workersBusy[workerId] = worker + trace "pool sent task to worker", pool, workerId + trace "pool marked worker as busy", pool, poolSize, workerId, + workersBusy=workersBusy.len, workersIdle=workersIdle.len + + asyncSpawn worker.chanSendToWorker.send(message.safe) + + chanRecvFromHostOrWorker.close() + chanSendToHost.close() + +proc poolThread(arg: PoolThreadArg) {.thread.} = + waitFor pool(arg) + +proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} = + let + awaitTasks = arg.awaitTasks + chanRecvFromHost = arg.chanRecvFromHost + chanSendToHost = arg.chanSendToHost + pool = arg.poolName + workerId = arg.workerId + + var running = cast[ptr Atomic[bool]](arg.running) + + chanRecvFromHost.open() + chanSendToHost.open() + + trace "pool worker running context", pool, workerId + await arg.context(arg.contextArg) + let + notice = "ready" + notification = WorkerNotification(id: workerId, notice: notice) + + trace "pool worker sent notification to pool", notice, pool, workerId + asyncSpawn chanSendToHost.send(notification.encode.safe) + + while true: + trace "pool worker waiting for message", pool, workerId + let message = $(await chanRecvFromHost.recv()) + + if message == "stop": + trace "pool worker stopping", notice=message, pool, workerId + break + + if running[].load(): + var + msgerr = false + parsed: JsonNode + task: Task + taskName: string + + try: + parsed = parseJson(message) + task = cast[Task](parsed{"task"}.getInt) + taskName = parsed{"taskName"}.getStr + + trace "pool worker received message", message, pool, workerId + trace "pool worker running task", pool, task=taskName, workerId + + except CatchableError as e: + msgerr = true + error "pool worker received unknown message", error=e.msg, message, + pool, workerId + + if not msgerr: + if awaitTasks: + await task(message) + else: + asyncSpawn task(message) + + let + notice = "done" + notification = WorkerNotification(id: workerId, notice: notice) + + trace "pool worker sent notification to pool", notice, pool, workerId + asyncSpawn chanSendToHost.send(notification.encode.safe) + + chanRecvFromHost.close() + chanSendToHost.close() + +proc workerThread(arg: WorkerThreadArg) {.thread.} = + waitFor worker(arg) diff --git a/task_runner/workers/thread_worker.nim b/task_runner/workers/thread_worker.nim new file mode 100644 index 0000000..900e403 --- /dev/null +++ b/task_runner/workers/thread_worker.nim @@ -0,0 +1,108 @@ +import # std libs + std/[atomics, json] + +import # task_runner libs + ./worker + +export json + +logScope: + topics = "task_runner" + +type + WorkerThreadArg = ref object of ThreadArg + workerName: string + + ThreadWorker* = ref object of Worker + thread: Thread[WorkerThreadArg] + +proc workerThread(arg: WorkerThreadArg) {.thread.} + +proc new*(T: type ThreadWorker, name: string, running: pointer, + context: Context = emptyContext, contextArg: ContextArg = ContextArg(), + awaitTasks = false): T = + let + chanRecvFromWorker = newWorkerChannel() + chanSendToWorker = newWorkerChannel() + thread = Thread[WorkerThreadArg]() + + T(awaitTasks: awaitTasks, chanRecvFromWorker: chanRecvFromWorker, + chanSendToWorker: chanSendToWorker, context: context, + contextArg: contextArg, name: name, running: running, thread: thread) + +proc start*(self: ThreadWorker) {.async.} = + trace "worker starting", worker=self.name + self.chanRecvFromWorker.open() + self.chanSendToWorker.open() + let arg = WorkerThreadArg(awaitTasks: self.awaitTasks, + chanRecvFromHost: self.chanSendToWorker, + chanSendToHost: self.chanRecvFromWorker, context: self.context, + contextArg: self.contextArg, running: self.running, workerName: self.name) + + createThread(self.thread, workerThread, arg) + let notice = $(await self.chanRecvFromWorker.recv()) + trace "worker started", notice, worker=self.name + +proc stop*(self: ThreadWorker) {.async.} = + asyncSpawn self.chanSendToWorker.send("stop".safe) + joinThread(self.thread) + self.chanRecvFromWorker.close() + self.chanSendToWorker.close() + trace "worker stopped", worker=self.name + +proc worker(arg: WorkerThreadArg) {.async, raises: [Defect].} = + let + awaitTasks = arg.awaitTasks + chanRecvFromHost = arg.chanRecvFromHost + chanSendToHost = arg.chanSendToHost + worker = arg.workerName + + var running = cast[ptr Atomic[bool]](arg.running) + + chanRecvFromHost.open() + chanSendToHost.open() + + trace "worker running context", worker + await arg.context(arg.contextArg) + let notice = "ready" + trace "worker sent notification to host", notice, worker + asyncSpawn chanSendToHost.send(notice.safe) + + while true: + trace "worker waiting for message", worker + let message = $(await chanRecvFromHost.recv()) + + if message == "stop": + trace "worker stopping", notice=message, worker + break + + if running[].load(): + var + msgerr = false + parsed: JsonNode + task: Task + taskName: string + + try: + parsed = parseJson(message) + task = cast[Task](parsed{"task"}.getInt) + taskName = parsed{"taskName"}.getStr + + trace "worker received message", message, worker + trace "worker running task", task=taskName, worker + + except CatchableError as e: + msgerr = true + error "worker received unknown message", error=e.msg, message, worker + + if not msgerr: + if awaitTasks: + await task(message) + else: + asyncSpawn task(message) + + chanRecvFromHost.close() + chanSendToHost.close() + +proc workerThread(arg: WorkerThreadArg) {.thread.} = + waitFor worker(arg) diff --git a/task_runner/workers/worker.nim b/task_runner/workers/worker.nim new file mode 100644 index 0000000..fb070f5 --- /dev/null +++ b/task_runner/workers/worker.nim @@ -0,0 +1,29 @@ +import # task_runner libs + ../achannels, ../tasks + +export achannels, tasks + +type + WorkerChannel* = AsyncChannel[ThreadSafeString] + + WorkerKind* = enum pool, thread + + ThreadArg* = ref object of RootObj + awaitTasks*: bool + chanRecvFromHost*: WorkerChannel + chanSendToHost*: WorkerChannel + context*: Context + contextArg*: ContextArg + running*: pointer + + Worker* = ref object of RootObj + awaitTasks*: bool + chanRecvFromWorker*: WorkerChannel + chanSendToWorker*: WorkerChannel + context*: Context + contextArg*: ContextArg + name*: string + running*: pointer + +proc newWorkerChannel*(): WorkerChannel = + newAsyncChannel[ThreadSafeString](-1) diff --git a/vendor/nim-bearssl b/vendor/nim-bearssl index 0a7401a..dc62f4f 160000 --- a/vendor/nim-bearssl +++ b/vendor/nim-bearssl @@ -1 +1 @@ -Subproject commit 0a7401ad466d70bab31c5d6dc82d1d584e4ebd1f +Subproject commit dc62f4fccd2d40c884009ae8f2b14bb6a86a55cf diff --git a/vendor/nim-chronicles b/vendor/nim-chronicles new file mode 160000 index 0000000..63ce43a --- /dev/null +++ b/vendor/nim-chronicles @@ -0,0 +1 @@ +Subproject commit 63ce43a86a40a4c73d1b3b8317278d47ec55a458 diff --git a/vendor/nim-faststreams b/vendor/nim-faststreams new file mode 160000 index 0000000..5eb7fd0 --- /dev/null +++ b/vendor/nim-faststreams @@ -0,0 +1 @@ +Subproject commit 5eb7fd0c90d3f03b6778688a5893fdd2715e9fe2 diff --git a/vendor/nim-http-utils b/vendor/nim-http-utils index 613ad40..8b492c7 160000 --- a/vendor/nim-http-utils +++ b/vendor/nim-http-utils @@ -1 +1 @@ -Subproject commit 613ad40f00ab3d0ee839f9db9c4d25e5e0248dee +Subproject commit 8b492c74b56c62bcee991a6899d413938a3accc5 diff --git a/vendor/nim-json-serialization b/vendor/nim-json-serialization new file mode 160000 index 0000000..652099a --- /dev/null +++ b/vendor/nim-json-serialization @@ -0,0 +1 @@ +Subproject commit 652099a95960be7790e2f4b4c925d0dd703cc9aa diff --git a/vendor/nim-serialization b/vendor/nim-serialization new file mode 160000 index 0000000..5213d39 --- /dev/null +++ b/vendor/nim-serialization @@ -0,0 +1 @@ +Subproject commit 5213d397f9d85c69279961256e19a859cd32df30 diff --git a/vendor/nim-stew b/vendor/nim-stew index ede0651..70680e2 160000 --- a/vendor/nim-stew +++ b/vendor/nim-stew @@ -1 +1 @@ -Subproject commit ede0651741aa4f14f76c5560c3d2c6730757366d +Subproject commit 70680e2af2f3b0ead9ea49969731910e0898fbbe diff --git a/vendor/nim-unittest2 b/vendor/nim-unittest2 index e788dea..91d4eaa 160000 --- a/vendor/nim-unittest2 +++ b/vendor/nim-unittest2 @@ -1 +1 @@ -Subproject commit e788deab3d59ff8a4fe103aeb5d82d3d82fcac7d +Subproject commit 91d4eaa4ccb4bfddf179fe2ee4247ae000e2587f diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system index 2b097ec..12c3591 160000 --- a/vendor/nimbus-build-system +++ b/vendor/nimbus-build-system @@ -1 +1 @@ -Subproject commit 2b097ec86aead0119c5e6bfff8502c3948a1ceaf +Subproject commit 12c3591fc165e88e90f5a0f1e5f5f37c69ef41f4