diff --git a/constantine.nimble b/constantine.nimble index c7393ef..41199e7 100644 --- a/constantine.nimble +++ b/constantine.nimble @@ -243,6 +243,8 @@ const testDescNvidia: seq[string] = @[ const testDescThreadpool: seq[string] = @[ "constantine/platforms/threadpool/examples/e01_simple_tasks.nim", "constantine/platforms/threadpool/examples/e02_parallel_pi.nim", + "constantine/platforms/threadpool/examples/e03_parallel_for.nim", + "constantine/platforms/threadpool/examples/e04_parallel_reduce.nim", # "constantine/platforms/threadpool/benchmarks/bouncing_producer_consumer/threadpool_bpc.nim", # Need timing not implemented on Windows "constantine/platforms/threadpool/benchmarks/dfs/threadpool_dfs.nim", "constantine/platforms/threadpool/benchmarks/fibonacci/threadpool_fib.nim", diff --git a/constantine/platforms/primitives.nim b/constantine/platforms/primitives.nim index 2990714..6837f7e 100644 --- a/constantine/platforms/primitives.nim +++ b/constantine/platforms/primitives.nim @@ -34,6 +34,10 @@ export allocs, compiler_optim_hints +# Note: +# - cpuinfo_x86 initializes globals by running CPU feature detection. +# This impacts benchmarks that do not need it, such as the threadpool. + when X86 and GCC_Compatible: import isa/[cpuinfo_x86, macro_assembler_x86] export cpuinfo_x86, macro_assembler_x86 @@ -69,7 +73,7 @@ func ceilDiv_vartime*(a, b: auto): auto {.inline.} = ## ceil division, to be used only on length or at compile-time ## ceil(a / b) # "LengthInDigits: static int" doesn't match "int" - # if "SomeInteger" is used instead of "autoi" + # if "SomeInteger" is used instead of "auto" (a + b - 1) div b # ############################################################ diff --git a/constantine/platforms/threadpool/README.md b/constantine/platforms/threadpool/README.md index 1136e86..9e6b5d2 100644 --- a/constantine/platforms/threadpool/README.md +++ b/constantine/platforms/threadpool/README.md @@ -16,13 +16,11 @@ This threadpool will desirable properties are: and not dealing with threadpool contention, latencies and overheads. Compared to [Weave](https://github.com/mratsim/weave), here are the tradeoffs: -- Constantine's threadpool only provide spawn/sync (task parallelism).\ - There is no (extremely) optimized parallel for (data parallelism)\ - or precise in/out dependencies (events / dataflow parallelism). +- Constantine's threadpool provides spawn/sync (task parallelism) + and an optimized parallelFor (data parallelism).\ + It does not, however, provide precise in/out dependencies (events / dataflow parallelism). - Constantine's threadpool has been significantly optimized to provide - overhead lower than Weave's default (and as low as Weave "lazy" + "alloca" allocation scheme) -- Constantine's threadpool provides the same adaptative scheduling strategy as Weave - with additional enhancement (leapfrogging) + overhead lower than Weave's default (and as low as Weave "lazy" + "alloca" allocation scheme). Compared to [nim-taskpools](https://github.com/status-im), here are the tradeoffs: - Constantine does not use std/tasks: @@ -35,4 +33,6 @@ Compared to [nim-taskpools](https://github.com/status-im), here are the tradeoff - The future (Flowvar) result channel - Contention improvement, Constantine is entirely lock-free while Nim-taskpools need a lock+condition variable for putting threads to sleep - Powersaving improvement, threads sleep when awaiting for a task and there is no work available.
-- Scheduling improvement, Constantine's threadpool incorporate Weave's adaptative scheduling policy with additional enhancement (leapfrogging) \ No newline at end of file +- Scheduling improvement, Constantine's threadpool incorporates Weave's adaptive scheduling policy with an additional enhancement (leapfrogging) + +See also [design.md](./docs/design.md) \ No newline at end of file diff --git a/constantine/platforms/threadpool/crossthread/backoff.nim b/constantine/platforms/threadpool/crossthread/backoff.nim index 471126e..f205859 100644 --- a/constantine/platforms/threadpool/crossthread/backoff.nim +++ b/constantine/platforms/threadpool/crossthread/backoff.nim @@ -200,13 +200,17 @@ proc cancelSleep*(ec: var Eventcount) {.inline.} = proc wake*(ec: var EventCount) {.inline.} = ## Wake a thread if at least 1 is parked let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease) - if (prev and kAnyWaiterMask) != 0: + if (prev and kPreWaitMask) != 0: + # Some threads are in prewait and will see the epoch change + # no need to do an expensive syscall + return + if (prev and kWaitMask) != 0: ec.futex.wake() proc wakeAll*(ec: var EventCount) {.inline.} = ## Wake all threads if at least 1 is parked let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease) - if (prev and kAnyWaiterMask) != 0: + if (prev and kWaitMask) != 0: ec.futex.wakeAll() proc getNumWaiters*(ec: var EventCount): tuple[preSleep, committedSleep: int32] {.noInit, inline.} = diff --git a/constantine/platforms/threadpool/crossthread/taskqueues.nim b/constantine/platforms/threadpool/crossthread/taskqueues.nim index 55fa387..1aabf6d 100644 --- a/constantine/platforms/threadpool/crossthread/taskqueues.nim +++ b/constantine/platforms/threadpool/crossthread/taskqueues.nim @@ -14,25 +14,21 @@ # David Chase, Yossi Lev, 1993 # https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf # -# - Non-Blocking Steal-Half Work Queues -# Danny Hendler, Nir Shavit, 2002 -# https://www.cs.bgu.ac.il/~hendlerd/papers/p280-hendler.pdf -# # - Correct and Efficient Work-Stealing for Weak Memory Models # Nhat Minh Lê, Antoniu Pop, Albert Cohen, Francesco Zappa Nardelli, 2013 # https://fzn.fr/readings/ppopp13.pdf # -# The task queue implements the following push, pop, steal, stealHalf +# The task queue implements the following operations: push, pop, steal # # front back # --------------------------------- # steal() <- | | | | <- push() -# stealHalf() <- | Task 0 | Task 1 | Task 2 | -> pop() +# | Task 0 | Task 1 | Task 2 | -> pop() # any thread | | | | owner-only # --------------------------------- # # To reduce contention, stealing is done on the opposite end from push/pop -# so that there is a race only for the very last task(s). +# so that there is a race only for the very last task. {.push raises: [], checks: off.} # No exceptions in a multithreading datastructure @@ -57,7 +53,7 @@ type ## Foreign threads steal at the front. ## ## There is no memory reclamation scheme for simplicity - front {.align: 64.}: Atomic[int] # Consumers - steal/stealHalf + front {.align: 64.}: Atomic[int] # Consumers - steal back: Atomic[int] # Producer - push/pop buf: Atomic[ptr Buf] garbage: ptr Buf @@ -215,72 +211,3 @@ proc steal*(thiefID: int32, tq: var Taskqueue): ptr Task = # Failed race.
return nil result.thiefID.store(thiefID, moRelease) - -proc stealHalfImpl(dst: var Buf, dstBack: int, src: var Taskqueue): int = - ## Theft part of stealHalf: - ## - updates the victim metadata if successful - ## - uncommitted updates to the thief tq whether successful or not - ## Returns -1 if dst buffer is too small - ## Assumes `dst` buffer is empty (i.e. not ahead-of-time thefts) - - while true: - # Try as long as there are something to steal, we are idling anyway. - - var f = src.front.load(moAcquire) - fence(moSequentiallyConsistent) - let b = src.back.load(moAcquire) - var n = b-f - n = n - (n shr 1) # Division by 2 rounded up, so if only one task is left, we still get it. - - if n <= 0: - return 0 - if n > dst.capacity: - return -1 - - # Non-empty queue. - let sBuf = src.buf.load(moConsume) - for i in 0 ..< n: # Copy LIFO or FIFO? - dst[dstBack+i] = sBuf[][f+i] - if compareExchange(src.front, f, f+n, moSequentiallyConsistent, moRelaxed): - return n - -proc stealHalf*(thiefID: int32, dst: var Taskqueue, src: var Taskqueue): ptr Task = - ## Dequeue up to half of the items in the `src` tq, fom the front. - ## Return the last of those, or nil if none found - - while true: - # Prepare for batch steal - let - bDst = dst.back.load(moRelaxed) - fDst = dst.front.load(moAcquire) - var dBuf = dst.buf.load(moAcquire) - let sBuf = src.buf.load(moAcquire) - - if dBuf.capacity < sBuf.capacity: - # We could grow to sBuf/2 since we steal half, but we want to minimize - # churn if we are actually in the process of stealing and the buffers grows. - dst.grow(dBuf, sBuf.capacity, fDst, bDst) - - # Steal - let n = dBuf[].stealHalfImpl(bDst, src) - - if n == 0: - return nil - if n == -1: - # Oops, victim buffer grew bigger than ours, restart the whole process - continue - - # Update metadata - for i in 0 ..< n: - dBuf[][bDst+i].thiefID.store(thiefID, moRelease) - - # Commit/publish theft, return the first item for processing - let last = dBuf[][bDst+n-1] - fence(moSequentiallyConsistent) - if n == 1: - return last - - # We have more than one item, so some must go in our queue - # they are already here but inaccessible for other thieves - dst.back.store(bDst+n-1, moRelease) # We assume that queue was empty and so dst.front didn't change - return last diff --git a/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim b/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim index 959092f..07b0c2f 100644 --- a/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim +++ b/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim @@ -9,7 +9,7 @@ import std/atomics, ../instrumentation, - ../../allocs, ../../primitives, + ../../allocs, ./backoff # Tasks have an efficient design so that a single heap allocation @@ -27,14 +27,10 @@ import type Task* = object - # Intrusive metadata + # Synchronization # ------------------ parent*: ptr Task # When a task is awaiting, a thread can quickly prioritize the direct child of a task - thiefID*: Atomic[int32] # ID of the worker that stole and run the task. For leapfrogging. - - # Result sync - # ------------------ hasFuture*: bool # Ownership: if a task has a future, the future deallocates it. Otherwise the worker thread does. completed*: Atomic[bool] waiter*: Atomic[ptr EventNotifier] @@ -42,25 +38,20 @@ type # Data parallelism # ------------------ isFirstIter*: bool # Awaitable for-loops return true for first iter. Loops are split before first iter. 
+ envSize*: int32 # This is used for splittable loop loopStart*: int loopStop*: int loopStride*: int loopStepsLeft*: int reductionDAG*: ptr ReductionDagNode # For parallel loop reduction, merge with other range result - # Dataflow parallelism - # -------------------- - dependsOnEvent: bool # We cannot leapfrog a task triggered by an event - # Execution # ------------------ fn*: proc (env: pointer) {.nimcall, gcsafe, raises: [].} - # destroy*: proc (env: pointer) {.nimcall, gcsafe.} # Constantine only deals with plain old data - envSize*: int32 + # destroy*: proc (env: pointer) {.nimcall, gcsafe.} # Constantine only deals with plain old env env*{.align:sizeof(int).}: UncheckedArray[byte] Flowvar*[T] = object - ## A Flowvar is a placeholder for a future result that may be computed in parallel task: ptr Task ReductionDagNode* = object @@ -82,7 +73,8 @@ const SentinelThief* = 0xFACADE'i32 proc newSpawn*( T: typedesc[Task], parent: ptr Task, - fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}): ptr Task = + fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].} + ): ptr Task {.inline.} = const size = sizeof(T) @@ -93,22 +85,12 @@ proc newSpawn*( result.completed.store(false, moRelaxed) result.waiter.store(nil, moRelaxed) result.fn = fn - result.envSize = 0 - - result.isFirstIter = false - result.loopStart = 0 - result.loopStop = 0 - result.loopStride = 0 - result.loopStepsLeft = 0 - result.reductionDAG = nil - - result.dependsOnEvent = false proc newSpawn*( T: typedesc[Task], parent: ptr Task, fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}, - env: auto): ptr Task = + env: auto): ptr Task {.inline.} = const size = sizeof(T) + # size without Unchecked sizeof(env) @@ -120,24 +102,20 @@ proc newSpawn*( result.completed.store(false, moRelaxed) result.waiter.store(nil, moRelaxed) result.fn = fn - result.envSize = int32 sizeof(env) cast[ptr[type env]](result.env)[] = env - result.isFirstIter = false - result.loopStart = 0 - result.loopStop = 0 - result.loopStride = 0 - result.loopStepsLeft = 0 - result.reductionDAG = nil - - result.dependsOnEvent = false +func ceilDiv_vartime*(a, b: auto): auto {.inline.} = + ## ceil division, to be used only on length or at compile-time + ## ceil(a / b) + (a + b - 1) div b proc newLoop*( T: typedesc[Task], parent: ptr Task, start, stop, stride: int, isFirstIter: bool, - fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}): ptr Task = + fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].} + ): ptr Task = const size = sizeof(T) preCondition: start < stop @@ -157,8 +135,6 @@ proc newLoop*( result.loopStepsLeft = ceilDiv_vartime(stop-start, stride) result.reductionDAG = nil - result.dependsOnEvent = false - proc newLoop*( T: typedesc[Task], parent: ptr Task, @@ -188,8 +164,6 @@ proc newLoop*( result.loopStepsLeft = ceilDiv_vartime(stop-start, stride) result.reductionDAG = nil - result.dependsOnEvent = false - # Flowvars # ------------------------------------------------------------------------- @@ -235,9 +209,6 @@ proc sync*[T](fv: sink Flowvar[T]): T {.noInit, inline, gcsafe.} = ## and returned. ## The thread is not idle and will complete pending tasks. 
mixin completeFuture - if fv.task.isNil: - zeroMem(result.addr, sizeof(T)) - return completeFuture(fv, result) cleanup(fv) diff --git a/constantine/platforms/threadpool/design.md b/constantine/platforms/threadpool/docs/design.md similarity index 73% rename from constantine/platforms/threadpool/design.md rename to constantine/platforms/threadpool/docs/design.md index 06a2c87..e3ce1c7 100644 --- a/constantine/platforms/threadpool/design.md +++ b/constantine/platforms/threadpool/docs/design.md @@ -22,7 +22,7 @@ Also neither supports putting awaiting threads to sleep when the future they wan | Communication mechanisms | Shared-memory | Message-passing / Channels | Shared-memory | Shared-memory | Load balancing strategy | static (GCC), work-stealing (Intel/LLVM) | work-sharing / work-requesting | work-stealing | work-stealing | | Blocked tasks don't block runtime | N/A | no | yes | yes | -| Load-balancing strategy for task parallelism (important for fine-grained parallelism) | global queue (GCC), steal-one (Intel/LLVM) | Adaptative steal-one/steal-half | steal-one | steal-one (steal-half WIP) | +| Load-balancing strategy for task parallelism (important for fine-grained parallelism) | global queue (GCC), steal-one (Intel/LLVM) | Adaptative steal-one/steal-half | steal-one | steal-one | | Load-balancing strategy for data parallelism | eager splitting depending on iteration count and cpu count | lazy splitting depending on idle CPUs and workload | N/A | lazy splitting depending on idle CPUs and workload | | Backoff worker when idle | yes (?) | yes | yes | yes | | Backoff worker when awaiting task but no work | N/A | no | no | yes | @@ -30,6 +30,43 @@ Also neither supports putting awaiting threads to sleep when the future they wan ## Key features design +### Scheduler overhead/contention + +#### Distributed task queues +To enable fine-grained parallelism, i.e. parallelizing tasks in the microseconds range, it's critical to reduce contention. +A global task queue will be hammered by N threads, leading to threads thrashing each other's caches. +In contrast, distributed task queues with random victim selection significantly reduce contention. + +#### Memory allocation +Another source of overhead is the allocator. The worst case for allocators is allocation in one thread and deallocation in another, especially if the +allocating thread is always the same. Unfortunately this is common in producer-consumer workloads. +Besides, multithreaded allocations/deallocations will trigger costly atomic-swaps and possibly fragmentation. +Minimizing allocations to the utmost will significantly help on fine-grained tasks. +- Weave solved that problem by having 2 levels of cache: a memory-pool for tasks and futures and a lookaside list that caches tasks to reduce further pressure on the memory pool. +- Nim-taskpools does not address this problem; it has a per-task allocation overhead of 3: 1 for std/tasks, 1 for the linked list that holds them, 1 for the result channel/flowvar. + Unlike GCC OpenMP, which freezes on a fibonacci 40 benchmark, it can still finish but it's 20x slower than Weave. +- Constantine's threadpool solves the problem by making everything intrusive to a task: the task env, the future, the linked list (sketched below). +In fact this solution is even faster than Weave's, probably due to significantly fewer page faults and cache misses.
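+To make the intrusive design concrete, here is a simplified sketch of the single-allocation task. Field names follow `tasks_flowvars.nim` in this PR; this is an illustration, not the exact declaration:
+
+```nim
+import std/atomics
+
+type
+  EventNotifier = object            # placeholder for the backoff primitive in backoff.nim
+  Task = object
+    # Synchronization: the future's state lives inside the task allocation itself
+    parent: ptr Task                # intrusive link, lets a waiter prioritize its direct children
+    hasFuture: bool                 # if true, the Flowvar owns (and frees) the task
+    completed: Atomic[bool]
+    waiter: Atomic[ptr EventNotifier]
+    # Execution
+    fn: proc (env: pointer) {.nimcall, gcsafe, raises: [].}
+    env: UncheckedArray[byte]       # captured args; for value-returning tasks the result slot
+                                    # also lives here: | ptr Task | result | arg₀ | ... | argₙ |
+```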
+Note that Weave has an even faster mode that allocates futures on the stack when they don't escape their function, but without compiler support (heap allocation elision) this restricts the programs you can write. + +### Load balancing for task parallelism + +When a worker runs out of tasks, it steals from others' task queues. +It may steal one or several tasks. +In case of severe load imbalance, a steal-half policy can quickly rebalance workers' queues to the global average. +This also helps reduce scheduler overhead by requiring logarithmically fewer steal attempts (a thief that takes half of a 64-task queue needs 1 steal instead of 32 to acquire the same amount of work). +However, it may lead to significantly more rebalancing if workers generate few tasks. + +Weave implements adaptive work-stealing with runtime selection of steal-one/steal-half: +- Embracing Explicit Communication in Work-Stealing Runtime Systems.\ + Andreas Prell, 2016\ + https://epub.uni-bayreuth.de/id/eprint/2990/ + +Constantine's threadpool will likely adopt the same if the following task queues can be implemented with low overhead: +- Non-Blocking Steal-Half Work Queues\ + Danny Hendler, Nir Shavit, 2002\ + https://www.cs.bgu.ac.il/~hendlerd/papers/p280-hendler.pdf + ### Load-balancing for data parallelism A critical issue in most (all?) runtimes used in HPC (OpenMP and Intel TBB in particular) is that they split their parallel for loop ahead of time. @@ -76,34 +113,35 @@ Instead we can have each thread start working and use backpressure to lazily eva ### Backoff workers when awaiting a future This problem is quite tricky: -- For latency, and to potentially create more work ASAP, we want such a worker to follow through on the blocked future ASAP. -- For throughput, and because a scheduler is optimal only when greedy, we want an idle thread to take any available work. - - But what if the work then blocks that worker for 1s? This hurts latency and might lead to work starvation if the continuation would have created more work. +- For latency, we want the worker to continue as soon as the future is completed. This might also create more work and expose more parallelism opportunities (in recursive divide-and-conquer algorithms, for example). +- For throughput, and because a scheduler is optimal only when greedy (i.e. within 2x of the best schedule, see Cilk paper), we want an idle thread to take any available work ASAP. + - But what if that worker ends up with work that blocks it for a long time? It may lead to work starvation. - There is no robust, cross-platform API, to wake a specific thread awaiting on a futex or condition variable. - The simplest design then would be to have an array of futexes, when backing-off sleep on those. The issue then is that when offering work you have to scan that array to find a worker to wake. Contrary to a idle worker, the waker is working so this scan hurts throughput and latency, and due to the many atomics operations required, will completely thrash the cache of that worker. + - An even simpler option is to wake all threads on future completion. - Another potential data structure would be a concurrent sparse-set but designing concurrent data structures is difficult. - and locking would be expensive for an active worker + and locking would be expensive for an active worker. +- Alternative designs would be: + - Not sleeping at all. + - Having reserve threads: + Before sleeping when blocked on a future, the thread wakes a reserve thread. Since the number of active hardware threads is maintained, throughput is maintained. + The waiting thread is also immediately available when the future is completed, since it cannot be stuck in work.
+ A well-behaved program will always have at least 1 thread making progress among N, so a reserve of size N is sufficient. + Unfortunately, this solution suffers from the high latency of wakeups and/or kernel context switches. + For fine-grained tasks it is quite impactful: the heat benchmark is 7x slower, fibonacci 1.5x, depth-first-search 2.5x. + For actual workloads, the elliptic curve sum reduction is also significantly slower. + - Using continuations: + We could just store a continuation in the future so the thread that completes the future picks up the continuation. -The solution in Constantine's threadpool to minimize latency and maximize throughput and avoid implementing another concurrent data structure is -having "reserve threads". Before sleeping when blocked on a future the thread wakes a reserve thread. This maintain throughput, and the thread is immediately available -as well when the future is completed. A well-behaved program will always have at least 1 thread making progress among N, so a reserve of size N is sufficient. - -### Scheduler overhead/contention - -To enable fine-grained parallelism, i.e. parallelizing tasks in the microseconds range, it's critical to reduce contention. -A global task queue will be hammered by N threads, leading to each thrashing each other caches. -In contrast, distributed task queues with random victim selection significantly reduce contention. - -Another source of overhead is the allocator, the worst case for allocators is allocation in a thread and deallocation in another, especially if the -allocating thread is always the same. Unfortunately this is common in producer-consumer workloads. -Besides multithreaded allocations/deallocations will trigger costly atomic-swaps and possibly fragmentation. -Minimizing allocations to the utmost will significantly help on fine-grained tasks. -- Weave solved that problem by having 2 levels of cache: a memory-pool for tasks and futures and a lookaside list that caches tasks to reduce further pressure on the memory pool. -- Nim-taskpools does not address this problem, it has an allocation overhead per tasks of 1 for std/tasks, 1 for the linked list that holds them, 1 for the result channel/flowvar. - Unlike GCC OpenMP which freezes on a fibonacci 40 benchmark, it can still finish but it's 20x slower than Weave. -- Constantine's threadpool solves the problem by making everything intrusive to a task: the task env, the future, the linked list. -In fact this solution is even faster than Weave's, probably due to significantly less page faults and cache misses. -Note that Weave has an even faster mode when futures don't escape their function by allocating them on the stack but without compiler support (heap allocation elision) that restricts the programs you can write. +The workaround for now is just to work on any available task to maximize throughput, then sleep. +This has 2 disadvantages: +- For coarse-grain parallelism, the waiting thread may steal a long task and delay the continuation. + However, coarse-grain parallelism is easier to split into multiple tasks, + while exposing fine-grain parallelism and properly handling its overhead is the usual problem. +- As we can't wake a specific thread on a common futex or condition variable, + when awaiting a future, a thread sleeps on a local one. + Hence if no work can be stolen, the waiter sleeps. But if work is then recreated, it stays asleep as only the global futex is notified.
+ Note that with hyperthreading, the sibling thread(s) can still use the core fully so throughput might not be impacted. diff --git a/constantine/platforms/threadpool/parallel_offloading.nim b/constantine/platforms/threadpool/parallel_offloading.nim index f4535c9..098733c 100644 --- a/constantine/platforms/threadpool/parallel_offloading.nim +++ b/constantine/platforms/threadpool/parallel_offloading.nim @@ -41,7 +41,7 @@ proc spawnVoid(funcCall: NimNode, args, argsTy: NimNode, workerContext, schedule let fnName = $fn let withArgs = args.len > 0 let tpSpawn_closure = ident("ctt_tpSpawnVoidClosure_" & fnName) - var loopFnCall = newCall(fn) + var fnCall = newCall(fn) let env = ident("ctt_tpSpawnVoidEnv_") # typed pointer to env # Schedule @@ -53,10 +53,10 @@ proc spawnVoid(funcCall: NimNode, args, argsTy: NimNode, workerContext, schedule if funcCall.len == 2: # With only 1 arg, the tuple syntax doesn't construct a tuple # let env = (123) # is an int - loopFnCall.add nnkDerefExpr.newTree(env) + fnCall.add nnkDerefExpr.newTree(env) else: # This handles the 0 arg case as well for i in 1 ..< funcCall.len: - loopFnCall.add nnkBracketExpr.newTree( + fnCall.add nnkBracketExpr.newTree( env, newLit i-1) @@ -65,7 +65,7 @@ proc spawnVoid(funcCall: NimNode, args, argsTy: NimNode, workerContext, schedule proc `tpSpawn_closure`(env: pointer) {.nimcall, gcsafe, raises: [].} = when bool(`withArgs`): let `env` = cast[ptr `argsTy`](env) - `loopFnCall` + `fnCall` # Create the task result.add quote do: @@ -88,7 +88,7 @@ proc spawnRet(funcCall: NimNode, retTy, args, argsTy: NimNode, workerContext, sc let fn = funcCall[0] let fnName = $fn let tpSpawn_closure = ident("ctt_tpSpawnRetClosure_" & fnName) - var loopFnCall = newCall(fn) + var fnCall = newCall(fn) let env = ident("ctt_tpSpawnRetEnv_") # typed pointer to env # tasks have no return value. @@ -117,12 +117,12 @@ proc spawnRet(funcCall: NimNode, retTy, args, argsTy: NimNode, workerContext, sc # env stores | ptr Task | result | arg₀ | arg₁ | ... | argₙ # so arguments starts at env[2] in the wrapping funcCall functions for i in 1 ..< funcCall.len: - loopFnCall.add nnkBracketExpr.newTree(env, newLit i+1) + fnCall.add nnkBracketExpr.newTree(env, newLit i+1) result.add quote do: proc `tpSpawn_closure`(env: pointer) {.nimcall, gcsafe, raises: [].} = let `env` = cast[ptr `envParamsTy`](env) - let res = `loopFnCall` + let res = `fnCall` readyWith(`env`[0], res) # Regenerate fresh ident, retTy has been tagged as a function call param diff --git a/constantine/platforms/threadpool/threadpool.nim b/constantine/platforms/threadpool/threadpool.nim index 024d89d..316aefd 100644 --- a/constantine/platforms/threadpool/threadpool.nim +++ b/constantine/platforms/threadpool/threadpool.nim @@ -20,13 +20,113 @@ import ./instrumentation, ./primitives/barriers, ./parallel_offloading, - ../allocs, ../bithacks, - ../../../helpers/prng_unsafe + ../allocs, ../bithacks export # flowvars Flowvar, isSpawned, isReady, sync +# ############################################################ +# # +# RNG # +# # +# ############################################################ +# +# We don't need a CSPRNG, the RNG is to select a random victim when work-stealing +# +# - Scrambled Linear Pseudorandom Number Generators +# Blackman, Vigna, 2021 +# https://vigna.di.unimi.it/ftp/papers/ScrambledLinear.pdf +# https://prng.di.unimi.it/ + +type WorkStealingRng = object + ## This is the state of a Xoshiro256+ PRNG + ## It is used for work-stealing. The low bits have low linear complexity. 
+ ## So we use the high 32 bits to seed our pseudo random walk of thread taskqueues. + s: array[4, uint64] + +func splitMix64(state: var uint64): uint64 = + state += 0x9e3779b97f4a7c15'u64 + result = state + result = (result xor (result shr 30)) * 0xbf58476d1ce4e5b9'u64 + result = (result xor (result shr 27)) * 0xbf58476d1ce4e5b9'u64 + result = result xor (result shr 31) + +func seed(rng: var WorkStealingRng, x: SomeInteger) = + ## Seed the random number generator with a fixed seed + var sm64 = uint64(x) + rng.s[0] = splitMix64(sm64) + rng.s[1] = splitMix64(sm64) + rng.s[2] = splitMix64(sm64) + rng.s[3] = splitMix64(sm64) + +func rotl(x: uint64, k: static int): uint64 {.inline.} = + return (x shl k) or (x shr (64 - k)) + +template `^=`(x: var uint64, y: uint64) = + x = x xor y + +func nextU32(rng: var WorkStealingRng): uint32 = + ## Compute a random uint32 + # Need to use the high bits + result = uint32((rng.s[0] + rng.s[3]) shr 32) + + let t = rng.s[1] shl 17 + rng.s[2] ^= rng.s[0]; + rng.s[3] ^= rng.s[1]; + rng.s[1] ^= rng.s[2]; + rng.s[0] ^= rng.s[3]; + + rng.s[2] ^= t; + + rng.s[3] = rotl(rng.s[3], 45); + +iterator pseudoRandomPermutation(randomSeed: uint32, maxExclusive: int32): int32 = + ## Create (low-quality) pseudo-random permutations for [0, max) + # Design considerations and randomness constraint for work-stealing, see docs/random_permutations.md + # + # Linear Congruential Generator: https://en.wikipedia.org/wiki/Linear_congruential_generator + # + # Xₙ₊₁ = aXₙ+c (mod m) generates all random number mod m without repetition + # if and only if (Hull-Dobell theorem): + # 1. c and m are coprime + # 2. a-1 is divisible by all prime factors of m + # 3. a-1 is divisible by 4 if m is divisible by 4 + # + # Alternative 1. By choosing a=1, all conditions are easy to reach. + # + # The randomness quality is not important besides distributing potential contention, + # i.e. randomly trying thread i, then i+1, then i+n-1 (mod n) is good enough. + # + # Assuming 6 threads, co-primes are [1, 5], which means the following permutations + # assuming we start with victim 0: + # - [0, 1, 2, 3, 4, 5] + # - [0, 5, 4, 3, 2, 1] + # While we don't care much about randoness quality, it's a bit disappointing. + # + # Alternative 2. We can choose m to be the next power of 2, meaning all odd integers are co-primes, + # consequently: + # - we don't need a GCD to find the coprimes + # - we don't need to cache coprimes, removing a cache-miss potential + # - a != 1, so we now have a multiplicative factor, which makes output more "random looking". 
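+  #
+  # Worked example (illustrative values only, chosen for this comment):
+  #   maxExclusive = 6, randomSeed = 37
+  #   => M = 8, mask = 7, c = (37 and 3)*2 + 1 = 3, a = (37 and 1)*4 + 1 = 5, start = 37 and 7 = 5
+  #   The walk 5, 4, 7, 6, 1, 0, 3, 2 (then back to 5) yields the victims < 6: 5, 4, 1, 0, 3, 2,
+  #   i.e. each ID in [0, 6) exactly once before returning to the start.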
+ + # n and (m-1) <=> n mod m, if m is a power of 2 + let maxExclusive = cast[uint32](maxExclusive) + let M = maxExclusive.nextPowerOfTwo_vartime() + let c = (randomSeed and ((M shr 1) - 1)) * 2 + 1 # c odd and c ∈ [0, M) + let a = (randomSeed and ((M shr 2) - 1)) * 4 + 1 # a-1 divisible by 2 (all prime factors of m) and by 4 if m divisible by 4 + + let mask = M-1 # for mod M + let start = randomSeed and mask + + var x = start + while true: + if x < maxExclusive: + yield cast[int32](x) + x = (a*x + c) and mask # ax + c (mod M), with M power of 2 + if x == start: + break + # ############################################################ # # # Types # @@ -46,35 +146,25 @@ type threadpool: Threadpool # Tasks - taskqueue: ptr Taskqueue # owned task queue + taskqueue: ptr Taskqueue # owned task queue currentTask: ptr Task # Synchronization - localBackoff: EventNotifier # Multi-Producer Single-Consumer backoff - signal: ptr Signal # owned signal + localBackoff: EventNotifier # Multi-Producer Single-Consumer backoff + signal: ptr Signal # owned signal # Thefts - rng: RngState # RNG state to select victims - - # Adaptative theft policy - stealHalf: bool - recentTasks: int32 - recentThefts: int32 - recentTheftsAdaptative: int32 - recentLeaps: int32 + rng: WorkStealingRng # RNG state to select victims Threadpool* = ptr object barrier: SyncBarrier # Barrier for initialization and teardown # -- align: 64 globalBackoff: EventCount # Multi-Producer Multi-Consumer backoff - reserveBackoff: EventCount # -- align: 64 - numThreads*{.align: 64.}: int32 # N regular workers + N reserve workers - workerQueues: ptr UncheckedArray[Taskqueue] # size 2N - workers: ptr UncheckedArray[Thread[(Threadpool, WorkerID)]] # size 2N - workerSignals: ptr UncheckedArray[Signal] # size 2N - # -- align: 64 - numIdleThreadsAwaitingFutures*{.align: 64.}: Atomic[int32] + numThreads*{.align: 64.}: int32 # N regular workers + workerQueues: ptr UncheckedArray[Taskqueue] # size N + workers: ptr UncheckedArray[Thread[(Threadpool, WorkerID)]] # size N + workerSignals: ptr UncheckedArray[Signal] # size N # ############################################################ # # @@ -85,14 +175,21 @@ type var workerContext {.threadvar.}: WorkerContext ## Thread-local Worker context ## We assume that a threadpool has exclusive ownership + ## + ## TODO: if we want to allow non-conflicting threadpools instantiated by the same thread: + ## - only the threadID per threadpool should be stored and the associated + ## context should be stored at the Threadpool-level. + ## Then we need to associate threadpool pointer to workerID in that threadpool + ## - Or we completely remove thread-local data + ## and use a Minimal Perfect Hash Function. + ## We can approximate a threadID by retrieving the address of a dummy thread-local variable. 
+ ## - Or we sort threadID and use binary search -proc setupWorker() = +proc setupWorker(ctx: var WorkerContext) = ## Initialize the thread-local context of a worker ## Requires the ID and threadpool fields to be initialized - template ctx: untyped = workerContext - preCondition: not ctx.threadpool.isNil() - preCondition: 0 <= ctx.id and ctx.id < 2*ctx.threadpool.numThreads + preCondition: 0 <= ctx.id and ctx.id < ctx.threadpool.numThreads preCondition: not ctx.threadpool.workerQueues.isNil() preCondition: not ctx.threadpool.workerSignals.isNil() @@ -111,19 +208,12 @@ proc setupWorker() = # Init ctx.taskqueue[].init(initialCapacity = 32) - # Adaptative theft policy - ctx.recentTasks = 0 - ctx.recentThefts = 0 - ctx.recentTheftsAdaptative = 0 - ctx.recentLeaps = 0 - -proc teardownWorker() = +proc teardownWorker(ctx: var WorkerContext) = ## Cleanup the thread-local context of a worker - workerContext.localBackoff.`=destroy`() - workerContext.taskqueue[].teardown() + ctx.localBackoff.`=destroy`() + ctx.taskqueue[].teardown() -proc eventLoopRegular(ctx: var WorkerContext) {.raises:[], gcsafe.} -proc eventLoopReserve(ctx: var WorkerContext) {.raises:[], gcsafe.} +proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.} proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises: [].} = ## On the start of the threadpool workers will execute this @@ -137,15 +227,12 @@ proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises ctx.id = params.id ctx.threadpool = params.threadpool - setupWorker() + ctx.setupWorker() # 1 matching barrier in Threadpool.new() for root thread discard params.threadpool.barrier.wait() - if ctx.id < ctx.threadpool.numThreads: - ctx.eventLoopRegular() - else: - ctx.eventLoopReserve() + ctx.eventLoop() debugTermination: log(">>> Worker %3d shutting down <<<\n", ctx.id) @@ -153,7 +240,7 @@ proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises # 1 matching barrier in threadpool.shutdown() for root thread discard params.threadpool.barrier.wait() - teardownWorker() + ctx.teardownWorker() # ############################################################ # # @@ -167,12 +254,11 @@ const RootTask = cast[ptr Task](0xEFFACED0) proc run*(ctx: var WorkerContext, task: ptr Task) {.raises:[].} = ## Run a task, frees it if it is not owned by a Flowvar - let suspendedTask = workerContext.currentTask + let suspendedTask = ctx.currentTask ctx.currentTask = task debug: log("Worker %3d: running task 0x%.08x (previous: 0x%.08x, %d pending, thiefID %d)\n", ctx.id, task, suspendedTask, ctx.taskqueue[].peek(), task.thiefID) task.fn(task.env.addr) debug: log("Worker %3d: completed task 0x%.08x (%d pending)\n", ctx.id, task, ctx.taskqueue[].peek()) - ctx.recentTasks += 1 ctx.currentTask = suspendedTask if not task.hasFuture: freeHeap(task) @@ -193,10 +279,6 @@ proc schedule(ctx: var WorkerContext, tn: ptr Task, forceWake = false) {.inline. # Instead of notifying every time a task is scheduled, we notify # only when the worker queue is empty. This is a good approximation # of starvation in work-stealing. - # - Tzannes, A., G. C. Caragea, R. Barua, and U. Vishkin. - # Lazy binary-splitting: a run-time adaptive work-stealing scheduler. - # In PPoPP ’10, Bangalore, India, January 2010. ACM, pp. 179–190. 
- # https://user.eng.umd.edu/~barua/ppopp164.pdf let wasEmpty = ctx.taskqueue[].peek() == 0 ctx.taskqueue[].push(tn) if forceWake or wasEmpty: @@ -259,7 +341,7 @@ iterator splitUpperRanges( ctx.id, task, task.loopStepsLeft, curLoopIndex, task.loopStart, task.loopStop, task.loopStride, numIdle) # Send a chunk of work to all idle workers + ourselves - let availableWorkers = numIdle + 1 + let availableWorkers = cast[int](numIdle + 1) let baseChunkSize = task.loopStepsLeft div availableWorkers let cutoff = task.loopStepsLeft mod availableWorkers @@ -364,7 +446,7 @@ proc loadBalanceLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int, if ctx.taskqueue[].peek() == 0: let waiters = ctx.threadpool.globalBackoff.getNumWaiters() # We assume that the worker that scheduled the task will work on it. I.e. idleness is underestimated. - let numIdle = waiters.preSleep + waiters.committedSleep + int32(task.isFirstIter) + let numIdle = waiters.preSleep + waiters.committedSleep + cast[int32](task.isFirstIter) if numIdle > 0: ctx.splitAndDispatchLoop(task, curLoopIndex, numIdle) backoff.decrease() @@ -376,13 +458,6 @@ proc loadBalanceLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int, backoff.nextCheck += task.loopStride shl backoff.windowLogSize template parallelForWrapper(idx: untyped{ident}, loopBody: untyped): untyped = - ## To be called within a loop task - ## Gets the loop bounds and iterate the over them - ## Also polls runtime status for dynamic loop splitting - ## - ## Loop prologue, epilogue, - ## remoteAccum, resultTy and returnStmt - ## are unused block: let this = workerContext.currentTask var backoff = BalancerBackoff( @@ -404,9 +479,6 @@ template parallelReduceWrapper( idx: untyped{ident}, prologue, loopBody, mergeLocalWithRemote, epilogue, remoteTaskAwaitable, awaitableType: untyped): untyped = - ## To be called within a loop task - ## Gets the loop bounds and iterate the over them - ## Also polls runtime status for dynamic loop splitting block: let this = workerContext.currentTask var backoff = BalancerBackoff( @@ -447,111 +519,16 @@ template parallelReduceWrapper( # # # ############################################################ -iterator pseudoRandomPermutation(randomSeed: uint32, maxExclusive: int32): int32 = - ## Create a (low-quality) pseudo-random permutation from [0, max) - # Design considerations and randomness constraint for work-stealing, see docs/random_permutations.md - # - # Linear Congruential Generator: https://en.wikipedia.org/wiki/Linear_congruential_generator - # - # Xₙ₊₁ = aXₙ+c (mod m) generates all random number mod m without repetition - # if and only if (Hull-Dobell theorem): - # 1. c and m are coprime - # 2. a-1 is divisible by all prime factors of m - # 3. a-1 is divisible by 4 if m is divisible by 4 - # - # Alternative 1. By choosing a=1, all conditions are easy to reach. - # - # The randomness quality is not important besides distributing potential contention, - # i.e. randomly trying thread i, then i+1, then i+n-1 (mod n) is good enough. - # - # Assuming 6 threads, co-primes are [1, 5], which means the following permutations - # assuming we start with victim 0: - # - [0, 1, 2, 3, 4, 5] - # - [0, 5, 4, 3, 2, 1] - # While we don't care much about randoness quality, it's a bit disappointing. - # - # Alternative 2. 
We can choose m to be the next power of 2, meaning all odd integers are co-primes, - # consequently: - # - we don't need a GCD to find the coprimes - # - we don't need to cache coprimes, removing a cache-miss potential - # - a != 1, so we now have a multiplicative factor, which makes output more "random looking". - - # n and (m-1) <=> n mod m, if m is a power of 2 - let maxExclusive = cast[uint32](maxExclusive) - let M = maxExclusive.nextPowerOfTwo_vartime() - let c = (randomSeed and ((M shr 1) - 1)) * 2 + 1 # c odd and c ∈ [0, M) - let a = (randomSeed and ((M shr 2) - 1)) * 4 + 1 # a-1 divisible by 2 (all prime factors of m) and by 4 if m divisible by 4 - - let mask = M-1 # for mod M - let start = randomSeed and mask - - var x = start - while true: - if x < maxExclusive: - yield cast[int32](x) - x = (a*x + c) and mask # ax + c (mod M), with M power of 2 - if x == start: - break - proc tryStealOne(ctx: var WorkerContext): ptr Task = ## Try to steal a task. - let seed = ctx.rng.next().uint32 - for targetId in seed.pseudoRandomPermutation(2*ctx.threadpool.numThreads): + let seed = ctx.rng.nextU32() + for targetId in seed.pseudoRandomPermutation(ctx.threadpool.numThreads): if targetId == ctx.id: continue let stolenTask = ctx.id.steal(ctx.threadpool.workerQueues[targetId]) if not stolenTask.isNil(): - ctx.recentThefts += 1 - # Theft successful, there might be more work for idle threads, wake one - ctx.threadpool.globalBackoff.wake() - return stolenTask - return nil - -proc updateStealStrategy(ctx: var WorkerContext) = - ## Estimate work-stealing efficiency during the last interval - ## If the value is below a threshold, switch strategies - const StealAdaptativeInterval = 25 - if ctx.recentTheftsAdaptative == StealAdaptativeInterval: - let recentTheftsNonAdaptative = ctx.recentThefts - ctx.recentTheftsAdaptative - let adaptativeTasks = ctx.recentTasks - ctx.recentLeaps - recentTheftsNonAdaptative - - let ratio = adaptativeTasks.float32 / StealAdaptativeInterval.float32 - if ctx.stealHalf and ratio < 2.0f: - # Tasks stolen are coarse-grained, steal only one to reduce re-steal - ctx.stealHalf = false - elif not ctx.stealHalf and ratio == 1.0f: - # All tasks processed were stolen tasks, we need to steal many at a time - ctx.stealHalf = true - - # Reset interval - ctx.recentTasks = 0 - ctx.recentThefts = 0 - ctx.recentTheftsAdaptative = 0 - ctx.recentLeaps = 0 - -proc tryStealAdaptative(ctx: var WorkerContext): ptr Task = - ## Try to steal one or many tasks, depending on load - - # TODO: while running 'threadpool/examples/e02_parallel_pi.nim' - # stealHalf can error out in tasks_flowvars.nim with: - # "precondition not task.completed.load(moAcquire)" - ctx.stealHalf = false - # ctx.updateStealStrategy() - - let seed = ctx.rng.next().uint32 - for targetId in seed.pseudoRandomPermutation(2*ctx.threadpool.numThreads): - if targetId == ctx.id: - continue - - let stolenTask = - if ctx.stealHalf: ctx.id.stealHalf(ctx.taskqueue[], ctx.threadpool.workerQueues[targetId]) - else: ctx.id.steal(ctx.threadpool.workerQueues[targetId]) - - if not stolenTask.isNil(): - ctx.recentThefts += 1 - ctx.recentTheftsAdaptative += 1 # Theft successful, there might be more work for idle threads, wake one ctx.threadpool.globalBackoff.wake() return stolenTask @@ -575,106 +552,45 @@ proc tryLeapfrog(ctx: var WorkerContext, awaitedTask: ptr Task): ptr Task = if thiefID != SentinelThief: break cpuRelax() - ascertain: 0 <= thiefID and thiefID < 2*ctx.threadpool.numThreads + ascertain: 0 <= thiefID and thiefID < 
ctx.threadpool.numThreads # Leapfrogging is used when completing a future, so steal only one task # and don't leave tasks stranded in our queue. let leapTask = ctx.id.steal(ctx.threadpool.workerQueues[thiefID]) if not leapTask.isNil(): - ctx.recentLeaps += 1 # Theft successful, there might be more work for idle threads, wake one ctx.threadpool.globalBackoff.wake() return leapTask return nil -proc eventLoopRegular(ctx: var WorkerContext) {.raises:[], gcsafe.} = +proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.} = ## Each worker thread executes this loop over and over. while true: # 1. Pick from local queue - debug: log("Worker %3d: eventLoopRegular 1 - searching task from local queue\n", ctx.id) + debug: log("Worker %3d: eventLoop 1 - searching task from local queue\n", ctx.id) while (var task = ctx.taskqueue[].pop(); not task.isNil): - debug: log("Worker %3d: eventLoopRegular 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask) + debug: log("Worker %3d: eventLoop 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask) ctx.run(task) # 2. Run out of tasks, become a thief - debug: log("Worker %3d: eventLoopRegular 2 - becoming a thief\n", ctx.id) + debug: log("Worker %3d: eventLoop 2 - becoming a thief\n", ctx.id) let ticket = ctx.threadpool.globalBackoff.sleepy() - if (var stolenTask = ctx.tryStealAdaptative(); not stolenTask.isNil): + if (var stolenTask = ctx.tryStealOne(); not stolenTask.isNil): # We manage to steal a task, cancel sleep ctx.threadpool.globalBackoff.cancelSleep() # 2.a Run task - debug: log("Worker %3d: eventLoopRegular 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) + debug: log("Worker %3d: eventLoop 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) ctx.run(stolenTask) elif ctx.signal.terminate.load(moAcquire): # 2.b Threadpool has no more tasks and we were signaled to terminate ctx.threadpool.globalBackoff.cancelSleep() - debugTermination: log("Worker %3d: eventLoopRegular 2.b - terminated\n", ctx.id) + debugTermination: log("Worker %3d: eventLoop 2.b - terminated\n", ctx.id) break else: # 2.c Park the thread until a new task enters the threadpool - debug: log("Worker %3d: eventLoopRegular 2.b - sleeping\n", ctx.id) + debug: log("Worker %3d: eventLoop 2.b - sleeping\n", ctx.id) ctx.threadpool.globalBackoff.sleep(ticket) - debug: log("Worker %3d: eventLoopRegular 2.b - waking\n", ctx.id) - -proc eventLoopReserve(ctx: var WorkerContext) {.raises:[], gcsafe.} = - ## A reserve worker is a relay when a thread is stuck awaiting a future completion. - ## This ensure those threads are available as soon as the future completes, minimizing latency - ## while ensuring the runtime uses all available hardware resources, maximizing throughput. 
- - template reserveSleepCheck: untyped = - let ticket = ctx.threadpool.reserveBackoff.sleepy() - let (reservePlanningSleep, reserveCommittedSleep) = ctx.threadpool.reserveBackoff.getNumWaiters() - let numActiveReservists = ctx.threadpool.numThreads - (reservePlanningSleep-1 + reserveCommittedSleep) # -1 we don't want to count ourselves - - if ctx.signal.terminate.load(moAcquire): # If terminated, we leave everything as-is, the regular workers will finish - ctx.threadpool.reserveBackoff.cancelSleep() - debugTermination: log("Worker %3d: reserveSleepCheck - terminated\n", ctx.id) - return - elif numActiveReservists > ctx.threadpool.numIdleThreadsAwaitingFutures.load(moAcquire): - ctx.threadpool.globalBackoff.wake() # In case we were just woken up for a task or we have tasks in our queue, pass the torch - debug: log("Worker %3d: reserveSleepCheck - going to sleep on reserve backoff\n", ctx.id) - ctx.threadpool.reserveBackoff.sleep(ticket) - debug: log("Worker %3d: reserveSleepCheck - waking on reserve backoff\n", ctx.id) - else: - ctx.threadpool.reserveBackoff.cancelSleep() - - while true: - # 1. Pick from local queue - debug: log("Worker %3d: eventLoopReserve 1 - searching task from local queue\n", ctx.id) - while true: - reserveSleepCheck() - var task = ctx.taskqueue[].pop() - if task.isNil(): - break - debug: log("Worker %3d: eventLoopReserve 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask) - ctx.run(task) - - # 2. Run out of tasks, become a thief - debug: log("Worker %3d: eventLoopReserve 2 - becoming a thief\n", ctx.id) - let ticket = ctx.threadpool.globalBackoff.sleepy() # If using a reserve worker was necessary, sleep on the backoff for active threads - if (var stolenTask = ctx.tryStealAdaptative(); not stolenTask.isNil): - # We manage to steal a task, cancel sleep - ctx.threadpool.globalBackoff.cancelSleep() - # 2.a Run task - debug: log("Worker %3d: eventLoopReserve 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) - ctx.run(stolenTask) - elif ctx.signal.terminate.load(moAcquire): - # 2.b Threadpool has no more tasks and we were signaled to terminate - ctx.threadpool.globalBackoff.cancelSleep() - debugTermination: log("Worker %3d: eventLoopReserve 2.b - terminated\n", ctx.id) - break - else: - # 2.c Park the thread until a new task enters the threadpool. 
- # It is intentionally parked with all active threads as long as a reservist is needed - let (reservePlanningSleep, reserveCommittedSleep) = ctx.threadpool.reserveBackoff.getNumWaiters() - let numActiveReservists = ctx.threadpool.numThreads - (reservePlanningSleep-1 + reserveCommittedSleep) # -1 we don't want to count ourselves - if numActiveReservists > ctx.threadpool.numIdleThreadsAwaitingFutures.load(moAcquire): - ctx.threadpool.globalBackoff.cancelSleep() - continue - - debug: log("Worker %3d: eventLoopReserve 2.b - sleeping on active threads backoff\n", ctx.id) - ctx.threadpool.globalBackoff.sleep(ticket) - debug: log("Worker %3d: eventLoopReserve 2.b - waking on active threads backoff\n", ctx.id) + debug: log("Worker %3d: eventLoop 2.b - waking\n", ctx.id) # ############################################################ # # @@ -682,9 +598,6 @@ proc eventLoopReserve(ctx: var WorkerContext) {.raises:[], gcsafe.} = # # # ############################################################ -template isRootTask(task: ptr Task): bool = - task == RootTask - proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = ## Eagerly complete an awaited FlowVar template ctx: untyped = workerContext @@ -712,47 +625,67 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = debug: log("Worker %3d: sync 1 - future ready, exiting\n", ctx.id) return - ## 2. We run out-of-tasks or out-of-direct-child of our current awaited task - ## So the task is bottlenecked by dependencies in other threads, - ## hence we abandon our enqueued work and steal. - ## - ## See also - ## - Proactive work-stealing for futures - ## Kyle Singer, Yifan Xu, I-Ting Angelina Lee, 2019 - ## https://dl.acm.org/doi/10.1145/3293883.3295735 + # 2. We run out-of-tasks or out-of-direct-child of our current awaited task + # So the task is bottlenecked by dependencies in other threads, + # hence we abandon our enqueued work and steal. + # + # See also + # - Proactive work-stealing for futures + # Kyle Singer, Yifan Xu, I-Ting Angelina Lee, 2019 + # https://dl.acm.org/doi/10.1145/3293883.3295735 + # + # Design tradeoffs + # ---------------- + # + # At this point, we have significant design decisions: + # - Do we steal from other workers in hope we advance our awaited task? + # Note: A greedy scheduler (no worker idle as long as there is a runnable task) + # is at most 2x slower than the optimal schedule (proof in Cilk paper) + # - Do we advance our own queue for tasks that are not child of our awaited tasks? + # - Do we park instead of working on unrelated task? + # Note: With hyperthreading, real hardware resources are 2x less than the reported number of cores. + # Hence parking might free contended memory bandwitdh or execution ports. + # - Do we just not sleep, potentially wasting energy? + # + # - If we work, we maximize throughput, but we increase latency to handle the future's continuation. + # If that continuation would have created more parallel work, we would actually have restricted parallelism. + # - If we park with tasks left, we minimize latency on the continuation, but we don't use hardware resources fully, + # and there are CPUs without hyperthreading (on ARM for example) + # - If we park when no tasks are left, if more work is enqueued, as we don't park on the global backoff we will miss it. + # Note: we don't park on the global backoff, because it's not possible to control which thread to wake with it (or we wake all). 
+ # - Wakeup latency is high, having "reserve threads" that take over the active slot of the awaiting thread + # in theory maintains throughput and minimize the latency of the future's continuation + # but in practice, performance can worsen significantly on fine-grained parallelism. + debug: log("Worker %3d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x)\n", ctx.id, ctx.currentTask) while not isFutReady(): + if (let leapTask = ctx.tryLeapfrog(fv.task); not leapTask.isNil): # Leapfrogging, the thief had an empty queue, hence if there are tasks in its queue, it's generated by our blocked task. # Help the thief clear those, as if it did not finish, it's likely blocked on those children tasks. debug: log("Worker %3d: sync 2.1 - leapfrog task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, leapTask, leapTask.parent, ctx.currentTask) ctx.run(leapTask) + elif (let stolenTask = ctx.tryStealOne(); not stolenTask.isNil): + # We stole a task, we hope we advance our awaited task. + debug: log("Worker %3d: sync 2.2 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) + ctx.run(stolenTask) + elif (let ownTask = ctx.taskqueue[].pop(); not ownTask.isNil): + # We advance our own queue, this increases global throughput but may impact latency on the awaited task. + debug: log("Worker %3d: sync 2.3 - couldn't steal, running own task\n", ctx.id) + ctx.run(ownTask) else: - # At this point, we have significant design decisions: - # - Do we steal from other workers in hope we advance our awaited task? - # - Do we advance our own queue for tasks that are not child of our awaited tasks? - # - Do we park instead of working on unrelated task. With hyperthreading that would actually still leave the core busy enough? - # - # - If we work, we maximize throughput, but we increase latency to handle the future's continuation. - # If that future creates more parallel work, we would actually have restricted parallelism. - # - If we park, we minimize latency, but we don't use the full hardware resources, and there are CPUs without hyperthreading (on ARM for example) - # Furthermore, a work-stealing scheduler is within 2x an optimal scheduler if it is greedy, i.e., as long as there is enough work, all cores are used. - # - # The solution chosen is to wake a reserve thread, keeping hardware offered/throughput constant. And put the awaiting thread to sleep. + # Nothing to do, we park. + # - On today's hyperthreaded systems, this might reduce contention on a core resources like memory caches and execution ports + # - If more work is created, we won't be notified as we need to park on a dedicated notifier for precise wakeup when future is ready ctx.localBackoff.prepareToPark() - discard ctx.threadpool.numIdleThreadsAwaitingFutures.fetchAdd(1, moRelease) - ctx.threadpool.reserveBackoff.wake() var expected = (ptr EventNotifier)(nil) if compareExchange(fv.task.waiter, expected, desired = ctx.localBackoff.addr, moAcquireRelease): ctx.localBackoff.park() - discard ctx.threadpool.numIdleThreadsAwaitingFutures.fetchSub(1, moRelease) - proc syncAll*(tp: Threadpool) {.raises: [].} = ## Blocks until all pending tasks are completed - ## This MUST only be called from - ## the root scope that created the threadpool + ## This MUST only be called from the root scope that created the threadpool template ctx: untyped = workerContext debugTermination: @@ -769,17 +702,11 @@ proc syncAll*(tp: Threadpool) {.raises: [].} = ctx.run(task) # 2. 
Help other threads - debugTermination: - let regular = tp.globalBackoff.getNumWaiters() - let reserve = tp.reserveBackoff.getNumWaiters() - log("Worker %3d: syncAll 2 - becoming a thief - (preSleep: %d, sleeping %d) regular and (preSleep: %d, sleeping %d) reserve workers\n", - ctx.id, regular.preSleep, regular.committedSleep, reserve.preSleep, reserve.committedSleep) - if (var stolenTask = ctx.tryStealAdaptative(); not stolenTask.isNil): + if (var stolenTask = ctx.tryStealOne(); not stolenTask.isNil): # 2.a We stole some task debug: log("Worker %3d: syncAll 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) ctx.run(stolenTask) - elif tp.reserveBackoff.getNumWaiters() == (0'i32, tp.numThreads) and - tp.globalBackoff.getNumWaiters() == (0'i32, tp.numThreads-1): # Don't count ourselves + elif tp.globalBackoff.getNumWaiters() == (0'i32, tp.numThreads-1): # Don't count ourselves # 2.b all threads besides the current are parked debugTermination: log("Worker %3d: syncAll 2.b - termination, all other threads sleeping\n", ctx.id) break @@ -810,26 +737,23 @@ proc new*(T: type Threadpool, numThreads = countProcessors()): T {.raises: [Reso type TpObj = typeof(default(Threadpool)[]) # due to C import, we need a dynamic sizeof var tp = allocHeapUncheckedAlignedPtr(Threadpool, sizeof(TpObj), alignment = 64) - tp.barrier.init(2*numThreads.uint32) + tp.barrier.init(numThreads.uint32) tp.globalBackoff.initialize() - tp.reserveBackoff.initialize() tp.numThreads = numThreads.int32 - tp.numIdleThreadsAwaitingFutures.store(0, moRelaxed) - # Allocate for `numThreads` regular workers and `numTHreads` reserve workers - tp.workerQueues = allocHeapArrayAligned(Taskqueue, 2*numThreads, alignment = 64) - tp.workers = allocHeapArrayAligned(Thread[(Threadpool, WorkerID)], 2*numThreads, alignment = 64) - tp.workerSignals = allocHeapArrayAligned(Signal, 2*numThreads, alignment = 64) + tp.workerQueues = allocHeapArrayAligned(Taskqueue, numThreads, alignment = 64) + tp.workers = allocHeapArrayAligned(Thread[(Threadpool, WorkerID)], numThreads, alignment = 64) + tp.workerSignals = allocHeapArrayAligned(Signal, numThreads, alignment = 64) # Setup master thread workerContext.id = 0 workerContext.threadpool = tp # Start worker threads - for i in 1 ..< 2*numThreads: + for i in 1 ..< numThreads: createThread(tp.workers[i], workerEntryFn, (tp, WorkerID(i))) # Root worker - setupWorker() + workerContext.setupWorker() # Root task, this is a sentinel task that is never called. 
workerContext.currentTask = RootTask @@ -842,7 +766,7 @@ proc cleanup(tp: var Threadpool) {.raises: [].} = ## Cleanup all resources allocated by the threadpool preCondition: workerContext.currentTask.isRootTask() - for i in 1 ..< 2*tp.numThreads: + for i in 1 ..< tp.numThreads: joinThread(tp.workers[i]) tp.workerSignals.freeHeapAligned() @@ -859,16 +783,15 @@ proc shutdown*(tp: var Threadpool) {.raises:[].} = tp.syncAll() # Signal termination to all threads - for i in 0 ..< 2*tp.numThreads: + for i in 0 ..< tp.numThreads: tp.workerSignals[i].terminate.store(true, moRelease) tp.globalBackoff.wakeAll() - tp.reserveBackoff.wakeAll() # 1 matching barrier in workerEntryFn discard tp.barrier.wait() - teardownWorker() + workerContext.teardownWorker() tp.cleanup() # Delete dummy task @@ -882,12 +805,6 @@ proc shutdown*(tp: var Threadpool) {.raises:[].} = # # # ############################################################ -proc getThreadID(tp: Threadpool): int {.inline, used.} = - ## Returns the worker local ID. - ## This is a debug proc for logging purposes - ## The threadpool needs to be imported with {.all.} pragma - workerContext.id - # Task parallel API # --------------------------------------------- @@ -904,9 +821,6 @@ macro spawn*(tp: Threadpool, fnCall: typed): untyped = # Data parallel API # --------------------------------------------- -# TODO: we can fuse parallelFor and parallelForStrided -# in a single proc once {.experimental: "flexibleOptionalParams".} -# is not experimental anymore macro parallelFor*(tp: Threadpool, loopParams: untyped, body: untyped): untyped = ## Parallel for loop.
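Below is a minimal usage sketch of the task-parallel API defined above (`Threadpool.new`, `spawn`, `sync`, `shutdown`). The import path and the `fib` example are illustrative and not part of this diff:

```nim
# Sketch: task parallelism with spawn/sync on Constantine's threadpool.
import constantine/platforms/threadpool/threadpool

proc fib(tp: Threadpool, n: int): int =
  if n < 2:
    return n
  # Offload one branch as a task, compute the other inline, then await the Flowvar.
  let x = tp.spawn fib(tp, n-1)
  let y = fib(tp, n-2)
  result = sync(x) + y

proc main() =
  var tp = Threadpool.new()   # numThreads defaults to countProcessors()
  echo fib(tp, 20)            # 6765
  tp.shutdown()

main()
```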