chore: change threadpool logs from debug to trace
This commit is contained in:
parent
0d136d8879
commit
0d677eb156
|
@ -59,7 +59,7 @@ proc teardown*(self: ThreadPool) =
|
||||||
self.chanSendToPool.sendSync("shutdown".safe)
|
self.chanSendToPool.sendSync("shutdown".safe)
|
||||||
self.chanRecvFromPool.close()
|
self.chanRecvFromPool.close()
|
||||||
self.chanSendToPool.close()
|
self.chanSendToPool.close()
|
||||||
debug "[threadpool] waiting for the control thread to stop"
|
trace "[threadpool] waiting for the control thread to stop"
|
||||||
joinThread(self.thread)
|
joinThread(self.thread)
|
||||||
|
|
||||||
proc start*[T: TaskArg](self: Threadpool, arg: T) =
|
proc start*[T: TaskArg](self: Threadpool, arg: T) =
|
||||||
|
@ -70,21 +70,21 @@ proc runner(arg: TaskThreadArg) {.async.} =
|
||||||
arg.chanSendToPool.open()
|
arg.chanSendToPool.open()
|
||||||
|
|
||||||
let noticeToPool = ThreadNotification(id: arg.id, notice: "ready")
|
let noticeToPool = ThreadNotification(id: arg.id, notice: "ready")
|
||||||
debug "[threadpool task thread] sending 'ready'", threadid=arg.id
|
trace "[threadpool task thread] sending 'ready'", threadid=arg.id
|
||||||
await arg.chanSendToPool.send(noticeToPool.encode.safe)
|
await arg.chanSendToPool.send(noticeToPool.encode.safe)
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
debug "[threadpool task thread] waiting for message"
|
trace "[threadpool task thread] waiting for message"
|
||||||
let received = $(await arg.chanRecvFromPool.recv())
|
let received = $(await arg.chanRecvFromPool.recv())
|
||||||
|
|
||||||
if received == "shutdown":
|
if received == "shutdown":
|
||||||
debug "[threadpool task thread] received 'shutdown'"
|
trace "[threadpool task thread] received 'shutdown'"
|
||||||
break
|
break
|
||||||
|
|
||||||
let
|
let
|
||||||
parsed = parseJson(received)
|
parsed = parseJson(received)
|
||||||
messageType = parsed{"$type"}.getStr
|
messageType = parsed{"$type"}.getStr
|
||||||
debug "[threadpool task thread] initiating task", messageType=messageType,
|
trace "[threadpool task thread] initiating task", messageType=messageType,
|
||||||
threadid=arg.id
|
threadid=arg.id
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -97,7 +97,7 @@ proc runner(arg: TaskThreadArg) {.async.} =
|
||||||
error "[threadpool task thread] unknown message", message=received
|
error "[threadpool task thread] unknown message", message=received
|
||||||
|
|
||||||
let noticeToPool = ThreadNotification(id: arg.id, notice: "done")
|
let noticeToPool = ThreadNotification(id: arg.id, notice: "done")
|
||||||
debug "[threadpool task thread] sending 'done' notice to pool",
|
trace "[threadpool task thread] sending 'done' notice to pool",
|
||||||
threadid=arg.id
|
threadid=arg.id
|
||||||
await arg.chanSendToPool.send(noticeToPool.encode.safe)
|
await arg.chanSendToPool.send(noticeToPool.encode.safe)
|
||||||
|
|
||||||
|
@ -120,14 +120,14 @@ proc pool(arg: PoolThreadArg) {.async.} =
|
||||||
chanSendToMain.open()
|
chanSendToMain.open()
|
||||||
chanRecvFromMainOrTask.open()
|
chanRecvFromMainOrTask.open()
|
||||||
|
|
||||||
debug "[threadpool] sending 'ready' to main thread"
|
trace "[threadpool] sending 'ready' to main thread"
|
||||||
await chanSendToMain.send("ready".safe)
|
await chanSendToMain.send("ready".safe)
|
||||||
|
|
||||||
for i in 0..<arg.size:
|
for i in 0..<arg.size:
|
||||||
let id = i + 1
|
let id = i + 1
|
||||||
let chanSendToTask = newAsyncChannel[ThreadSafeString](-1)
|
let chanSendToTask = newAsyncChannel[ThreadSafeString](-1)
|
||||||
chanSendToTask.open()
|
chanSendToTask.open()
|
||||||
debug "[threadpool] adding to threadsIdle", threadid=id
|
trace "[threadpool] adding to threadsIdle", threadid=id
|
||||||
threadsIdle[i].id = id
|
threadsIdle[i].id = id
|
||||||
createThread(
|
createThread(
|
||||||
threadsIdle[i].thr,
|
threadsIdle[i].thr,
|
||||||
|
@ -149,11 +149,11 @@ proc pool(arg: PoolThreadArg) {.async.} =
|
||||||
# push thread into threadsIdle
|
# push thread into threadsIdle
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
debug "[threadpool] waiting for message"
|
trace "[threadpool] waiting for message"
|
||||||
var task = $(await chanRecvFromMainOrTask.recv())
|
var task = $(await chanRecvFromMainOrTask.recv())
|
||||||
|
|
||||||
if task == "shutdown":
|
if task == "shutdown":
|
||||||
debug "[threadpool] sending 'shutdown' to all task threads"
|
trace "[threadpool] sending 'shutdown' to all task threads"
|
||||||
for tpl in threadsIdle:
|
for tpl in threadsIdle:
|
||||||
await tpl.chanSendToTask.send("shutdown".safe)
|
await tpl.chanSendToTask.send("shutdown".safe)
|
||||||
for tpl in threadsBusy.values:
|
for tpl in threadsBusy.values:
|
||||||
|
@ -163,39 +163,39 @@ proc pool(arg: PoolThreadArg) {.async.} =
|
||||||
let
|
let
|
||||||
jsonNode = parseJson(task)
|
jsonNode = parseJson(task)
|
||||||
messageType = jsonNode{"$type"}.getStr
|
messageType = jsonNode{"$type"}.getStr
|
||||||
debug "[threadpool] determined message type", messageType=messageType
|
trace "[threadpool] determined message type", messageType=messageType
|
||||||
|
|
||||||
case messageType
|
case messageType
|
||||||
of "ThreadNotification":
|
of "ThreadNotification":
|
||||||
try:
|
try:
|
||||||
let notification = decode[ThreadNotification](task)
|
let notification = decode[ThreadNotification](task)
|
||||||
debug "[threadpool] received notification",
|
trace "[threadpool] received notification",
|
||||||
notice=notification.notice, threadid=notification.id
|
notice=notification.notice, threadid=notification.id
|
||||||
|
|
||||||
if notification.notice == "ready":
|
if notification.notice == "ready":
|
||||||
debug "[threadpool] received 'ready' from a task thread"
|
trace "[threadpool] received 'ready' from a task thread"
|
||||||
allReady = allReady + 1
|
allReady = allReady + 1
|
||||||
|
|
||||||
elif notification.notice == "done":
|
elif notification.notice == "done":
|
||||||
let tpl = threadsBusy[notification.id]
|
let tpl = threadsBusy[notification.id]
|
||||||
debug "[threadpool] adding to threadsIdle",
|
trace "[threadpool] adding to threadsIdle",
|
||||||
newlength=(threadsIdle.len + 1)
|
newlength=(threadsIdle.len + 1)
|
||||||
threadsIdle.add (notification.id, tpl.thr, tpl.chanSendToTask)
|
threadsIdle.add (notification.id, tpl.thr, tpl.chanSendToTask)
|
||||||
debug "[threadpool] removing from threadsBusy",
|
trace "[threadpool] removing from threadsBusy",
|
||||||
newlength=(threadsBusy.len - 1), threadid=notification.id
|
newlength=(threadsBusy.len - 1), threadid=notification.id
|
||||||
threadsBusy.del notification.id
|
threadsBusy.del notification.id
|
||||||
|
|
||||||
if taskQueue.len > 0:
|
if taskQueue.len > 0:
|
||||||
debug "[threadpool] removing from taskQueue",
|
trace "[threadpool] removing from taskQueue",
|
||||||
newlength=(taskQueue.len - 1)
|
newlength=(taskQueue.len - 1)
|
||||||
task = taskQueue[0]
|
task = taskQueue[0]
|
||||||
taskQueue.delete 0, 0
|
taskQueue.delete 0, 0
|
||||||
|
|
||||||
debug "[threadpool] removing from threadsIdle",
|
trace "[threadpool] removing from threadsIdle",
|
||||||
newlength=(threadsIdle.len - 1)
|
newlength=(threadsIdle.len - 1)
|
||||||
let tpl = threadsIdle[0]
|
let tpl = threadsIdle[0]
|
||||||
threadsIdle.delete 0, 0
|
threadsIdle.delete 0, 0
|
||||||
debug "[threadpool] adding to threadsBusy",
|
trace "[threadpool] adding to threadsBusy",
|
||||||
newlength=(threadsBusy.len + 1), threadid=tpl.id
|
newlength=(threadsBusy.len + 1), threadid=tpl.id
|
||||||
threadsBusy.add tpl.id, (tpl.thr, tpl.chanSendToTask)
|
threadsBusy.add tpl.id, (tpl.thr, tpl.chanSendToTask)
|
||||||
await tpl.chanSendToTask.send(task.safe)
|
await tpl.chanSendToTask.send(task.safe)
|
||||||
|
@ -208,7 +208,7 @@ proc pool(arg: PoolThreadArg) {.async.} =
|
||||||
else: # must be a request to do task work
|
else: # must be a request to do task work
|
||||||
if allReady < arg.size or threadsBusy.len == arg.size:
|
if allReady < arg.size or threadsBusy.len == arg.size:
|
||||||
# add to queue
|
# add to queue
|
||||||
debug "[threadpool] adding to taskQueue",
|
trace "[threadpool] adding to taskQueue",
|
||||||
newlength=(taskQueue.len + 1)
|
newlength=(taskQueue.len + 1)
|
||||||
taskQueue.add task
|
taskQueue.add task
|
||||||
|
|
||||||
|
@ -217,19 +217,19 @@ proc pool(arg: PoolThreadArg) {.async.} =
|
||||||
# check if we have tasks waiting on queue
|
# check if we have tasks waiting on queue
|
||||||
if taskQueue.len > 0:
|
if taskQueue.len > 0:
|
||||||
# remove first element from the task queue
|
# remove first element from the task queue
|
||||||
debug "[threadpool] adding to taskQueue",
|
trace "[threadpool] adding to taskQueue",
|
||||||
newlength=(taskQueue.len + 1)
|
newlength=(taskQueue.len + 1)
|
||||||
taskQueue.add task
|
taskQueue.add task
|
||||||
debug "[threadpool] removing from taskQueue",
|
trace "[threadpool] removing from taskQueue",
|
||||||
newlength=(taskQueue.len - 1)
|
newlength=(taskQueue.len - 1)
|
||||||
task = taskQueue[0]
|
task = taskQueue[0]
|
||||||
taskQueue.delete 0, 0
|
taskQueue.delete 0, 0
|
||||||
|
|
||||||
debug "[threadpool] removing from threadsIdle",
|
trace "[threadpool] removing from threadsIdle",
|
||||||
newlength=(threadsIdle.len - 1)
|
newlength=(threadsIdle.len - 1)
|
||||||
let tpl = threadsIdle[0]
|
let tpl = threadsIdle[0]
|
||||||
threadsIdle.delete 0, 0
|
threadsIdle.delete 0, 0
|
||||||
debug "[threadpool] adding to threadsBusy",
|
trace "[threadpool] adding to threadsBusy",
|
||||||
newlength=(threadsBusy.len + 1), threadid=tpl.id
|
newlength=(threadsBusy.len + 1), threadid=tpl.id
|
||||||
threadsBusy.add tpl.id, (tpl.thr, tpl.chanSendToTask)
|
threadsBusy.add tpl.id, (tpl.thr, tpl.chanSendToTask)
|
||||||
await tpl.chanSendToTask.send(task.safe)
|
await tpl.chanSendToTask.send(task.safe)
|
||||||
|
@ -246,7 +246,7 @@ proc pool(arg: PoolThreadArg) {.async.} =
|
||||||
chanSendToMain.close()
|
chanSendToMain.close()
|
||||||
chanRecvFromMainOrTask.close()
|
chanRecvFromMainOrTask.close()
|
||||||
|
|
||||||
debug "[threadpool] waiting for all task threads to stop"
|
trace "[threadpool] waiting for all task threads to stop"
|
||||||
joinThreads(allTaskThreads)
|
joinThreads(allTaskThreads)
|
||||||
|
|
||||||
proc poolThread(arg: PoolThreadArg) {.thread.} =
|
proc poolThread(arg: PoolThreadArg) {.thread.} =
|
||||||
|
|
Loading…
Reference in New Issue