[Threadpool] Remove reserve threads (#223)

* remove reserve threads

* recover last perf diff: 1. don't import primitives, cpu features detection globals are noticeable, 2. noinit + conditional zeroMem are unnecessary when sync is inline 3. inline 'newSpawn' and don't init the loop part

* avoid syscalls when possible if a thread is awake but idle

* renaming eventLoop

* remove unused code: steal-half

* renaming

* no need for 0-init sync, T can be large in cryptography
Mamy Ratsimbazafy 2023-02-24 17:36:04 +01:00 committed by GitHub
parent bf32c2d408
commit 1dfbb8bd4f
9 changed files with 306 additions and 446 deletions

View File

@ -243,6 +243,8 @@ const testDescNvidia: seq[string] = @[
const testDescThreadpool: seq[string] = @[
"constantine/platforms/threadpool/examples/e01_simple_tasks.nim",
"constantine/platforms/threadpool/examples/e02_parallel_pi.nim",
"constantine/platforms/threadpool/examples/e03_parallel_for.nim",
"constantine/platforms/threadpool/examples/e04_parallel_reduce.nim",
# "constantine/platforms/threadpool/benchmarks/bouncing_producer_consumer/threadpool_bpc.nim", # Need timing not implemented on Windows
"constantine/platforms/threadpool/benchmarks/dfs/threadpool_dfs.nim",
"constantine/platforms/threadpool/benchmarks/fibonacci/threadpool_fib.nim",

View File

@ -34,6 +34,10 @@ export
allocs,
compiler_optim_hints
# Note:
# - cpuinfo_x86 initializes globals for CPU features detection.
# This will impact benchmarks that do not need it, such as the threadpool.
when X86 and GCC_Compatible:
import isa/[cpuinfo_x86, macro_assembler_x86]
export cpuinfo_x86, macro_assembler_x86
@ -69,7 +73,7 @@ func ceilDiv_vartime*(a, b: auto): auto {.inline.} =
## ceil division, to be used only on length or at compile-time
## ceil(a / b)
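## e.g. ceilDiv_vartime(10, 3) == 4, since (10 + 3 - 1) div 3 == 4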
# "LengthInDigits: static int" doesn't match "int"
# if "SomeInteger" is used instead of "autoi"
# if "SomeInteger" is used instead of "auto"
(a + b - 1) div b
# ############################################################

View File

@ -16,13 +16,11 @@ This threadpool's desirable properties are:
and not dealing with threadpool contention, latencies and overheads.
Compared to [Weave](https://github.com/mratsim/weave), here are the tradeoffs:
- Constantine's threadpool only provide spawn/sync (task parallelism).\
There is no (extremely) optimized parallel for (data parallelism)\
or precise in/out dependencies (events / dataflow parallelism).
- Constantine's threadpool provides spawn/sync (task parallelism)
and an optimized parallelFor (data parallelism).\
It however does not provide precise in/out dependencies (events / dataflow parallelism).
- Constantine's threadpool has been significantly optimized to provide
overhead lower than Weave's default (and as low as Weave "lazy" + "alloca" allocation scheme)
- Constantine's threadpool provides the same adaptative scheduling strategy as Weave
with additional enhancement (leapfrogging)
overhead lower than Weave's default (and as low as Weave "lazy" + "alloca" allocation scheme).
Compared to [nim-taskpools](https://github.com/status-im/nim-taskpools), here are the tradeoffs:
- Constantine does not use std/tasks:
@ -35,4 +33,6 @@ Compared to [nim-taskpools](https://github.com/status-im), here are the tradeoff
- The future (Flowvar) result channel
- Contention improvement, Constantine is entirely lock-free while Nim-taskpools needs a lock + condition variable to put threads to sleep
- Powersaving improvement, threads sleep when awaiting a task and there is no work available.
- Scheduling improvement, Constantine's threadpool incorporates Weave's adaptive scheduling policy with an additional enhancement (leapfrogging)
See also [design.md](./docs/design.md)

View File

@ -200,13 +200,17 @@ proc cancelSleep*(ec: var Eventcount) {.inline.} =
proc wake*(ec: var EventCount) {.inline.} =
## Wake a thread if at least 1 is parked
let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease)
if (prev and kAnyWaiterMask) != 0:
if (prev and kPreWaitMask) != 0:
# Some threads are in prewait and will see the epoch change
# no need to do an expensive syscall
return
if (prev and kWaitMask) != 0:
ec.futex.wake()
proc wakeAll*(ec: var EventCount) {.inline.} =
## Wake all threads if at least 1 is parked
let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease)
if (prev and kAnyWaiterMask) != 0:
if (prev and kWaitMask) != 0:
ec.futex.wakeAll()
proc getNumWaiters*(ec: var EventCount): tuple[preSleep, committedSleep: int32] {.noInit, inline.} =

View File

@ -14,25 +14,21 @@
# David Chase, Yossi Lev, 1993
# https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf
#
# - Non-Blocking Steal-Half Work Queues
# Danny Hendler, Nir Shavit, 2002
# https://www.cs.bgu.ac.il/~hendlerd/papers/p280-hendler.pdf
#
# - Correct and Efficient Work-Stealing for Weak Memory Models
# Nhat Minh Lê, Antoniu Pop, Albert Cohen, Francesco Zappa Nardelli, 2013
# https://fzn.fr/readings/ppopp13.pdf
#
# The task queue implements the following push, pop, steal, stealHalf
# The task queue implements the following push, pop, steal
#
# front back
# ---------------------------------
# steal() <- | | | | <- push()
# stealHalf() <- | Task 0 | Task 1 | Task 2 | -> pop()
# | Task 0 | Task 1 | Task 2 | -> pop()
# any thread | | | | owner-only
# ---------------------------------
#
# To reduce contention, stealing is done on the opposite end from push/pop
# so that there is a race only for the very last task(s).
# so that there is a race only for the very last task.
{.push raises: [], checks: off.} # No exceptions in a multithreading datastructure
@ -57,7 +53,7 @@ type
## Foreign threads steal at the front.
##
## There is no memory reclamation scheme for simplicity
front {.align: 64.}: Atomic[int] # Consumers - steal/stealHalf
front {.align: 64.}: Atomic[int] # Consumers - steal
back: Atomic[int] # Producer - push/pop
buf: Atomic[ptr Buf]
garbage: ptr Buf
@ -215,72 +211,3 @@ proc steal*(thiefID: int32, tq: var Taskqueue): ptr Task =
# Failed race.
return nil
result.thiefID.store(thiefID, moRelease)
proc stealHalfImpl(dst: var Buf, dstBack: int, src: var Taskqueue): int =
## Theft part of stealHalf:
## - updates the victim metadata if successful
## - uncommitted updates to the thief tq whether successful or not
## Returns -1 if dst buffer is too small
## Assumes `dst` buffer is empty (i.e. not ahead-of-time thefts)
while true:
# Try as long as there are something to steal, we are idling anyway.
var f = src.front.load(moAcquire)
fence(moSequentiallyConsistent)
let b = src.back.load(moAcquire)
var n = b-f
n = n - (n shr 1) # Division by 2 rounded up, so if only one task is left, we still get it.
if n <= 0:
return 0
if n > dst.capacity:
return -1
# Non-empty queue.
let sBuf = src.buf.load(moConsume)
for i in 0 ..< n: # Copy LIFO or FIFO?
dst[dstBack+i] = sBuf[][f+i]
if compareExchange(src.front, f, f+n, moSequentiallyConsistent, moRelaxed):
return n
proc stealHalf*(thiefID: int32, dst: var Taskqueue, src: var Taskqueue): ptr Task =
## Dequeue up to half of the items in the `src` tq, fom the front.
## Return the last of those, or nil if none found
while true:
# Prepare for batch steal
let
bDst = dst.back.load(moRelaxed)
fDst = dst.front.load(moAcquire)
var dBuf = dst.buf.load(moAcquire)
let sBuf = src.buf.load(moAcquire)
if dBuf.capacity < sBuf.capacity:
# We could grow to sBuf/2 since we steal half, but we want to minimize
# churn if we are actually in the process of stealing and the buffers grows.
dst.grow(dBuf, sBuf.capacity, fDst, bDst)
# Steal
let n = dBuf[].stealHalfImpl(bDst, src)
if n == 0:
return nil
if n == -1:
# Oops, victim buffer grew bigger than ours, restart the whole process
continue
# Update metadata
for i in 0 ..< n:
dBuf[][bDst+i].thiefID.store(thiefID, moRelease)
# Commit/publish theft, return the first item for processing
let last = dBuf[][bDst+n-1]
fence(moSequentiallyConsistent)
if n == 1:
return last
# We have more than one item, so some must go in our queue
# they are already here but inaccessible for other thieves
dst.back.store(bDst+n-1, moRelease) # We assume that queue was empty and so dst.front didn't change
return last

View File

@ -9,7 +9,7 @@
import
std/atomics,
../instrumentation,
../../allocs, ../../primitives,
../../allocs,
./backoff
# Tasks have an efficient design so that a single heap allocation
@ -27,14 +27,10 @@ import
type
Task* = object
# Intrusive metadata
# Synchronization
# ------------------
parent*: ptr Task # When a task is awaiting, a thread can quickly prioritize the direct child of a task
thiefID*: Atomic[int32] # ID of the worker that stole and run the task. For leapfrogging.
# Result sync
# ------------------
hasFuture*: bool # Ownership: if a task has a future, the future deallocates it. Otherwise the worker thread does.
completed*: Atomic[bool]
waiter*: Atomic[ptr EventNotifier]
@ -42,25 +38,20 @@ type
# Data parallelism
# ------------------
isFirstIter*: bool # Awaitable for-loops return true for first iter. Loops are split before first iter.
envSize*: int32 # This is used for splittable loop
loopStart*: int
loopStop*: int
loopStride*: int
loopStepsLeft*: int
reductionDAG*: ptr ReductionDagNode # For parallel loop reduction, merge with other range result
# Dataflow parallelism
# --------------------
dependsOnEvent: bool # We cannot leapfrog a task triggered by an event
# Execution
# ------------------
fn*: proc (env: pointer) {.nimcall, gcsafe, raises: [].}
# destroy*: proc (env: pointer) {.nimcall, gcsafe.} # Constantine only deals with plain old data
envSize*: int32
# destroy*: proc (env: pointer) {.nimcall, gcsafe.} # Constantine only deals with plain old env
env*{.align:sizeof(int).}: UncheckedArray[byte]
Flowvar*[T] = object
## A Flowvar is a placeholder for a future result that may be computed in parallel
task: ptr Task
ReductionDagNode* = object
@ -82,7 +73,8 @@ const SentinelThief* = 0xFACADE'i32
proc newSpawn*(
T: typedesc[Task],
parent: ptr Task,
fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}): ptr Task =
fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}
): ptr Task {.inline.} =
const size = sizeof(T)
@ -93,22 +85,12 @@ proc newSpawn*(
result.completed.store(false, moRelaxed)
result.waiter.store(nil, moRelaxed)
result.fn = fn
result.envSize = 0
result.isFirstIter = false
result.loopStart = 0
result.loopStop = 0
result.loopStride = 0
result.loopStepsLeft = 0
result.reductionDAG = nil
result.dependsOnEvent = false
proc newSpawn*(
T: typedesc[Task],
parent: ptr Task,
fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].},
env: auto): ptr Task =
env: auto): ptr Task {.inline.} =
const size = sizeof(T) + # size without Unchecked
sizeof(env)
@ -120,24 +102,20 @@ proc newSpawn*(
result.completed.store(false, moRelaxed)
result.waiter.store(nil, moRelaxed)
result.fn = fn
result.envSize = int32 sizeof(env)
cast[ptr[type env]](result.env)[] = env
result.isFirstIter = false
result.loopStart = 0
result.loopStop = 0
result.loopStride = 0
result.loopStepsLeft = 0
result.reductionDAG = nil
result.dependsOnEvent = false
func ceilDiv_vartime*(a, b: auto): auto {.inline.} =
## ceil division, to be used only on length or at compile-time
## ceil(a / b)
(a + b - 1) div b
proc newLoop*(
T: typedesc[Task],
parent: ptr Task,
start, stop, stride: int,
isFirstIter: bool,
fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}): ptr Task =
fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}
): ptr Task =
const size = sizeof(T)
preCondition: start < stop
@ -157,8 +135,6 @@ proc newLoop*(
result.loopStepsLeft = ceilDiv_vartime(stop-start, stride)
result.reductionDAG = nil
result.dependsOnEvent = false
proc newLoop*(
T: typedesc[Task],
parent: ptr Task,
@ -188,8 +164,6 @@ proc newLoop*(
result.loopStepsLeft = ceilDiv_vartime(stop-start, stride)
result.reductionDAG = nil
result.dependsOnEvent = false
# Flowvars
# -------------------------------------------------------------------------
@ -235,9 +209,6 @@ proc sync*[T](fv: sink Flowvar[T]): T {.noInit, inline, gcsafe.} =
## and returned.
## The thread is not idle and will complete pending tasks.
mixin completeFuture
if fv.task.isNil:
zeroMem(result.addr, sizeof(T))
return
completeFuture(fv, result)
cleanup(fv)

View File

@ -22,7 +22,7 @@ Also neither supports putting awaiting threads to sleep when the future they wan
| Communication mechanisms | Shared-memory | Message-passing / Channels | Shared-memory | Shared-memory
| Load balancing strategy | static (GCC), work-stealing (Intel/LLVM) | work-sharing / work-requesting | work-stealing | work-stealing |
| Blocked tasks don't block runtime | N/A | no | yes | yes |
| Load-balancing strategy for task parallelism (important for fine-grained parallelism) | global queue (GCC), steal-one (Intel/LLVM) | Adaptative steal-one/steal-half | steal-one | steal-one (steal-half WIP) |
| Load-balancing strategy for task parallelism (important for fine-grained parallelism) | global queue (GCC), steal-one (Intel/LLVM) | Adaptative steal-one/steal-half | steal-one | steal-one |
| Load-balancing strategy for data parallelism | eager splitting depending on iteration count and cpu count | lazy splitting depending on idle CPUs and workload | N/A | lazy splitting depending on idle CPUs and workload |
| Backoff worker when idle | yes (?) | yes | yes | yes |
| Backoff worker when awaiting task but no work | N/A | no | no | yes |
@ -30,6 +30,43 @@ Also neither supports putting awaiting threads to sleep when the future they wan
## Key features design
### Scheduler overhead/contention
#### Distributed task queues
To enable fine-grained parallelism, i.e. parallelizing tasks in the microseconds range, it's critical to reduce contention.
A global task queue will be hammered by N threads, with each thread thrashing the others' caches.
In contrast, distributed task queues with random victim selection significantly reduce contention.
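As a rough, self-contained illustration (a toy sketch, not the actual scheduler code: `ToyWorker`, `trySteal` and the use of `std/random` are made up for this example), random victim selection over per-worker queues looks like:

```nim
# Toy illustration: distributed task queues with random victim selection.
# Each idle worker probes the other queues in a random order, so contention
# is spread out instead of all threads hammering a single global queue.
import std/[random, deques]

type ToyWorker = object
  queue: Deque[int]            # stand-in for a per-worker Chase-Lev deque

proc trySteal(workers: var seq[ToyWorker], myID: int, rng: var Rand): int =
  ## Returns a stolen "task" or -1 if every other queue was empty.
  var victims = newSeq[int](workers.len)
  for i in 0 ..< workers.len:
    victims[i] = i
  rng.shuffle(victims)         # random probing order
  for v in victims:
    if v == myID or workers[v].queue.len == 0:
      continue
    # Thieves take from the front; the owner pushes/pops at the back.
    return workers[v].queue.popFirst()
  return -1
```

In the real threadpool the probing order comes from a cheap LCG-based permutation (`pseudoRandomPermutation` in `threadpool.nim`) seeded from a per-worker Xoshiro256+ state, rather than `std/random`.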
#### Memory allocation
Another source of overhead is the allocator. The worst case for allocators is allocating in one thread and deallocating in another, especially if the
allocating thread is always the same; unfortunately this is common in producer-consumer workloads.
Besides, multithreaded allocations/deallocations trigger costly atomic swaps and possibly fragmentation.
Minimizing allocations as much as possible significantly helps with fine-grained tasks.
- Weave solved that problem by having 2 levels of cache: a memory pool for tasks and futures, and a lookaside list that caches tasks to reduce further pressure on the memory pool.
- Nim-taskpools does not address this problem; it has an allocation overhead per task of 1 for std/tasks, 1 for the linked list that holds it, and 1 for the result channel/flowvar.
Unlike GCC OpenMP, which freezes on a fibonacci 40 benchmark, it can still finish, but it's 20x slower than Weave.
- Constantine's threadpool solves the problem by making everything intrusive to a task: the task env, the future, the linked list (see the sketch below).
In fact this solution is even faster than Weave's, probably due to significantly fewer page faults and cache misses.
Note that Weave has an even faster mode that allocates futures on the stack when they don't escape their function, but without compiler support (heap allocation elision) this restricts the programs you can write.
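To make "intrusive" concrete, here is a minimal sketch (illustrative only: `ToyTask` and `newToyTask` are hypothetical stand-ins for the real `Task`/`newSpawn` in `tasks_flowvars.nim`): the metadata, the result synchronization and the captured arguments share a single heap block, so a spawn costs one allocation.

```nim
# Sketch of a single-allocation, intrusive task: metadata, synchronization
# and the captured environment live in the same heap block.
type ToyTask = object
  parent: ptr ToyTask                          # intrusive task tree / list link
  hasFuture: bool                              # if true, the future frees the task
  fn: proc (env: pointer) {.nimcall, gcsafe.}  # task body
  envSize: int32
  env: UncheckedArray[byte]                    # result + arguments stored inline

proc newToyTask(fn: proc (env: pointer) {.nimcall, gcsafe.},
                envData: openArray[byte]): ptr ToyTask =
  ## One allocation per spawn: no separate nodes for the list, future or arguments.
  result = cast[ptr ToyTask](alloc0(sizeof(ToyTask) + envData.len))
  result.fn = fn
  result.envSize = int32 envData.len
  if envData.len > 0:
    copyMem(result.env.addr, envData[0].unsafeAddr, envData.len)
```

The real `Task` in `tasks_flowvars.nim` follows the same layout, with extra fields for leapfrogging, loop splitting and waiter notification.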
### Load balancing for task parallelism
When a worker runs out of tasks, it steals from other workers' task queues.
It may steal one or multiple tasks.
In case of severe load imbalance, a steal-half policy can quickly rebalance workers' queues to the global average.
This also reduces scheduler overhead by requiring logarithmically fewer steal attempts.
However, it may lead to significantly more rebalancing if workers generate few tasks.
Weave implements adaptive work-stealing with runtime selection of steal-one/steal-half:
- Embracing Explicit Communication in Work-Stealing Runtime Systems.\
Andreas Prell, 2016\
https://epub.uni-bayreuth.de/id/eprint/2990/
Constantine's threadpool will likely adopt the same strategy if the following task queue can be implemented with low overhead:
- Non-Blocking Steal-Half Work Queues\
Danny Hendler, Nir Shavit, 2002\
https://www.cs.bgu.ac.il/~hendlerd/papers/p280-hendler.pdf
### Load-balancing for data parallelism
A critical issue in most (all?) runtimes used in HPC (OpenMP and Intel TBB in particular) is that they split their parallel for loop ahead of time.
@ -76,34 +113,35 @@ Instead we can have each thread start working and use backpressure to lazily eva
### Backoff workers when awaiting a future
This problem is quite tricky:
- For latency, and to potentially create more work ASAP, we want such a worker to follow through on the blocked future ASAP.
- For throughput, and because a scheduler is optimal only when greedy, we want an idle thread to take any available work.
- But what if the work then blocks that worker for 1s? This hurts latency and might lead to work starvation if the continuation would have created more work.
- For latency, we want the worker to continue as soon as the future is completed. This may also create more work and expose more parallelism opportunities (in recursive divide-and-conquer algorithms, for example).
- For throughput, and because a scheduler is optimal only when greedy (i.e. within 2x of the best schedule, see the Cilk paper), we want an idle thread to take any available work ASAP.
- But what if that worker ends up with work that blocks it for a long time? That may lead to work starvation.
- There is no robust, cross-platform API to wake a specific thread awaiting on a futex or condition variable.
- The simplest design would then be to have an array of futexes, and sleep on those when backing off.
The issue is that, when offering work, you have to scan that array to find a worker to wake.
Contrary to an idle worker, the waker is working, so this scan hurts throughput and latency, and due to the many
atomic operations required, it will completely thrash that worker's cache.
- Even simpler is to wake all threads on future completion.
- Another potential data structure would be a concurrent sparse-set, but designing concurrent data structures is difficult
and locking would be expensive for an active worker.
- Alternative designs would be:
- Not sleeping
- Having reserve threads:
Before sleeping when blocked on a future, the thread wakes a reserve thread. As the number of active hardware threads is maintained, so is throughput.
The waiting thread is also immediately available when the future is completed, since it cannot be stuck in work.
A well-behaved program will always have at least 1 thread making progress among N, so a reserve of size N is sufficient.
Unfortunately, this solution suffers from the high latency of wakeups and/or kernel context switches.
For fine-grained tasks it is quite impactful: the heat benchmark is 7x slower, fibonacci 1.5x, depth-first-search 2.5x.
For actual workloads, the elliptic curve sum reduction is also significantly slower.
- Using continuations:
We could just store a continuation in the future so the thread that completes the future picks up the continuation.
The solution in Constantine's threadpool to minimize latency and maximize throughput and avoid implementing another concurrent data structure is
having "reserve threads". Before sleeping when blocked on a future the thread wakes a reserve thread. This maintain throughput, and the thread is immediately available
as well when the future is completed. A well-behaved program will always have at least 1 thread making progress among N, so a reserve of size N is sufficient.
### Scheduler overhead/contention
To enable fine-grained parallelism, i.e. parallelizing tasks in the microseconds range, it's critical to reduce contention.
A global task queue will be hammered by N threads, leading to each thrashing each other caches.
In contrast, distributed task queues with random victim selection significantly reduce contention.
Another source of overhead is the allocator, the worst case for allocators is allocation in a thread and deallocation in another, especially if the
allocating thread is always the same. Unfortunately this is common in producer-consumer workloads.
Besides multithreaded allocations/deallocations will trigger costly atomic-swaps and possibly fragmentation.
Minimizing allocations to the utmost will significantly help on fine-grained tasks.
- Weave solved that problem by having 2 levels of cache: a memory-pool for tasks and futures and a lookaside list that caches tasks to reduce further pressure on the memory pool.
- Nim-taskpools does not address this problem, it has an allocation overhead per tasks of 1 for std/tasks, 1 for the linked list that holds them, 1 for the result channel/flowvar.
Unlike GCC OpenMP which freezes on a fibonacci 40 benchmark, it can still finish but it's 20x slower than Weave.
- Constantine's threadpool solves the problem by making everything intrusive to a task: the task env, the future, the linked list.
In fact this solution is even faster than Weave's, probably due to significantly less page faults and cache misses.
Note that Weave has an even faster mode when futures don't escape their function by allocating them on the stack but without compiler support (heap allocation elision) that restricts the programs you can write.
The workaround for now is just to work on any available task to maximize throughput, then sleep.
This has 2 disadvantages:
- For coarse-grained parallelism, the waiting thread may steal a long task, delaying the awaited future's continuation.
However, coarse-grained parallelism is easier to split into multiple tasks,
while exposing fine-grained parallelism and properly handling its overhead is the usual problem.
- As we can't wake a specific thread on a common futex or condition variable,
a thread awaiting a future sleeps on a local one.
Hence if no work can be stolen, the waiter sleeps. But if work is created again later, it stays asleep, as only the global futex is notified.
Note that with hyperthreading, the sibling thread(s) can still use the core fully, so throughput might not be impacted.

View File

@ -41,7 +41,7 @@ proc spawnVoid(funcCall: NimNode, args, argsTy: NimNode, workerContext, schedule
let fnName = $fn
let withArgs = args.len > 0
let tpSpawn_closure = ident("ctt_tpSpawnVoidClosure_" & fnName)
var loopFnCall = newCall(fn)
var fnCall = newCall(fn)
let env = ident("ctt_tpSpawnVoidEnv_") # typed pointer to env
# Schedule
@ -53,10 +53,10 @@ proc spawnVoid(funcCall: NimNode, args, argsTy: NimNode, workerContext, schedule
if funcCall.len == 2:
# With only 1 arg, the tuple syntax doesn't construct a tuple
# let env = (123) # is an int
loopFnCall.add nnkDerefExpr.newTree(env)
fnCall.add nnkDerefExpr.newTree(env)
else: # This handles the 0 arg case as well
for i in 1 ..< funcCall.len:
loopFnCall.add nnkBracketExpr.newTree(
fnCall.add nnkBracketExpr.newTree(
env,
newLit i-1)
@ -65,7 +65,7 @@ proc spawnVoid(funcCall: NimNode, args, argsTy: NimNode, workerContext, schedule
proc `tpSpawn_closure`(env: pointer) {.nimcall, gcsafe, raises: [].} =
when bool(`withArgs`):
let `env` = cast[ptr `argsTy`](env)
`loopFnCall`
`fnCall`
# Create the task
result.add quote do:
@ -88,7 +88,7 @@ proc spawnRet(funcCall: NimNode, retTy, args, argsTy: NimNode, workerContext, sc
let fn = funcCall[0]
let fnName = $fn
let tpSpawn_closure = ident("ctt_tpSpawnRetClosure_" & fnName)
var loopFnCall = newCall(fn)
var fnCall = newCall(fn)
let env = ident("ctt_tpSpawnRetEnv_") # typed pointer to env
# tasks have no return value.
@ -117,12 +117,12 @@ proc spawnRet(funcCall: NimNode, retTy, args, argsTy: NimNode, workerContext, sc
# env stores | ptr Task | result | arg₀ | arg₁ | ... | argₙ
# so arguments starts at env[2] in the wrapping funcCall functions
for i in 1 ..< funcCall.len:
loopFnCall.add nnkBracketExpr.newTree(env, newLit i+1)
fnCall.add nnkBracketExpr.newTree(env, newLit i+1)
result.add quote do:
proc `tpSpawn_closure`(env: pointer) {.nimcall, gcsafe, raises: [].} =
let `env` = cast[ptr `envParamsTy`](env)
let res = `loopFnCall`
let res = `fnCall`
readyWith(`env`[0], res)
# Regenerate fresh ident, retTy has been tagged as a function call param

View File

@ -20,13 +20,113 @@ import
./instrumentation,
./primitives/barriers,
./parallel_offloading,
../allocs, ../bithacks,
../../../helpers/prng_unsafe
../allocs, ../bithacks
export
# flowvars
Flowvar, isSpawned, isReady, sync
# ############################################################
# #
# RNG #
# #
# ############################################################
#
# We don't need a CSPRNG, the RNG is to select a random victim when work-stealing
#
# - Scrambled Linear Pseudorandom Number Generators
# Blackman, Vigna, 2021
# https://vigna.di.unimi.it/ftp/papers/ScrambledLinear.pdf
# https://prng.di.unimi.it/
type WorkStealingRng = object
## This is the state of a Xoshiro256+ PRNG
## It is used for work-stealing. The low bits have low linear complexity.
## So we use the high 32 bits to seed our pseudo random walk of thread taskqueues.
s: array[4, uint64]
func splitMix64(state: var uint64): uint64 =
state += 0x9e3779b97f4a7c15'u64
result = state
result = (result xor (result shr 30)) * 0xbf58476d1ce4e5b9'u64
result = (result xor (result shr 27)) * 0x94d049bb133111eb'u64
result = result xor (result shr 31)
func seed(rng: var WorkStealingRng, x: SomeInteger) =
## Seed the random number generator with a fixed seed
var sm64 = uint64(x)
rng.s[0] = splitMix64(sm64)
rng.s[1] = splitMix64(sm64)
rng.s[2] = splitMix64(sm64)
rng.s[3] = splitMix64(sm64)
func rotl(x: uint64, k: static int): uint64 {.inline.} =
return (x shl k) or (x shr (64 - k))
template `^=`(x: var uint64, y: uint64) =
x = x xor y
func nextU32(rng: var WorkStealingRng): uint32 =
## Compute a random uint32
# Need to use the high bits
result = uint32((rng.s[0] + rng.s[3]) shr 32)
let t = rng.s[1] shl 17
rng.s[2] ^= rng.s[0];
rng.s[3] ^= rng.s[1];
rng.s[1] ^= rng.s[2];
rng.s[0] ^= rng.s[3];
rng.s[2] ^= t;
rng.s[3] = rotl(rng.s[3], 45);
iterator pseudoRandomPermutation(randomSeed: uint32, maxExclusive: int32): int32 =
## Create a (low-quality) pseudo-random permutation of [0, max)
# Design considerations and randomness constraint for work-stealing, see docs/random_permutations.md
#
# Linear Congruential Generator: https://en.wikipedia.org/wiki/Linear_congruential_generator
#
# Xₙ₊₁ = aXₙ+c (mod m) generates all random number mod m without repetition
# if and only if (Hull-Dobell theorem):
# 1. c and m are coprime
# 2. a-1 is divisible by all prime factors of m
# 3. a-1 is divisible by 4 if m is divisible by 4
#
# Alternative 1. By choosing a=1, all conditions are easy to reach.
#
# The randomness quality is not important besides distributing potential contention,
# i.e. randomly trying thread i, then i+1, then i+n-1 (mod n) is good enough.
#
# Assuming 6 threads, co-primes are [1, 5], which means the following permutations
# assuming we start with victim 0:
# - [0, 1, 2, 3, 4, 5]
# - [0, 5, 4, 3, 2, 1]
# While we don't care much about randomness quality, it's a bit disappointing.
#
# Alternative 2. We can choose m to be the next power of 2, meaning all odd integers are co-primes,
# consequently:
# - we don't need a GCD to find the coprimes
# - we don't need to cache coprimes, removing a cache-miss potential
# - a != 1, so we now have a multiplicative factor, which makes output more "random looking".
# n and (m-1) <=> n mod m, if m is a power of 2
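#
# For example, with maxExclusive = 6 and randomSeed = 3:
#   M = 8, c = 7, a = 5, mask = 7, start = 3
#   x walks 3, 6, 5, 0, 7, 2, 1, 4 (then returns to 3 and stops),
#   yielding 3, 5, 0, 2, 1, 4: each victim in [0, 6) exactly once, in scrambled order.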
let maxExclusive = cast[uint32](maxExclusive)
let M = maxExclusive.nextPowerOfTwo_vartime()
let c = (randomSeed and ((M shr 1) - 1)) * 2 + 1 # c odd and c ∈ [0, M)
let a = (randomSeed and ((M shr 2) - 1)) * 4 + 1 # a-1 divisible by 2 (all prime factors of m) and by 4 if m divisible by 4
let mask = M-1 # for mod M
let start = randomSeed and mask
var x = start
while true:
if x < maxExclusive:
yield cast[int32](x)
x = (a*x + c) and mask # ax + c (mod M), with M power of 2
if x == start:
break
# ############################################################
# #
# Types #
@ -46,35 +146,25 @@ type
threadpool: Threadpool
# Tasks
taskqueue: ptr Taskqueue # owned task queue
taskqueue: ptr Taskqueue # owned task queue
currentTask: ptr Task
# Synchronization
localBackoff: EventNotifier # Multi-Producer Single-Consumer backoff
signal: ptr Signal # owned signal
localBackoff: EventNotifier # Multi-Producer Single-Consumer backoff
signal: ptr Signal # owned signal
# Thefts
rng: RngState # RNG state to select victims
# Adaptative theft policy
stealHalf: bool
recentTasks: int32
recentThefts: int32
recentTheftsAdaptative: int32
recentLeaps: int32
rng: WorkStealingRng # RNG state to select victims
Threadpool* = ptr object
barrier: SyncBarrier # Barrier for initialization and teardown
# -- align: 64
globalBackoff: EventCount # Multi-Producer Multi-Consumer backoff
reserveBackoff: EventCount
# -- align: 64
numThreads*{.align: 64.}: int32 # N regular workers + N reserve workers
workerQueues: ptr UncheckedArray[Taskqueue] # size 2N
workers: ptr UncheckedArray[Thread[(Threadpool, WorkerID)]] # size 2N
workerSignals: ptr UncheckedArray[Signal] # size 2N
# -- align: 64
numIdleThreadsAwaitingFutures*{.align: 64.}: Atomic[int32]
numThreads*{.align: 64.}: int32 # N regular workers
workerQueues: ptr UncheckedArray[Taskqueue] # size N
workers: ptr UncheckedArray[Thread[(Threadpool, WorkerID)]] # size N
workerSignals: ptr UncheckedArray[Signal] # size N
# ############################################################
# #
@ -85,14 +175,21 @@ type
var workerContext {.threadvar.}: WorkerContext
## Thread-local Worker context
## We assume that a threadpool has exclusive ownership
##
## TODO: if we want to allow non-conflicting threadpools instantiated by the same thread:
## - only the threadID per threadpool should be stored and the associated
## context should be stored at the Threadpool-level.
## Then we need to associate threadpool pointer to workerID in that threadpool
## - Or we completely remove thread-local data
## and use a Minimal Perfect Hash Function.
## We can approximate a threadID by retrieving the address of a dummy thread-local variable.
## - Or we sort threadID and use binary search
proc setupWorker() =
proc setupWorker(ctx: var WorkerContext) =
## Initialize the thread-local context of a worker
## Requires the ID and threadpool fields to be initialized
template ctx: untyped = workerContext
preCondition: not ctx.threadpool.isNil()
preCondition: 0 <= ctx.id and ctx.id < 2*ctx.threadpool.numThreads
preCondition: 0 <= ctx.id and ctx.id < ctx.threadpool.numThreads
preCondition: not ctx.threadpool.workerQueues.isNil()
preCondition: not ctx.threadpool.workerSignals.isNil()
@ -111,19 +208,12 @@ proc setupWorker() =
# Init
ctx.taskqueue[].init(initialCapacity = 32)
# Adaptative theft policy
ctx.recentTasks = 0
ctx.recentThefts = 0
ctx.recentTheftsAdaptative = 0
ctx.recentLeaps = 0
proc teardownWorker() =
proc teardownWorker(ctx: var WorkerContext) =
## Cleanup the thread-local context of a worker
workerContext.localBackoff.`=destroy`()
workerContext.taskqueue[].teardown()
ctx.localBackoff.`=destroy`()
ctx.taskqueue[].teardown()
proc eventLoopRegular(ctx: var WorkerContext) {.raises:[], gcsafe.}
proc eventLoopReserve(ctx: var WorkerContext) {.raises:[], gcsafe.}
proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.}
proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises: [].} =
## On the start of the threadpool workers will execute this
@ -137,15 +227,12 @@ proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises
ctx.id = params.id
ctx.threadpool = params.threadpool
setupWorker()
ctx.setupWorker()
# 1 matching barrier in Threadpool.new() for root thread
discard params.threadpool.barrier.wait()
if ctx.id < ctx.threadpool.numThreads:
ctx.eventLoopRegular()
else:
ctx.eventLoopReserve()
ctx.eventLoop()
debugTermination:
log(">>> Worker %3d shutting down <<<\n", ctx.id)
@ -153,7 +240,7 @@ proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises
# 1 matching barrier in threadpool.shutdown() for root thread
discard params.threadpool.barrier.wait()
teardownWorker()
ctx.teardownWorker()
# ############################################################
# #
@ -167,12 +254,11 @@ const RootTask = cast[ptr Task](0xEFFACED0)
proc run*(ctx: var WorkerContext, task: ptr Task) {.raises:[].} =
## Run a task, frees it if it is not owned by a Flowvar
let suspendedTask = workerContext.currentTask
let suspendedTask = ctx.currentTask
ctx.currentTask = task
debug: log("Worker %3d: running task 0x%.08x (previous: 0x%.08x, %d pending, thiefID %d)\n", ctx.id, task, suspendedTask, ctx.taskqueue[].peek(), task.thiefID)
task.fn(task.env.addr)
debug: log("Worker %3d: completed task 0x%.08x (%d pending)\n", ctx.id, task, ctx.taskqueue[].peek())
ctx.recentTasks += 1
ctx.currentTask = suspendedTask
if not task.hasFuture:
freeHeap(task)
@ -193,10 +279,6 @@ proc schedule(ctx: var WorkerContext, tn: ptr Task, forceWake = false) {.inline.
# Instead of notifying every time a task is scheduled, we notify
# only when the worker queue is empty. This is a good approximation
# of starvation in work-stealing.
# - Tzannes, A., G. C. Caragea, R. Barua, and U. Vishkin.
# Lazy binary-splitting: a run-time adaptive work-stealing scheduler.
# In PPoPP 10, Bangalore, India, January 2010. ACM, pp. 179190.
# https://user.eng.umd.edu/~barua/ppopp164.pdf
let wasEmpty = ctx.taskqueue[].peek() == 0
ctx.taskqueue[].push(tn)
if forceWake or wasEmpty:
@ -259,7 +341,7 @@ iterator splitUpperRanges(
ctx.id, task, task.loopStepsLeft, curLoopIndex, task.loopStart, task.loopStop, task.loopStride, numIdle)
# Send a chunk of work to all idle workers + ourselves
let availableWorkers = numIdle + 1
let availableWorkers = cast[int](numIdle + 1)
let baseChunkSize = task.loopStepsLeft div availableWorkers
let cutoff = task.loopStepsLeft mod availableWorkers
@ -364,7 +446,7 @@ proc loadBalanceLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int,
if ctx.taskqueue[].peek() == 0:
let waiters = ctx.threadpool.globalBackoff.getNumWaiters()
# We assume that the worker that scheduled the task will work on it. I.e. idleness is underestimated.
let numIdle = waiters.preSleep + waiters.committedSleep + int32(task.isFirstIter)
let numIdle = waiters.preSleep + waiters.committedSleep + cast[int32](task.isFirstIter)
if numIdle > 0:
ctx.splitAndDispatchLoop(task, curLoopIndex, numIdle)
backoff.decrease()
@ -376,13 +458,6 @@ proc loadBalanceLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int,
backoff.nextCheck += task.loopStride shl backoff.windowLogSize
template parallelForWrapper(idx: untyped{ident}, loopBody: untyped): untyped =
## To be called within a loop task
## Gets the loop bounds and iterate the over them
## Also polls runtime status for dynamic loop splitting
##
## Loop prologue, epilogue,
## remoteAccum, resultTy and returnStmt
## are unused
block:
let this = workerContext.currentTask
var backoff = BalancerBackoff(
@ -404,9 +479,6 @@ template parallelReduceWrapper(
idx: untyped{ident},
prologue, loopBody, mergeLocalWithRemote, epilogue,
remoteTaskAwaitable, awaitableType: untyped): untyped =
## To be called within a loop task
## Gets the loop bounds and iterate the over them
## Also polls runtime status for dynamic loop splitting
block:
let this = workerContext.currentTask
var backoff = BalancerBackoff(
@ -447,111 +519,16 @@ template parallelReduceWrapper(
# #
# ############################################################
iterator pseudoRandomPermutation(randomSeed: uint32, maxExclusive: int32): int32 =
## Create a (low-quality) pseudo-random permutation from [0, max)
# Design considerations and randomness constraint for work-stealing, see docs/random_permutations.md
#
# Linear Congruential Generator: https://en.wikipedia.org/wiki/Linear_congruential_generator
#
# Xₙ₊₁ = aXₙ+c (mod m) generates all random number mod m without repetition
# if and only if (Hull-Dobell theorem):
# 1. c and m are coprime
# 2. a-1 is divisible by all prime factors of m
# 3. a-1 is divisible by 4 if m is divisible by 4
#
# Alternative 1. By choosing a=1, all conditions are easy to reach.
#
# The randomness quality is not important besides distributing potential contention,
# i.e. randomly trying thread i, then i+1, then i+n-1 (mod n) is good enough.
#
# Assuming 6 threads, co-primes are [1, 5], which means the following permutations
# assuming we start with victim 0:
# - [0, 1, 2, 3, 4, 5]
# - [0, 5, 4, 3, 2, 1]
# While we don't care much about randoness quality, it's a bit disappointing.
#
# Alternative 2. We can choose m to be the next power of 2, meaning all odd integers are co-primes,
# consequently:
# - we don't need a GCD to find the coprimes
# - we don't need to cache coprimes, removing a cache-miss potential
# - a != 1, so we now have a multiplicative factor, which makes output more "random looking".
# n and (m-1) <=> n mod m, if m is a power of 2
let maxExclusive = cast[uint32](maxExclusive)
let M = maxExclusive.nextPowerOfTwo_vartime()
let c = (randomSeed and ((M shr 1) - 1)) * 2 + 1 # c odd and c ∈ [0, M)
let a = (randomSeed and ((M shr 2) - 1)) * 4 + 1 # a-1 divisible by 2 (all prime factors of m) and by 4 if m divisible by 4
let mask = M-1 # for mod M
let start = randomSeed and mask
var x = start
while true:
if x < maxExclusive:
yield cast[int32](x)
x = (a*x + c) and mask # ax + c (mod M), with M power of 2
if x == start:
break
proc tryStealOne(ctx: var WorkerContext): ptr Task =
## Try to steal a task.
let seed = ctx.rng.next().uint32
for targetId in seed.pseudoRandomPermutation(2*ctx.threadpool.numThreads):
let seed = ctx.rng.nextU32()
for targetId in seed.pseudoRandomPermutation(ctx.threadpool.numThreads):
if targetId == ctx.id:
continue
let stolenTask = ctx.id.steal(ctx.threadpool.workerQueues[targetId])
if not stolenTask.isNil():
ctx.recentThefts += 1
# Theft successful, there might be more work for idle threads, wake one
ctx.threadpool.globalBackoff.wake()
return stolenTask
return nil
proc updateStealStrategy(ctx: var WorkerContext) =
## Estimate work-stealing efficiency during the last interval
## If the value is below a threshold, switch strategies
const StealAdaptativeInterval = 25
if ctx.recentTheftsAdaptative == StealAdaptativeInterval:
let recentTheftsNonAdaptative = ctx.recentThefts - ctx.recentTheftsAdaptative
let adaptativeTasks = ctx.recentTasks - ctx.recentLeaps - recentTheftsNonAdaptative
let ratio = adaptativeTasks.float32 / StealAdaptativeInterval.float32
if ctx.stealHalf and ratio < 2.0f:
# Tasks stolen are coarse-grained, steal only one to reduce re-steal
ctx.stealHalf = false
elif not ctx.stealHalf and ratio == 1.0f:
# All tasks processed were stolen tasks, we need to steal many at a time
ctx.stealHalf = true
# Reset interval
ctx.recentTasks = 0
ctx.recentThefts = 0
ctx.recentTheftsAdaptative = 0
ctx.recentLeaps = 0
proc tryStealAdaptative(ctx: var WorkerContext): ptr Task =
## Try to steal one or many tasks, depending on load
# TODO: while running 'threadpool/examples/e02_parallel_pi.nim'
# stealHalf can error out in tasks_flowvars.nim with:
# "precondition not task.completed.load(moAcquire)"
ctx.stealHalf = false
# ctx.updateStealStrategy()
let seed = ctx.rng.next().uint32
for targetId in seed.pseudoRandomPermutation(2*ctx.threadpool.numThreads):
if targetId == ctx.id:
continue
let stolenTask =
if ctx.stealHalf: ctx.id.stealHalf(ctx.taskqueue[], ctx.threadpool.workerQueues[targetId])
else: ctx.id.steal(ctx.threadpool.workerQueues[targetId])
if not stolenTask.isNil():
ctx.recentThefts += 1
ctx.recentTheftsAdaptative += 1
# Theft successful, there might be more work for idle threads, wake one
ctx.threadpool.globalBackoff.wake()
return stolenTask
@ -575,106 +552,45 @@ proc tryLeapfrog(ctx: var WorkerContext, awaitedTask: ptr Task): ptr Task =
if thiefID != SentinelThief:
break
cpuRelax()
ascertain: 0 <= thiefID and thiefID < 2*ctx.threadpool.numThreads
ascertain: 0 <= thiefID and thiefID < ctx.threadpool.numThreads
# Leapfrogging is used when completing a future, so steal only one task
# and don't leave tasks stranded in our queue.
let leapTask = ctx.id.steal(ctx.threadpool.workerQueues[thiefID])
if not leapTask.isNil():
ctx.recentLeaps += 1
# Theft successful, there might be more work for idle threads, wake one
ctx.threadpool.globalBackoff.wake()
return leapTask
return nil
proc eventLoopRegular(ctx: var WorkerContext) {.raises:[], gcsafe.} =
proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.} =
## Each worker thread executes this loop over and over.
while true:
# 1. Pick from local queue
debug: log("Worker %3d: eventLoopRegular 1 - searching task from local queue\n", ctx.id)
debug: log("Worker %3d: eventLoop 1 - searching task from local queue\n", ctx.id)
while (var task = ctx.taskqueue[].pop(); not task.isNil):
debug: log("Worker %3d: eventLoopRegular 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask)
debug: log("Worker %3d: eventLoop 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask)
ctx.run(task)
# 2. Run out of tasks, become a thief
debug: log("Worker %3d: eventLoopRegular 2 - becoming a thief\n", ctx.id)
debug: log("Worker %3d: eventLoop 2 - becoming a thief\n", ctx.id)
let ticket = ctx.threadpool.globalBackoff.sleepy()
if (var stolenTask = ctx.tryStealAdaptative(); not stolenTask.isNil):
if (var stolenTask = ctx.tryStealOne(); not stolenTask.isNil):
# We manage to steal a task, cancel sleep
ctx.threadpool.globalBackoff.cancelSleep()
# 2.a Run task
debug: log("Worker %3d: eventLoopRegular 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask)
debug: log("Worker %3d: eventLoop 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask)
ctx.run(stolenTask)
elif ctx.signal.terminate.load(moAcquire):
# 2.b Threadpool has no more tasks and we were signaled to terminate
ctx.threadpool.globalBackoff.cancelSleep()
debugTermination: log("Worker %3d: eventLoopRegular 2.b - terminated\n", ctx.id)
debugTermination: log("Worker %3d: eventLoop 2.b - terminated\n", ctx.id)
break
else:
# 2.c Park the thread until a new task enters the threadpool
debug: log("Worker %3d: eventLoopRegular 2.b - sleeping\n", ctx.id)
debug: log("Worker %3d: eventLoop 2.b - sleeping\n", ctx.id)
ctx.threadpool.globalBackoff.sleep(ticket)
debug: log("Worker %3d: eventLoopRegular 2.b - waking\n", ctx.id)
proc eventLoopReserve(ctx: var WorkerContext) {.raises:[], gcsafe.} =
## A reserve worker is a relay when a thread is stuck awaiting a future completion.
## This ensure those threads are available as soon as the future completes, minimizing latency
## while ensuring the runtime uses all available hardware resources, maximizing throughput.
template reserveSleepCheck: untyped =
let ticket = ctx.threadpool.reserveBackoff.sleepy()
let (reservePlanningSleep, reserveCommittedSleep) = ctx.threadpool.reserveBackoff.getNumWaiters()
let numActiveReservists = ctx.threadpool.numThreads - (reservePlanningSleep-1 + reserveCommittedSleep) # -1 we don't want to count ourselves
if ctx.signal.terminate.load(moAcquire): # If terminated, we leave everything as-is, the regular workers will finish
ctx.threadpool.reserveBackoff.cancelSleep()
debugTermination: log("Worker %3d: reserveSleepCheck - terminated\n", ctx.id)
return
elif numActiveReservists > ctx.threadpool.numIdleThreadsAwaitingFutures.load(moAcquire):
ctx.threadpool.globalBackoff.wake() # In case we were just woken up for a task or we have tasks in our queue, pass the torch
debug: log("Worker %3d: reserveSleepCheck - going to sleep on reserve backoff\n", ctx.id)
ctx.threadpool.reserveBackoff.sleep(ticket)
debug: log("Worker %3d: reserveSleepCheck - waking on reserve backoff\n", ctx.id)
else:
ctx.threadpool.reserveBackoff.cancelSleep()
while true:
# 1. Pick from local queue
debug: log("Worker %3d: eventLoopReserve 1 - searching task from local queue\n", ctx.id)
while true:
reserveSleepCheck()
var task = ctx.taskqueue[].pop()
if task.isNil():
break
debug: log("Worker %3d: eventLoopReserve 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask)
ctx.run(task)
# 2. Run out of tasks, become a thief
debug: log("Worker %3d: eventLoopReserve 2 - becoming a thief\n", ctx.id)
let ticket = ctx.threadpool.globalBackoff.sleepy() # If using a reserve worker was necessary, sleep on the backoff for active threads
if (var stolenTask = ctx.tryStealAdaptative(); not stolenTask.isNil):
# We manage to steal a task, cancel sleep
ctx.threadpool.globalBackoff.cancelSleep()
# 2.a Run task
debug: log("Worker %3d: eventLoopReserve 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask)
ctx.run(stolenTask)
elif ctx.signal.terminate.load(moAcquire):
# 2.b Threadpool has no more tasks and we were signaled to terminate
ctx.threadpool.globalBackoff.cancelSleep()
debugTermination: log("Worker %3d: eventLoopReserve 2.b - terminated\n", ctx.id)
break
else:
# 2.c Park the thread until a new task enters the threadpool.
# It is intentionally parked with all active threads as long as a reservist is needed
let (reservePlanningSleep, reserveCommittedSleep) = ctx.threadpool.reserveBackoff.getNumWaiters()
let numActiveReservists = ctx.threadpool.numThreads - (reservePlanningSleep-1 + reserveCommittedSleep) # -1 we don't want to count ourselves
if numActiveReservists > ctx.threadpool.numIdleThreadsAwaitingFutures.load(moAcquire):
ctx.threadpool.globalBackoff.cancelSleep()
continue
debug: log("Worker %3d: eventLoopReserve 2.b - sleeping on active threads backoff\n", ctx.id)
ctx.threadpool.globalBackoff.sleep(ticket)
debug: log("Worker %3d: eventLoopReserve 2.b - waking on active threads backoff\n", ctx.id)
debug: log("Worker %3d: eventLoop 2.b - waking\n", ctx.id)
# ############################################################
# #
@ -682,9 +598,6 @@ proc eventLoopReserve(ctx: var WorkerContext) {.raises:[], gcsafe.} =
# #
# ############################################################
template isRootTask(task: ptr Task): bool =
task == RootTask
proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
## Eagerly complete an awaited FlowVar
template ctx: untyped = workerContext
@ -712,47 +625,67 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
debug: log("Worker %3d: sync 1 - future ready, exiting\n", ctx.id)
return
## 2. We run out-of-tasks or out-of-direct-child of our current awaited task
## So the task is bottlenecked by dependencies in other threads,
## hence we abandon our enqueued work and steal.
##
## See also
## - Proactive work-stealing for futures
## Kyle Singer, Yifan Xu, I-Ting Angelina Lee, 2019
## https://dl.acm.org/doi/10.1145/3293883.3295735
# 2. We run out-of-tasks or out-of-direct-child of our current awaited task
# So the task is bottlenecked by dependencies in other threads,
# hence we abandon our enqueued work and steal.
#
# See also
# - Proactive work-stealing for futures
# Kyle Singer, Yifan Xu, I-Ting Angelina Lee, 2019
# https://dl.acm.org/doi/10.1145/3293883.3295735
#
# Design tradeoffs
# ----------------
#
# At this point, we have significant design decisions:
# - Do we steal from other workers in hope we advance our awaited task?
# Note: A greedy scheduler (no worker idle as long as there is a runnable task)
# is at most 2x slower than the optimal schedule (proof in Cilk paper)
# - Do we advance our own queue for tasks that are not child of our awaited tasks?
# - Do we park instead of working on unrelated task?
# Note: With hyperthreading, real hardware resources are 2x less than the reported number of cores.
# Hence parking might free contended memory bandwidth or execution ports.
# - Do we just not sleep, potentially wasting energy?
#
# - If we work, we maximize throughput, but we increase latency to handle the future's continuation.
# If that continuation would have created more parallel work, we would actually have restricted parallelism.
# - If we park with tasks left, we minimize latency on the continuation, but we don't use hardware resources fully,
# and there are CPUs without hyperthreading (on ARM for example)
# - If we park when no tasks are left and more work is enqueued later, we will miss it, as we don't park on the global backoff.
# Note: we don't park on the global backoff, because it's not possible to control which thread to wake with it (or we wake all).
# - Wakeup latency is high. Having "reserve threads" that take over the active slot of the awaiting thread
# in theory maintains throughput and minimizes the latency of the future's continuation,
# but in practice, performance can worsen significantly on fine-grained parallelism.
debug: log("Worker %3d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x)\n", ctx.id, ctx.currentTask)
while not isFutReady():
if (let leapTask = ctx.tryLeapfrog(fv.task); not leapTask.isNil):
# Leapfrogging, the thief had an empty queue, hence if there are tasks in its queue, it's generated by our blocked task.
# Help the thief clear those, as if it did not finish, it's likely blocked on those children tasks.
debug: log("Worker %3d: sync 2.1 - leapfrog task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, leapTask, leapTask.parent, ctx.currentTask)
ctx.run(leapTask)
elif (let stolenTask = ctx.tryStealOne(); not stolenTask.isNil):
# We stole a task, we hope we advance our awaited task.
debug: log("Worker %3d: sync 2.2 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask)
ctx.run(stolenTask)
elif (let ownTask = ctx.taskqueue[].pop(); not ownTask.isNil):
# We advance our own queue, this increases global throughput but may impact latency on the awaited task.
debug: log("Worker %3d: sync 2.3 - couldn't steal, running own task\n", ctx.id)
ctx.run(ownTask)
else:
# At this point, we have significant design decisions:
# - Do we steal from other workers in hope we advance our awaited task?
# - Do we advance our own queue for tasks that are not child of our awaited tasks?
# - Do we park instead of working on unrelated task. With hyperthreading that would actually still leave the core busy enough?
#
# - If we work, we maximize throughput, but we increase latency to handle the future's continuation.
# If that future creates more parallel work, we would actually have restricted parallelism.
# - If we park, we minimize latency, but we don't use the full hardware resources, and there are CPUs without hyperthreading (on ARM for example)
# Furthermore, a work-stealing scheduler is within 2x an optimal scheduler if it is greedy, i.e., as long as there is enough work, all cores are used.
#
# The solution chosen is to wake a reserve thread, keeping hardware offered/throughput constant. And put the awaiting thread to sleep.
# Nothing to do, we park.
# - On today's hyperthreaded systems, this might reduce contention on a core's resources like memory caches and execution ports
# - If more work is created, we won't be notified, as we need to park on a dedicated notifier for a precise wakeup when the future is ready
ctx.localBackoff.prepareToPark()
discard ctx.threadpool.numIdleThreadsAwaitingFutures.fetchAdd(1, moRelease)
ctx.threadpool.reserveBackoff.wake()
var expected = (ptr EventNotifier)(nil)
if compareExchange(fv.task.waiter, expected, desired = ctx.localBackoff.addr, moAcquireRelease):
ctx.localBackoff.park()
discard ctx.threadpool.numIdleThreadsAwaitingFutures.fetchSub(1, moRelease)
proc syncAll*(tp: Threadpool) {.raises: [].} =
## Blocks until all pending tasks are completed
## This MUST only be called from
## the root scope that created the threadpool
## This MUST only be called from the root scope that created the threadpool
template ctx: untyped = workerContext
debugTermination:
@ -769,17 +702,11 @@ proc syncAll*(tp: Threadpool) {.raises: [].} =
ctx.run(task)
# 2. Help other threads
debugTermination:
let regular = tp.globalBackoff.getNumWaiters()
let reserve = tp.reserveBackoff.getNumWaiters()
log("Worker %3d: syncAll 2 - becoming a thief - (preSleep: %d, sleeping %d) regular and (preSleep: %d, sleeping %d) reserve workers\n",
ctx.id, regular.preSleep, regular.committedSleep, reserve.preSleep, reserve.committedSleep)
if (var stolenTask = ctx.tryStealAdaptative(); not stolenTask.isNil):
if (var stolenTask = ctx.tryStealOne(); not stolenTask.isNil):
# 2.a We stole some task
debug: log("Worker %3d: syncAll 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask)
ctx.run(stolenTask)
elif tp.reserveBackoff.getNumWaiters() == (0'i32, tp.numThreads) and
tp.globalBackoff.getNumWaiters() == (0'i32, tp.numThreads-1): # Don't count ourselves
elif tp.globalBackoff.getNumWaiters() == (0'i32, tp.numThreads-1): # Don't count ourselves
# 2.b all threads besides the current are parked
debugTermination: log("Worker %3d: syncAll 2.b - termination, all other threads sleeping\n", ctx.id)
break
@ -810,26 +737,23 @@ proc new*(T: type Threadpool, numThreads = countProcessors()): T {.raises: [Reso
type TpObj = typeof(default(Threadpool)[]) # due to C import, we need a dynamic sizeof
var tp = allocHeapUncheckedAlignedPtr(Threadpool, sizeof(TpObj), alignment = 64)
tp.barrier.init(2*numThreads.uint32)
tp.barrier.init(numThreads.uint32)
tp.globalBackoff.initialize()
tp.reserveBackoff.initialize()
tp.numThreads = numThreads.int32
tp.numIdleThreadsAwaitingFutures.store(0, moRelaxed)
# Allocate for `numThreads` regular workers and `numTHreads` reserve workers
tp.workerQueues = allocHeapArrayAligned(Taskqueue, 2*numThreads, alignment = 64)
tp.workers = allocHeapArrayAligned(Thread[(Threadpool, WorkerID)], 2*numThreads, alignment = 64)
tp.workerSignals = allocHeapArrayAligned(Signal, 2*numThreads, alignment = 64)
tp.workerQueues = allocHeapArrayAligned(Taskqueue, numThreads, alignment = 64)
tp.workers = allocHeapArrayAligned(Thread[(Threadpool, WorkerID)], numThreads, alignment = 64)
tp.workerSignals = allocHeapArrayAligned(Signal, numThreads, alignment = 64)
# Setup master thread
workerContext.id = 0
workerContext.threadpool = tp
# Start worker threads
for i in 1 ..< 2*numThreads:
for i in 1 ..< numThreads:
createThread(tp.workers[i], workerEntryFn, (tp, WorkerID(i)))
# Root worker
setupWorker()
workerContext.setupWorker()
# Root task, this is a sentinel task that is never called.
workerContext.currentTask = RootTask
@ -842,7 +766,7 @@ proc cleanup(tp: var Threadpool) {.raises: [].} =
## Cleanup all resources allocated by the threadpool
preCondition: workerContext.currentTask.isRootTask()
for i in 1 ..< 2*tp.numThreads:
for i in 1 ..< tp.numThreads:
joinThread(tp.workers[i])
tp.workerSignals.freeHeapAligned()
@ -859,16 +783,15 @@ proc shutdown*(tp: var Threadpool) {.raises:[].} =
tp.syncAll()
# Signal termination to all threads
for i in 0 ..< 2*tp.numThreads:
for i in 0 ..< tp.numThreads:
tp.workerSignals[i].terminate.store(true, moRelease)
tp.globalBackoff.wakeAll()
tp.reserveBackoff.wakeAll()
# 1 matching barrier in workerEntryFn
discard tp.barrier.wait()
teardownWorker()
workerContext.teardownWorker()
tp.cleanup()
# Delete dummy task
@ -882,12 +805,6 @@ proc shutdown*(tp: var Threadpool) {.raises:[].} =
# #
# ############################################################
proc getThreadID(tp: Threadpool): int {.inline, used.} =
## Returns the worker local ID.
## This is a debug proc for logging purposes
## The threadpool needs to be imported with {.all.} pragma
workerContext.id
# Task parallel API
# ---------------------------------------------
@ -904,9 +821,6 @@ macro spawn*(tp: Threadpool, fnCall: typed): untyped =
# Data parallel API
# ---------------------------------------------
# TODO: we can fuse parallelFor and parallelForStrided
# in a single proc once {.experimental: "flexibleOptionalParams".}
# is not experimental anymore
macro parallelFor*(tp: Threadpool, loopParams: untyped, body: untyped): untyped =
## Parallel for loop.