From 1cb6c3d9e11401ed50c5753fcc22078a6aee281b Mon Sep 17 00:00:00 2001 From: Mamy Ratsimbazafy Date: Sat, 25 Feb 2023 17:11:33 +0100 Subject: [PATCH] [Threadpool] Backoff revamp (#224) * Threadpool: eventcount didn't put threads to actual sleep :/ * rework task awaiter sleep to prevent use-after-free race condition after task completion * Need memory fence for StoreLoad synchronization ordering * update design doc * set memory order in sleep of eventcount * cleanup debug logs * comment cleanup [skip ci] --- .../threadpool/crossthread/backoff.nim | 183 ++++-------------- .../threadpool/crossthread/taskqueues.nim | 2 +- .../threadpool/crossthread/tasks_flowvars.nim | 136 ++++++++++--- .../platforms/threadpool/docs/design.md | 22 ++- .../threadpool/primitives/futexes_linux.nim | 46 +++-- .../threadpool/primitives/futexes_macos.nim | 25 ++- .../threadpool/primitives/futexes_windows.nim | 39 ++-- .../platforms/threadpool/threadpool.nim | 84 ++++---- 8 files changed, 274 insertions(+), 263 deletions(-) diff --git a/constantine/platforms/threadpool/crossthread/backoff.nim b/constantine/platforms/threadpool/crossthread/backoff.nim index f205859..421598f 100644 --- a/constantine/platforms/threadpool/crossthread/backoff.nim +++ b/constantine/platforms/threadpool/crossthread/backoff.nim @@ -10,72 +10,8 @@ import std/atomics, ../primitives/futexes -# We implement 2 datastructures to put threads to sleep: -# 1. An event notifier to put an awaiting thread to sleep when the task they require is worked on by another thread -# 2. 
An eventcount to put an idle thread to sleep - {.push raises:[], checks:off.} -# ############################################################ -# -# Event Notifier -# -# ############################################################ - -# Formal verification at: https://github.com/mratsim/weave/blob/7682784/formal_verification/event_notifiers.tla#L76-L109 - -type - EventNotifier* = object - ## Multi Producers, Single Consumer event notification - ## This is can be seen as a wait-free condition variable for producers - ## that avoids them spending time in expensive kernel land due to mutexes. - # ---- Consumer specific ---- - ticket{.align: 64.}: uint8 # A ticket for the consumer to sleep in a phase - # ---- Contention ---- no real need for padding as cache line should be reloaded in case of contention anyway - futex: Futex # A Futex (atomic int32 that can put thread to sleep) - phase: Atomic[uint8] # A binary timestamp, toggles between 0 and 1 (but there is no atomic "not") - signaled: Atomic[bool] # Signaling condition - -func initialize*(en: var EventNotifier) {.inline.} = - en.futex.initialize() - en.ticket = 0 - en.phase.store(0, moRelaxed) - en.signaled.store(false, moRelaxed) - -func `=destroy`*(en: var EventNotifier) {.inline.} = - en.futex.teardown() - -func `=copy`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be copied".} -func `=sink`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be moved".} - -func prepareToPark*(en: var EventNotifier) {.inline.} = - ## The consumer intends to sleep soon. - ## This must be called before the formal notification - ## via a channel. - if not en.signaled.load(moRelaxed): - en.ticket = en.phase.load(moRelaxed) - -proc park*(en: var EventNotifier) {.inline.} = - ## Wait until we are signaled of an event - ## Thread is parked and does not consume CPU resources - ## This may wakeup spuriously. 
- if not en.signaled.load(moRelaxed): - if en.ticket == en.phase.load(moRelaxed): - en.futex.wait(0) - en.signaled.store(false, moRelaxed) - en.futex.initialize() - -proc notify*(en: var EventNotifier) {.inline.} = - ## Signal a thread that it can be unparked - - if en.signaled.load(moRelaxed): - # Another producer is signaling - return - en.signaled.store(true, moRelease) - discard en.phase.fetchXor(1, moRelaxed) - en.futex.store(1, moRelease) - en.futex.wake() - # ############################################################ # # Eventcount @@ -85,8 +21,6 @@ proc notify*(en: var EventNotifier) {.inline.} = type Eventcount* = object ## The lock-free equivalent of a condition variable. - ## Supports up to 256 threads on 32-bit. - ## Supports up to 65536 threads on 64-bit. ## ## Usage, if a thread needs to be parked until a condition is true ## and signaled by another thread: @@ -102,122 +36,87 @@ type ## else: ## ec.sleep() ## ``` - - state: Atomic[uint] - # State is actually the equivalent of a bitfield - # type State = object - # when sizeof(uint) == 8: - # waiters {.bitsize: 16.}: uint16 - # preWaiters {.bitsize: 16.}: uint16 - # epoch {.bitsize: 32.}: uint32 - # else: - # waiters {.bitsize: 8.}: uint8 - # preWaiters {.bitsize: 8.}: uint8 - # epoch {.bitsize: 16.}: uint16 + waitset: Atomic[uint32] + # type waitset = object + # preSleep {.bitsize: 16.}: uint32 + # committedSleep {.bitsize: 16.}: uint32 # - # but there is no native fetchAdd for bitfields. - - futex: Futex - # Technically we could use the futex as the state. - # When you wait on a Futex, it waits only if the value of the futex - # matches with a reference value. - # But our reference value will be the epoch of notifications - # and it is non-trivial to zero-out the waiters bits. - # - One way could be to split a 64-bit number in 2 - # and cast the epoch part to Futex but that would only work on 64-bit CPU. 
- # - Another more hacky way would be to pad with a zero-out uint16 before and after the Futex - # and depending on big or little endian provide a shifted address as Futex. + # We need precise committed sleep count for the `syncAll` barrier because a `preSleep` waiter + # may steal a task and create more work. + events: Futex ParkingTicket* = object epoch: uint32 -const # bitfield - # On 32-bit - # Low 8 bits are waiters, up to 2⁸ = 256 threads are supported - # Next 8 bits are pre-waiters, planning to wait but not committed. - # Next 16 bits is the epoch. - # The epoch deals with the ABA problem - # - up to 65536 wake requests on 32-bit - # Epoch rolling over to 0 are not a problem, they won't change the low 16 bits. - # On 64-bit +const # bitfield setup # Low 16 bits are waiters, up to 2¹⁶ = 65536 threads are supported # Next 16 bits are pre-waiters, planning to wait but not committed. - # Next 32 bits is the epoch. - # The epoch deals with the ABA problem - # - up to 4 294 967 296 wake requests on 64-bit - # Epoch rolling over to 0 are not a problem, they won't change the low 16 bits. # # OS limitations: # - Windows 10 supports up to 256 cores (https://www.microsoft.com/en-us/microsoft-365/blog/2017/12/15/windows-10-pro-workstations-power-advanced-workloads/) # - Linux CPUSET supports up to 1024 threads (https://man7.org/linux/man-pages/man3/CPU_SET.3.html) # # Hardware limitations: - # - Xeon Platinum 9282, 56 cores - 112 threads - # - 8 sockets: 896 threads + # - Xeon Platinum 9282, 56 cores - 112 threads per socket + # - up to 8 sockets: 896 threads - scale = sizeof(uint) div 4 # 2 for 64-bit, 1 for 32-bit. 
- - kEpochShift = 16'u * scale - kPreWaitShift = 8'u * scale - - kEpoch = 1'u shl kEpochShift - kPreWait = 1'u shl kPreWaitShift - kWait = 1'u - kTransitionToWait = kWait - kPreWait - - kWaitMask = kPreWait-1 - kAnyWaiterMask = kEpoch-1 - kPreWaitMask = kAnyWaiterMask xor kWaitMask # 0x0000FF00 on 32-bit + kPreWaitShift = 8'u32 + kPreWait = 1'u32 shl kPreWaitShift + kWait = 1'u32 + kCommitToWait = kWait - kPreWait + kWaitMask = kPreWait-1 + kPreWaitMask = not kWaitMask func initialize*(ec: var EventCount) {.inline.} = - ec.state.store(0, moRelaxed) - ec.futex.initialize() + ec.waitset.store(0, moRelaxed) + ec.events.initialize() func `=destroy`*(ec: var EventCount) {.inline.} = - ec.futex.teardown() + ec.events.teardown() proc sleepy*(ec: var Eventcount): ParkingTicket {.noInit, inline.} = ## To be called before checking if the condition to not sleep is met. ## Returns a ticket to be used when committing to sleep - let prevState = ec.state.fetchAdd(kPreWait, moAcquireRelease) - result.epoch = uint32(prevState shr kEpochShift) + discard ec.waitset.fetchAdd(kPreWait, moRelease) + result.epoch = ec.events.load(moAcquire) proc sleep*(ec: var Eventcount, ticket: ParkingTicket) {.inline.} = ## Put a thread to sleep until notified. - ## If the ticket becomes invalid (a notfication has been received) + ## If the ticket becomes invalid (a notification has been received) ## by the time sleep is called, the thread won't enter sleep - discard ec.state.fetchAdd(kTransitionToWait, moAcquireRelease) + discard ec.waitset.fetchAdd(kCommitToWait, moRelease) - while ec.state.load(moAcquire) shr kEpochShift == ticket.epoch: - ec.futex.wait(ticket.epoch) # We don't use the futex internal value + while ec.events.load(moAcquire) == ticket.epoch: + ec.events.wait(ticket.epoch) - let prev {.used.} = ec.state.fetchSub(kWait, moRelaxed) + discard ec.waitset.fetchSub(kWait, moRelease) proc cancelSleep*(ec: var Eventcount) {.inline.} = ## Cancel a sleep that was scheduled. 
- let prev {.used.} = ec.state.fetchSub(kPreWait, moRelaxed) + discard ec.waitset.fetchSub(kPreWait, moRelease) proc wake*(ec: var EventCount) {.inline.} = - ## Wake a thread if at least 1 is parked - let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease) - if (prev and kPreWaitMask) != 0: - # Some threads are in prewait and will see the epoch change + ## Prevent an idle thread from sleeping + ## or wait a sleeping one if there wasn't any idle + discard ec.events.increment(1, moRelease) + let waiters = ec.waitset.load(moAcquire) + if (waiters and kPreWaitMask) != 0: + # Some threads are in prewait and will see the event count change # no need to do an expensive syscall return - if (prev and kWaitMask) != 0: - ec.futex.wake() + if waiters != 0: + ec.events.wake() proc wakeAll*(ec: var EventCount) {.inline.} = ## Wake all threads if at least 1 is parked - let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease) - if (prev and kWaitMask) != 0: - ec.futex.wakeAll() + discard ec.events.increment(1, moRelease) + let waiters = ec.waitset.load(moAcquire) + if (waiters and kWaitMask) != 0: + ec.events.wakeAll() proc getNumWaiters*(ec: var EventCount): tuple[preSleep, committedSleep: int32] {.noInit, inline.} = ## Get the number of idle threads: - ## (planningToSleep, committedToSleep) - let waiters = ec.state.load(moAcquire) + ## (preSleep, committedSleep) + let waiters = ec.waitset.load(moAcquire) result.preSleep = cast[int32]((waiters and kPreWaitMask) shr kPreWaitShift) result.committedSleep = cast[int32](waiters and kWaitMask) - -{.pop.} # {.push raises:[], checks:off.} \ No newline at end of file diff --git a/constantine/platforms/threadpool/crossthread/taskqueues.nim b/constantine/platforms/threadpool/crossthread/taskqueues.nim index 1aabf6d..cd30763 100644 --- a/constantine/platforms/threadpool/crossthread/taskqueues.nim +++ b/constantine/platforms/threadpool/crossthread/taskqueues.nim @@ -210,4 +210,4 @@ proc steal*(thiefID: int32, tq: var Taskqueue): ptr Task = if 
not compareExchange(tq.front, f, f+1, moSequentiallyConsistent, moRelaxed): # Failed race. return nil - result.thiefID.store(thiefID, moRelease) + result.setThief(thiefID) diff --git a/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim b/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim index 07b0c2f..92337e9 100644 --- a/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim +++ b/constantine/platforms/threadpool/crossthread/tasks_flowvars.nim @@ -10,7 +10,7 @@ import std/atomics, ../instrumentation, ../../allocs, - ./backoff + ../primitives/futexes # Tasks have an efficient design so that a single heap allocation # is required per `spawn`. @@ -26,19 +26,33 @@ import # (The name future is already used for IO scheduling) type + TaskState = object + ## This state allows synchronization between: + ## - a waiter that may sleep if no work and task is incomplete + ## - a thief that completes the task + ## - a waiter that frees task memory + ## - a waiter that will pick up the task continuation + ## + ## Supports up to 2¹⁵ = 32768 threads + completed: Futex + synchro: Atomic[uint32] + # type synchro = object + # canBeFreed {.bitsize: 1.}: uint32 - Transfer ownership from thief to waiter + # pad {.bitsize: 1.}: uint32 + # waiterID {.bitsize: 15.}: uint32 - ID of the waiter blocked on the task completion. + # thiefID {.bitsize: 15.}: uint32 - ID of the worker that stole and run the task. For leapfrogging. + Task* = object # Synchronization # ------------------ - parent*: ptr Task # When a task is awaiting, a thread can quickly prioritize the direct child of a task - thiefID*: Atomic[int32] # ID of the worker that stole and run the task. For leapfrogging. - hasFuture*: bool # Ownership: if a task has a future, the future deallocates it. Otherwise the worker thread does. 
- completed*: Atomic[bool] - waiter*: Atomic[ptr EventNotifier] + state: TaskState + parent*: ptr Task # Latency: When a task is awaited, a thread can quickly prioritize its direct children. + hasFuture*: bool # Ownership: if a task has a future, the future deallocates it. Otherwise the worker thread does. # Data parallelism # ------------------ - isFirstIter*: bool # Awaitable for-loops return true for first iter. Loops are split before first iter. - envSize*: int32 # This is used for splittable loop + isFirstIter*: bool # Load-Balancing: New loops are split before first iter. Split loops are run once before reconsidering split. + envSize*: int32 # Metadata: In splittable loops we need to copy the `env` upon splitting loopStart*: int loopStop*: int loopStride*: int @@ -65,10 +79,89 @@ type task*: ptr Task next*: ptr ReductionDagNode -# Tasks +# Task State # ------------------------------------------------------------------------- -const SentinelThief* = 0xFACADE'i32 +# Tasks have the following lifecycle: +# - A task creator that schedule a task on its queue +# - A task runner, task creator or thief, that runs the task +# - Once the task is finished: +# - if the task has no future, the task runner frees the task +# - if the task has a future, +# - the task runner can immediately pick up new work +# - the awaiting thread frees the task +# - the awaiting thread might be sleeping and need to be woken up. +# +# There is a delicate dance as we are need to prevent 2 issues: +# +# 1. A deadlock: if the waiter is never woken up after the thief completes the task +# 2. A use-after-free: if the thief tries to access the task after the waiter frees it. +# +# To solve 1, we need to set a `completed` flag, then check again if the waiter parked before. +# To solve 2, we either need to ensure that after the `completed` flag is set, the task runner +# doesn't access the task anymore which is impossible due to 1; +# or we have the waiter spinlock on another flag `canBeFreed`. 
+ +const # bitfield setup + kCanBeFreedShift = 31 + kCanBeFreed = 1'u32 shl kCanBeFreedShift + kCanBeFreedMask = kCanBeFreed # 0x80000000 + + kWaiterShift = 15 + kThiefMask = (1'u32 shl kWaiterShift) - 1 # 0x00007FFF + kWaiterMask = kThiefMask shl kWaiterShift # 0x3FFF8000 + + SentinelWaiter = high(uint32) and kWaiterMask + SentinelThief* = high(uint32) and kThiefMask + +proc initSynchroState*(task: ptr Task) {.inline.} = + task.state.completed.store(0, moRelaxed) + task.state.synchro.store(SentinelWaiter or SentinelThief, moRelaxed) + +# Flowvar synchronization +# ----------------------- + +proc isGcReady*(task: ptr Task): bool {.inline.} = + ## Check if task can be freed by the waiter if it was stolen + (task.state.synchro.load(moAcquire) and kCanBeFreedMask) != 0 + +proc setGcReady*(task: ptr Task) {.inline.} = + ## Thief transfers full task ownership to waiter + discard task.state.synchro.fetchAdd(kCanBeFreed, moRelease) + +proc isCompleted*(task: ptr Task): bool {.inline.} = + ## Check task completion + task.state.completed.load(moAcquire) != 0 + +proc setCompleted*(task: ptr Task) {.inline.} = + ## Set a task to `complete` + ## Wake a waiter thread if there is one + task.state.completed.store(1, moRelaxed) + fence(moSequentiallyConsistent) + let waiter = task.state.synchro.load(moRelaxed) + if (waiter and kWaiterMask) != SentinelWaiter: + task.state.completed.wake() + +proc sleepUntilComplete*(task: ptr Task, waiterID: int32) {.inline.} = + ## Sleep while waiting for task completion + let waiter = (cast[uint32](waiterID) shl kWaiterShift) - SentinelWaiter + discard task.state.synchro.fetchAdd(waiter, moRelaxed) + fence(moAcquire) + while task.state.completed.load(moRelaxed) == 0: + task.state.completed.wait(0) + +# Leapfrogging synchronization +# ---------------------------- + +proc getThief*(task: ptr Task): uint32 {.inline.} = + task.state.synchro.load(moAcquire) and kThiefMask + +proc setThief*(task: ptr Task, thiefID: int32) {.inline.} = + let thief = 
cast[uint32](thiefID) - SentinelThief + discard task.state.synchro.fetchAdd(thief, moRelease) + +# Tasks +# ------------------------------------------------------------------------- proc newSpawn*( T: typedesc[Task], @@ -79,11 +172,9 @@ proc newSpawn*( const size = sizeof(T) result = allocHeapUnchecked(T, size) + result.initSynchroState() result.parent = parent - result.thiefID.store(SentinelThief, moRelaxed) result.hasFuture = false - result.completed.store(false, moRelaxed) - result.waiter.store(nil, moRelaxed) result.fn = fn proc newSpawn*( @@ -96,11 +187,9 @@ proc newSpawn*( sizeof(env) result = allocHeapUnchecked(T, size) + result.initSynchroState() result.parent = parent - result.thiefID.store(SentinelThief, moRelaxed) result.hasFuture = false - result.completed.store(false, moRelaxed) - result.waiter.store(nil, moRelaxed) result.fn = fn cast[ptr[type env]](result.env)[] = env @@ -120,11 +209,9 @@ proc newLoop*( preCondition: start < stop result = allocHeapUnchecked(T, size) + result.initSynchroState() result.parent = parent - result.thiefID.store(SentinelThief, moRelaxed) result.hasFuture = false - result.completed.store(false, moRelaxed) - result.waiter.store(nil, moRelaxed) result.fn = fn result.envSize = 0 @@ -148,11 +235,9 @@ proc newLoop*( preCondition: start < stop result = allocHeapUnchecked(T, size) + result.initSynchroState() result.parent = parent - result.thiefID.store(SentinelThief, moRelaxed) result.hasFuture = false - result.completed.store(false, moRelaxed) - result.waiter.store(nil, moRelaxed) result.fn = fn result.envSize = int32(sizeof(env)) cast[ptr[type env]](result.env)[] = env @@ -180,6 +265,8 @@ proc newFlowVar*(T: typedesc, task: ptr Task): Flowvar[T] {.inline.} = cast[ptr ptr Task](task.env.addr)[] = task proc cleanup*(fv: var Flowvar) {.inline.} = + while not fv.task.isGcReady(): + cpuRelax() fv.task.freeHeap() fv.task = nil @@ -195,14 +282,13 @@ func isReady*[T](fv: Flowvar[T]): bool {.inline.} = ## In that case `sync` will not 
block. ## Otherwise the current will block to help on all the pending tasks ## until the Flowvar is ready. - fv.task.completed.load(moAcquire) + fv.task.isCompleted() func readyWith*[T](task: ptr Task, childResult: T) {.inline.} = ## Send the Flowvar result from the child thread processing the task ## to its parent thread. - precondition: not task.completed.load(moAcquire) + precondition: not task.isCompleted() cast[ptr (ptr Task, T)](task.env.addr)[1] = childResult - task.completed.store(true, moRelease) proc sync*[T](fv: sink Flowvar[T]): T {.noInit, inline, gcsafe.} = ## Blocks the current thread until the flowvar is available diff --git a/constantine/platforms/threadpool/docs/design.md b/constantine/platforms/threadpool/docs/design.md index e3ce1c7..2f37c24 100644 --- a/constantine/platforms/threadpool/docs/design.md +++ b/constantine/platforms/threadpool/docs/design.md @@ -113,7 +113,8 @@ Instead we can have each thread start working and use backpressure to lazily eva ### Backoff workers when awaiting a future This problem is quite tricky: -- For latency we want the worker to continue as soon as the future is completed. This might also create more work and expose more parallelism opportunities (in recursive divide-and-conquer algorithms for example) +- For latency we want the worker to continue as soon as the future is completed. This might also create more work and expose more parallelism opportunities (in recursive divide-and-conquer algorithms for example).\ + Note that with hyperthreading, the sibling thread(s) can still use the core fully so throughput might not be impacted. - For throughput, and because a scheduler is optimal only when greedy (i.e. within 2x of the best schedule, see Cilk paper), we want an idle thread to take any available work ASAP. - but what if that worker ends up with work that blocks it for a long-time? It may lead to work starvation. 
- There is no robust, cross-platform API, to wake a specific thread awaiting on a futex or condition variable. @@ -136,12 +137,13 @@ This problem is quite tricky: - Using continuations: We could just store a continuation in the future so the thread that completes the future picks up the continuation. -The workaround for now is just to work on any available tasks to maximize throughput then sleep. -This has 2 disadvantages: -- For coarse-grain parallelism, if the waiting thread steals a long task. - However, coarse-grain parallelism is easier to split into multiple tasks, - while exposing fine-grain parallelism and properly handling with its overhead is the usual problem. -- As we can't wake a specific thread on a common futex or condition variable, - when awaiting a future, a thread sleeps on a local one. - Hence if no work can be stolen, the waiter sleeps. But if work is then recreated, it stays sleeping as only the global futex is notified. - Note that with hyperthreading, the sibling thread(s) can still use the core fully so throughput might not be impacted. +Besides design issues, there are also engineering issues as we can't wake a specific thread on a common futex or condition variable. +- Either a thread sleeps on a locally owned one, but how to communicate its address to the thief? + And how to synchronize freeing the task memory? + In particular, if we use the task as the medium, how to avoid a race condition where: + task is completed by thief, task memory is freed by waiter, thief tries to get the waiter futex/condition variable + and triggers a use-after-free. +- or it sleeps on the global one and each stolen completed task triggers a wakeAll +- or we create a backoff data structure where specific waiters can be woken up. + +Our solution is to embed the backoff structure in the task and add an additional flag to notify when the task can be freed safely.
\ No newline at end of file diff --git a/constantine/platforms/threadpool/primitives/futexes_linux.nim b/constantine/platforms/threadpool/primitives/futexes_linux.nim index 8fd5a25..149723b 100644 --- a/constantine/platforms/threadpool/primitives/futexes_linux.nim +++ b/constantine/platforms/threadpool/primitives/futexes_linux.nim @@ -12,21 +12,32 @@ import std/atomics export MemoryOrder -type - Futex* = object - value: Atomic[uint32] - FutexOp = distinct cint +# OS primitives +# ------------------------------------------------------------------------ -var NR_Futex {.importc: "__NR_futex", header: "".}: cint -var FutexWaitPrivate {.importc:"FUTEX_WAIT_PRIVATE", header: "".}: FutexOp -var FutexWakePrivate {.importc:"FUTEX_WAKE_PRIVATE", header: "".}: FutexOp + + +const + NR_Futex = 202 + + FUTEX_WAIT_PRIVATE = 128 + FUTEX_WAKE_PRIVATE = 129 proc syscall(sysno: clong): cint {.header:"", varargs.} proc sysFutex( - futex: var Futex, op: FutexOp, val1: cuint or cint, + futexAddr: pointer, operation: uint32, expected: uint32 or int32, timeout: pointer = nil, val2: pointer = nil, val3: cint = 0): cint {.inline.} = - syscall(NR_Futex, futex.value.addr, op, val1, timeout, val2, val3) + ## See https://web.archive.org/web/20230208151430/http://locklessinc.com/articles/futex_cheat_sheet/ + ## and https://www.akkadia.org/drepper/futex.pdf + syscall(NR_Futex, futexAddr, operation, expected, timeout, val2, val3) + +# Futex API +# ------------------------------------------------------------------------ + +type + Futex* = object + value: Atomic[uint32] proc initialize*(futex: var Futex) {.inline.} = futex.value.store(0, moRelaxed) @@ -37,19 +48,20 @@ proc teardown*(futex: var Futex) {.inline.} = proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} = futex.value.load(order) -proc loadMut*(futex: var Futex): var Atomic[uint32] {.inline.} = - futex.value - proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} = futex.value.store(value, order) -proc 
wait*(futex: var Futex, refVal: uint32) {.inline.} = - ## Suspend a thread if the value of the futex is the same as refVal. +proc increment*(futex: var Futex, value: uint32, order: MemoryOrder): uint32 {.inline.} = + ## Increment a futex value, returns the previous one. + futex.value.fetchAdd(value, order) + +proc wait*(futex: var Futex, expected: uint32) {.inline.} = + ## Suspend a thread if the value of the futex is the same as expected. # Returns 0 in case of a successful suspend # If value are different, it returns EWOULDBLOCK # We discard as this is not needed and simplifies compat with Windows futex - discard sysFutex(futex, FutexWaitPrivate, refVal) + discard sysFutex(futex.value.addr, FutexWaitPrivate, expected) proc wake*(futex: var Futex) {.inline.} = ## Wake one thread (from the same process) @@ -57,7 +69,7 @@ proc wake*(futex: var Futex) {.inline.} = # Returns the number of actually woken threads # or a Posix error code (if negative) # We discard as this is not needed and simplifies compat with Windows futex - discard sysFutex(futex, FutexWakePrivate, 1) + discard sysFutex(futex.value.addr, FutexWakePrivate, 1) proc wakeAll*(futex: var Futex) {.inline.} = ## Wake all threads (from the same process) @@ -65,4 +77,4 @@ proc wakeAll*(futex: var Futex) {.inline.} = # Returns the number of actually woken threads # or a Posix error code (if negative) # We discard as this is not needed and simplifies compat with Windows futex - discard sysFutex(futex, FutexWakePrivate, high(int32)) \ No newline at end of file + discard sysFutex(futex.value.addr, FutexWakePrivate, high(int32)) \ No newline at end of file diff --git a/constantine/platforms/threadpool/primitives/futexes_macos.nim b/constantine/platforms/threadpool/primitives/futexes_macos.nim index f6141de..4d1ce24 100644 --- a/constantine/platforms/threadpool/primitives/futexes_macos.nim +++ b/constantine/platforms/threadpool/primitives/futexes_macos.nim @@ -7,7 +7,10 @@ import std/atomics -# A wrapper for Darwin 
futex. +# OS primitives +# ------------------------------------------------------------------------ + +# Darwin futexes. # They are used in libc++ so likely to be very stable. # A new API appeared in OSX Big Sur (Jan 2021) ulock_wait2 and macOS pthread_cond_t has been migrated to it # - https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6 @@ -73,10 +76,13 @@ const ULF_WAKE_MASK = ULF_NO_ERRNO or ULF_WAKE_THREAD or ULF_WAKE_ALLOW_NON_OWNER -proc ulock_wait(operation: uint32, address: pointer, value: uint64, timeout: uint32): cint {.importc:"__ulock_wait", cdecl.} -proc ulock_wait2(operation: uint32, address: pointer, value: uint64, timeout, value2: uint64): cint {.importc:"__ulock_wait2", cdecl.} +proc ulock_wait(operation: uint32, address: pointer, expected: uint64, timeout: uint32): cint {.importc:"__ulock_wait", cdecl.} +proc ulock_wait2(operation: uint32, address: pointer, expected: uint64, timeout, value2: uint64): cint {.importc:"__ulock_wait2", cdecl.} proc ulock_wake(operation: uint32, address: pointer, wake_value: uint64): cint {.importc:"__ulock_wake", cdecl.} +# Futex API +# ------------------------------------------------------------------------ + type Futex* = object value: Atomic[uint32] @@ -90,15 +96,16 @@ proc teardown*(futex: var Futex) {.inline.} = proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} = futex.value.load(order) -proc loadMut*(futex: var Futex): var Atomic[uint32] {.inline.} = - futex.value - proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} = futex.value.store(value, order) -proc wait*(futex: var Futex, refVal: uint32) {.inline.} = - ## Suspend a thread if the value of the futex is the same as refVal. 
- discard ulock_wait(UL_UNFAIR_LOCK64_SHARED or ULF_NO_ERRNO, futex.value.addr, uint64 refVal, 0) +proc increment*(futex: var Futex, value: uint32, order: MemoryOrder): uint32 {.inline.} = + ## Increment a futex value, returns the previous one. + futex.value.fetchAdd(value, order) + +proc wait*(futex: var Futex, expected: uint32) {.inline.} = + ## Suspend a thread if the value of the futex is the same as expected. + discard ulock_wait(UL_UNFAIR_LOCK64_SHARED or ULF_NO_ERRNO, futex.value.addr, uint64 expected, 0) proc wake*(futex: var Futex) {.inline.} = ## Wake one thread (from the same process) diff --git a/constantine/platforms/threadpool/primitives/futexes_windows.nim b/constantine/platforms/threadpool/primitives/futexes_windows.nim index 0b0fca0..2f082d3 100644 --- a/constantine/platforms/threadpool/primitives/futexes_windows.nim +++ b/constantine/platforms/threadpool/primitives/futexes_windows.nim @@ -8,21 +8,12 @@ # An implementation of futex using Windows primitives import std/atomics, winlean -export MemoryOrder -type - Futex* = object - value: Atomic[uint32] +# OS primitives +# ------------------------------------------------------------------------ # Contrary to the documentation, the futex related primitives are NOT in kernel32.dll # but in API-MS-Win-Core-Synch-l1-2-0.dll ¯\_(ツ)_/¯ - -proc initialize*(futex: var Futex) {.inline.} = - futex.value.store(0, moRelaxed) - -proc teardown*(futex: var Futex) {.inline.} = - futex.value.store(0, moRelaxed) - proc WaitOnAddress( Address: pointer, CompareAddress: pointer, AddressSize: csize_t, dwMilliseconds: DWORD @@ -32,23 +23,37 @@ proc WaitOnAddress( proc WakeByAddressSingle(Address: pointer) {.importc, stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0.dll".} proc WakeByAddressAll(Address: pointer) {.importc, stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0.dll".} +# Futex API +# ------------------------------------------------------------------------ + +type + Futex* = object + value: Atomic[uint32] + +proc 
initialize*(futex: var Futex) {.inline.} = + futex.value.store(0, moRelaxed) + +proc teardown*(futex: var Futex) {.inline.} = + futex.value.store(0, moRelaxed) + proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} = futex.value.load(order) -proc loadMut*(futex: var Futex): var Atomic[uint32] {.inline.} = - futex.value - proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} = futex.value.store(value, order) -proc wait*(futex: var Futex, refVal: uint32) {.inline.} = - ## Suspend a thread if the value of the futex is the same as refVal. +proc increment*(futex: var Futex, value: uint32, order: MemoryOrder): uint32 {.inline.} = + ## Increment a futex value, returns the previous one. + futex.value.fetchAdd(value, order) + +proc wait*(futex: var Futex, expected: uint32) {.inline.} = + ## Suspend a thread if the value of the futex is the same as expected. # Returns TRUE if the wait succeeds or FALSE if not. # getLastError() will contain the error information, for example # if it failed due to a timeout. # We discard as this is not needed and simplifies compat with Linux futex - discard WaitOnAddress(futex.value.addr, refVal.unsafeAddr, csize_t sizeof(refVal), INFINITE) + discard WaitOnAddress(futex.value.addr, expected.unsafeAddr, csize_t sizeof(expected), INFINITE) proc wake*(futex: var Futex) {.inline.} = ## Wake one thread (from the same process) diff --git a/constantine/platforms/threadpool/threadpool.nim b/constantine/platforms/threadpool/threadpool.nim index 316aefd..ca09640 100644 --- a/constantine/platforms/threadpool/threadpool.nim +++ b/constantine/platforms/threadpool/threadpool.nim @@ -150,7 +150,6 @@ type currentTask: ptr Task # Synchronization - localBackoff: EventNotifier # Multi-Producer Single-Consumer backoff signal: ptr Signal # owned signal # Thefts @@ -184,6 +183,7 @@ var workerContext {.threadvar.}: WorkerContext ## and use a Minimal Perfect Hash Function. 
## We can approximate a threadID by retrieving the address of a dummy thread-local variable. ## - Or we sort threadID and use binary search + ## The FlowVar would also need to store the Threadpool proc setupWorker(ctx: var WorkerContext) = ## Initialize the thread-local context of a worker @@ -197,7 +197,6 @@ proc setupWorker(ctx: var WorkerContext) = ctx.rng.seed(0xEFFACED + ctx.id) # Synchronization - ctx.localBackoff.initialize() ctx.signal = addr ctx.threadpool.workerSignals[ctx.id] ctx.signal.terminate.store(false, moRelaxed) @@ -210,7 +209,6 @@ proc setupWorker(ctx: var WorkerContext) = proc teardownWorker(ctx: var WorkerContext) = ## Cleanup the thread-local context of a worker - ctx.localBackoff.`=destroy`() ctx.taskqueue[].teardown() proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.} @@ -249,32 +247,34 @@ proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises # ############################################################ # Sentinel values -const ReadyFuture = cast[ptr EventNotifier](0xCA11AB1E) const RootTask = cast[ptr Task](0xEFFACED0) proc run*(ctx: var WorkerContext, task: ptr Task) {.raises:[].} = ## Run a task, frees it if it is not owned by a Flowvar let suspendedTask = ctx.currentTask ctx.currentTask = task - debug: log("Worker %3d: running task 0x%.08x (previous: 0x%.08x, %d pending, thiefID %d)\n", ctx.id, task, suspendedTask, ctx.taskqueue[].peek(), task.thiefID) + debug: log("Worker %3d: running task 0x%.08x (previous: 0x%.08x, %d pending, thiefID %d)\n", ctx.id, task, suspendedTask, ctx.taskqueue[].peek(), task.getThief()) task.fn(task.env.addr) debug: log("Worker %3d: completed task 0x%.08x (%d pending)\n", ctx.id, task, ctx.taskqueue[].peek()) ctx.currentTask = suspendedTask - if not task.hasFuture: + + if not task.hasFuture: # Are we the final owner? 
+ debug: log("Worker %3d: freeing task 0x%.08x with no future\n", ctx.id, task) freeHeap(task) return + # Sync with an awaiting thread in completeFuture that didn't find work - var expected = (ptr EventNotifier)(nil) - if not compareExchange(task.waiter, expected, desired = ReadyFuture, moAcquireRelease): - debug: log("Worker %3d: completed task 0x%.08x, notifying waiter 0x%.08x\n", ctx.id, task, expected) - expected[].notify() + # and transfer ownership of the task to it. + debug: log("Worker %3d: transfering task 0x%.08x to future holder\n", ctx.id, task) + task.setCompleted() + task.setGcReady() proc schedule(ctx: var WorkerContext, tn: ptr Task, forceWake = false) {.inline.} = ## Schedule a task in the threadpool ## This wakes a sibling thread if our local queue is empty ## or forceWake is true. - debug: log("Worker %3d: schedule task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, tn, tn.parent, ctx.currentTask) + debug: log("Worker %3d: schedule task 0x%.08x (parent/current task 0x%.08x)\n", ctx.id, tn, tn.parent) # Instead of notifying every time a task is scheduled, we notify # only when the worker queue is empty. This is a good approximation @@ -308,7 +308,7 @@ proc schedule(ctx: var WorkerContext, tn: ptr Task, forceWake = false) {.inline. 
iterator splitUpperRanges( ctx: WorkerContext, task: ptr Task, - curLoopIndex: int, numIdle: int32 + curLoopIndex: int, approxIdle: int32 ): tuple[start, size: int] = ## Split the iteration range based on the number of idle threads ## returns chunks with parameters (start, stopEx, len) @@ -338,10 +338,10 @@ iterator splitUpperRanges( debugSplit: log("Worker %3d: task 0x%.08x - %8d step(s) left (current: %3d, start: %3d, stop: %3d, stride: %3d, %3d idle worker(s))\n", - ctx.id, task, task.loopStepsLeft, curLoopIndex, task.loopStart, task.loopStop, task.loopStride, numIdle) + ctx.id, task, task.loopStepsLeft, curLoopIndex, task.loopStart, task.loopStop, task.loopStride, approxIdle) # Send a chunk of work to all idle workers + ourselves - let availableWorkers = cast[int](numIdle + 1) + let availableWorkers = cast[int](approxIdle + 1) let baseChunkSize = task.loopStepsLeft div availableWorkers let cutoff = task.loopStepsLeft mod availableWorkers @@ -405,19 +405,18 @@ func decrease(backoff: var BalancerBackoff) {.inline.} = if backoff.windowLogSize < 0: backoff.windowLogSize = 0 -proc splitAndDispatchLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int, numIdle: int32) = +proc splitAndDispatchLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int, approxIdle: int32) = # The iterator mutates the task with the first chunk metadata let stop = task.loopStop - for (offset, numSteps) in ctx.splitUpperRanges(task, curLoopIndex, numIdle): + for (offset, numSteps) in ctx.splitUpperRanges(task, curLoopIndex, approxIdle): if numSteps == 0: break let upperSplit = allocHeapUnchecked(Task, sizeof(Task) + task.envSize) copyMem(upperSplit, task, sizeof(Task) + task.envSize) + upperSplit.initSynchroState() upperSplit.parent = task - upperSplit.thiefID.store(SentinelThief, moRelaxed) - upperSplit.waiter.store(nil, moRelaxed) upperSplit.isFirstIter = false upperSplit.loopStart = offset @@ -446,9 +445,9 @@ proc loadBalanceLoop(ctx: var WorkerContext, task: ptr Task, 
curLoopIndex: int, if ctx.taskqueue[].peek() == 0: let waiters = ctx.threadpool.globalBackoff.getNumWaiters() # We assume that the worker that scheduled the task will work on it. I.e. idleness is underestimated. - let numIdle = waiters.preSleep + waiters.committedSleep + cast[int32](task.isFirstIter) - if numIdle > 0: - ctx.splitAndDispatchLoop(task, curLoopIndex, numIdle) + let approxIdle = waiters.preSleep + waiters.committedSleep + cast[int32](task.isFirstIter) + if approxIdle > 0: + ctx.splitAndDispatchLoop(task, curLoopIndex, approxIdle) backoff.decrease() else: backoff.increase() @@ -547,15 +546,13 @@ proc tryLeapfrog(ctx: var WorkerContext, awaitedTask: ptr Task): ptr Task = var thiefID = SentinelThief while true: - debug: log("Worker %3d: leapfrogging - waiting for thief of task 0x%.08x to publish their ID\n", ctx.id, awaitedTask) - thiefID = awaitedTask.thiefID.load(moAcquire) + debug: log("Worker %3d: leapfrogging - waiting for thief of task 0x%.08x to publish their ID (thiefID read %d)\n", ctx.id, awaitedTask, thiefID) + thiefID = awaitedTask.getThief() if thiefID != SentinelThief: break cpuRelax() ascertain: 0 <= thiefID and thiefID < ctx.threadpool.numThreads - # Leapfrogging is used when completing a future, so steal only one task - # and don't leave tasks stranded in our queue. 
let leapTask = ctx.id.steal(ctx.threadpool.workerQueues[thiefID]) if not leapTask.isNil(): # Theft successful, there might be more work for idle threads, wake one @@ -588,9 +585,9 @@ proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.} = break else: # 2.c Park the thread until a new task enters the threadpool - debug: log("Worker %3d: eventLoop 2.b - sleeping\n", ctx.id) + debugTermination: log("Worker %3d: eventLoop 2.b - sleeping\n", ctx.id) ctx.threadpool.globalBackoff.sleep(ticket) - debug: log("Worker %3d: eventLoop 2.b - waking\n", ctx.id) + debugTermination: log("Worker %3d: eventLoop 2.b - waking\n", ctx.id) # ############################################################ # # @@ -603,7 +600,7 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = template ctx: untyped = workerContext template isFutReady(): untyped = - let isReady = fv.task.completed.load(moAcquire) + let isReady = fv.task.isCompleted() if isReady: parentResult = cast[ptr (ptr Task, T)](fv.task.env.addr)[1] isReady @@ -611,9 +608,8 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = if isFutReady(): return - ## 1. Process all the children of the current tasks. - ## This ensures that we can give control back ASAP. - debug: log("Worker %3d: sync 1 - searching task from local queue\n", ctx.id) + ## 1. Process all the children of the current tasks first, ignoring the rest. 
+ debug: log("Worker %3d: sync 1 - searching task from local queue (awaitedTask 0x%.08x)\n", ctx.id, fv.task) while (let task = ctx.taskqueue[].pop(); not task.isNil): if task.parent != ctx.currentTask: debug: log("Worker %3d: sync 1 - skipping non-direct descendant task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask) @@ -622,7 +618,7 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = debug: log("Worker %3d: sync 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask) ctx.run(task) if isFutReady(): - debug: log("Worker %3d: sync 1 - future ready, exiting\n", ctx.id) + debug: log("Worker %3d: sync 1 - future ready, exiting (awaitedTask 0x%.08x)\n", ctx.id, fv.task) return # 2. We run out-of-tasks or out-of-direct-child of our current awaited task @@ -636,6 +632,9 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = # # Design tradeoffs # ---------------- + # see ./docs/design.md + # + # Rundown: # # At this point, we have significant design decisions: # - Do we steal from other workers in hope we advance our awaited task? @@ -646,6 +645,10 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = # Note: With hyperthreading, real hardware resources are 2x less than the reported number of cores. # Hence parking might free contended memory bandwitdh or execution ports. # - Do we just not sleep, potentially wasting energy? + # Note: on "empty tasks" (like Fibonacci), the constant hammering of threads + # actually slows down performance by 2x. + # This is a realistic scenario for IO (waiting to copy a network buffer for example) + # but the second almost-empty benchmark, depth-first-search is actually faster by 17%. # # - If we work, we maximize throughput, but we increase latency to handle the future's continuation. 
# If that continuation would have created more parallel work, we would actually have restricted parallelism. @@ -657,31 +660,28 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} = # in theory maintains throughput and minimize the latency of the future's continuation # but in practice, performance can worsen significantly on fine-grained parallelism. - debug: log("Worker %3d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x)\n", ctx.id, ctx.currentTask) + debug: log("Worker %3d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x, awaitedTask 0x%.08x)\n", ctx.id, ctx.currentTask, fv.task) while not isFutReady(): - if (let leapTask = ctx.tryLeapfrog(fv.task); not leapTask.isNil): # Leapfrogging, the thief had an empty queue, hence if there are tasks in its queue, it's generated by our blocked task. # Help the thief clear those, as if it did not finish, it's likely blocked on those children tasks. - debug: log("Worker %3d: sync 2.1 - leapfrog task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, leapTask, leapTask.parent, ctx.currentTask) + debug: log("Worker %3d: sync 2.1 - leapfrog task 0x%.08x (parent 0x%.08x, current 0x%.08x, awaitedTask 0x%.08x)\n", ctx.id, leapTask, leapTask.parent, ctx.currentTask, fv.task) ctx.run(leapTask) elif (let stolenTask = ctx.tryStealOne(); not stolenTask.isNil): # We stole a task, we hope we advance our awaited task. - debug: log("Worker %3d: sync 2.2 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) + debug: log("Worker %3d: sync 2.2 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x, awaitedTask 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask, fv.task) ctx.run(stolenTask) elif (let ownTask = ctx.taskqueue[].pop(); not ownTask.isNil): # We advance our own queue, this increases global throughput but may impact latency on the awaited task. 
- debug: log("Worker %3d: sync 2.3 - couldn't steal, running own task\n", ctx.id) + debug: log("Worker %3d: sync 2.3 - couldn't steal, running own task (awaitedTask 0x%.08x)\n", ctx.id, fv.task) ctx.run(ownTask) else: # Nothing to do, we park. # - On today's hyperthreaded systems, this might reduce contention on a core resources like memory caches and execution ports # - If more work is created, we won't be notified as we need to park on a dedicated notifier for precise wakeup when future is ready - ctx.localBackoff.prepareToPark() - - var expected = (ptr EventNotifier)(nil) - if compareExchange(fv.task.waiter, expected, desired = ctx.localBackoff.addr, moAcquireRelease): - ctx.localBackoff.park() + debugTermination: log("Worker %3d: sync 2.4 - Empty runtime, parking (awaitedTask 0x%.08x)\n", ctx.id, fv.task) + fv.task.sleepUntilComplete(ctx.id) + debugTermination: log("Worker %3d: sync 2.4 - signaled, waking (awaitedTask 0x%.08x)\n", ctx.id, fv.task) proc syncAll*(tp: Threadpool) {.raises: [].} = ## Blocks until all pending tasks are completed