[Threadpool] Backoff revamp (#224)

* Threadpool: eventcount didn't put threads to actual sleep :/

* rework task awaiter sleep to prevent use-after-free race condition after task completion

* Need memory fence for StoreLoad synchronization ordering

* update design doc

* set memory order in sleep of eventcount

* cleanup debug logs

* comment cleanup [skip ci]
This commit is contained in:
Mamy Ratsimbazafy 2023-02-25 17:11:33 +01:00 committed by GitHub
parent 1dfbb8bd4f
commit 1cb6c3d9e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 274 additions and 263 deletions

View File

@ -10,72 +10,8 @@ import
std/atomics, std/atomics,
../primitives/futexes ../primitives/futexes
# We implement 2 datastructures to put threads to sleep:
# 1. An event notifier to put an awaiting thread to sleep when the task they require is worked on by another thread
# 2. An eventcount to put an idle thread to sleep
{.push raises:[], checks:off.} {.push raises:[], checks:off.}
# ############################################################
#
# Event Notifier
#
# ############################################################
# Formal verification at: https://github.com/mratsim/weave/blob/7682784/formal_verification/event_notifiers.tla#L76-L109
type
EventNotifier* = object
## Multi Producers, Single Consumer event notification
## This is can be seen as a wait-free condition variable for producers
## that avoids them spending time in expensive kernel land due to mutexes.
# ---- Consumer specific ----
ticket{.align: 64.}: uint8 # A ticket for the consumer to sleep in a phase
# ---- Contention ---- no real need for padding as cache line should be reloaded in case of contention anyway
futex: Futex # A Futex (atomic int32 that can put thread to sleep)
phase: Atomic[uint8] # A binary timestamp, toggles between 0 and 1 (but there is no atomic "not")
signaled: Atomic[bool] # Signaling condition
func initialize*(en: var EventNotifier) {.inline.} =
en.futex.initialize()
en.ticket = 0
en.phase.store(0, moRelaxed)
en.signaled.store(false, moRelaxed)
func `=destroy`*(en: var EventNotifier) {.inline.} =
en.futex.teardown()
func `=copy`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be copied".}
func `=sink`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be moved".}
func prepareToPark*(en: var EventNotifier) {.inline.} =
## The consumer intends to sleep soon.
## This must be called before the formal notification
## via a channel.
if not en.signaled.load(moRelaxed):
en.ticket = en.phase.load(moRelaxed)
proc park*(en: var EventNotifier) {.inline.} =
## Wait until we are signaled of an event
## Thread is parked and does not consume CPU resources
## This may wakeup spuriously.
if not en.signaled.load(moRelaxed):
if en.ticket == en.phase.load(moRelaxed):
en.futex.wait(0)
en.signaled.store(false, moRelaxed)
en.futex.initialize()
proc notify*(en: var EventNotifier) {.inline.} =
## Signal a thread that it can be unparked
if en.signaled.load(moRelaxed):
# Another producer is signaling
return
en.signaled.store(true, moRelease)
discard en.phase.fetchXor(1, moRelaxed)
en.futex.store(1, moRelease)
en.futex.wake()
# ############################################################ # ############################################################
# #
# Eventcount # Eventcount
@ -85,8 +21,6 @@ proc notify*(en: var EventNotifier) {.inline.} =
type type
Eventcount* = object Eventcount* = object
## The lock-free equivalent of a condition variable. ## The lock-free equivalent of a condition variable.
## Supports up to 256 threads on 32-bit.
## Supports up to 65536 threads on 64-bit.
## ##
## Usage, if a thread needs to be parked until a condition is true ## Usage, if a thread needs to be parked until a condition is true
## and signaled by another thread: ## and signaled by another thread:
@ -102,122 +36,87 @@ type
## else: ## else:
## ec.sleep() ## ec.sleep()
## ``` ## ```
waitset: Atomic[uint32]
state: Atomic[uint] # type waitset = object
# State is actually the equivalent of a bitfield # preSleep {.bitsize: 16.}: uint32
# type State = object # committedSleep {.bitsize: 16.}: uint32
# when sizeof(uint) == 8:
# waiters {.bitsize: 16.}: uint16
# preWaiters {.bitsize: 16.}: uint16
# epoch {.bitsize: 32.}: uint32
# else:
# waiters {.bitsize: 8.}: uint8
# preWaiters {.bitsize: 8.}: uint8
# epoch {.bitsize: 16.}: uint16
# #
# but there is no native fetchAdd for bitfields. # We need precise committed sleep count for the `syncAll` barrier because a `preSleep` waiter
# may steal a task and create more work.
futex: Futex events: Futex
# Technically we could use the futex as the state.
# When you wait on a Futex, it waits only if the value of the futex
# matches with a reference value.
# But our reference value will be the epoch of notifications
# and it is non-trivial to zero-out the waiters bits.
# - One way could be to split a 64-bit number in 2
# and cast the epoch part to Futex but that would only work on 64-bit CPU.
# - Another more hacky way would be to pad with a zero-out uint16 before and after the Futex
# and depending on big or little endian provide a shifted address as Futex.
ParkingTicket* = object ParkingTicket* = object
epoch: uint32 epoch: uint32
const # bitfield const # bitfield setup
# On 32-bit
# Low 8 bits are waiters, up to 2⁸ = 256 threads are supported
# Next 8 bits are pre-waiters, planning to wait but not committed.
# Next 16 bits is the epoch.
# The epoch deals with the ABA problem
# - up to 65536 wake requests on 32-bit
# Epoch rolling over to 0 are not a problem, they won't change the low 16 bits.
# On 64-bit
# Low 16 bits are waiters, up to 2¹⁶ = 65536 threads are supported # Low 16 bits are waiters, up to 2¹⁶ = 65536 threads are supported
# Next 16 bits are pre-waiters, planning to wait but not committed. # Next 16 bits are pre-waiters, planning to wait but not committed.
# Next 32 bits is the epoch.
# The epoch deals with the ABA problem
# - up to 4 294 967 296 wake requests on 64-bit
# Epoch rolling over to 0 are not a problem, they won't change the low 16 bits.
# #
# OS limitations: # OS limitations:
# - Windows 10 supports up to 256 cores (https://www.microsoft.com/en-us/microsoft-365/blog/2017/12/15/windows-10-pro-workstations-power-advanced-workloads/) # - Windows 10 supports up to 256 cores (https://www.microsoft.com/en-us/microsoft-365/blog/2017/12/15/windows-10-pro-workstations-power-advanced-workloads/)
# - Linux CPUSET supports up to 1024 threads (https://man7.org/linux/man-pages/man3/CPU_SET.3.html) # - Linux CPUSET supports up to 1024 threads (https://man7.org/linux/man-pages/man3/CPU_SET.3.html)
# #
# Hardware limitations: # Hardware limitations:
# - Xeon Platinum 9282, 56 cores - 112 threads # - Xeon Platinum 9282, 56 cores - 112 threads per socket
# - 8 sockets: 896 threads # - up to 8 sockets: 896 threads
scale = sizeof(uint) div 4 # 2 for 64-bit, 1 for 32-bit. kPreWaitShift = 8'u32
kPreWait = 1'u32 shl kPreWaitShift
kEpochShift = 16'u * scale kWait = 1'u32
kPreWaitShift = 8'u * scale kCommitToWait = kWait - kPreWait
kWaitMask = kPreWait-1
kEpoch = 1'u shl kEpochShift kPreWaitMask = not kWaitMask
kPreWait = 1'u shl kPreWaitShift
kWait = 1'u
kTransitionToWait = kWait - kPreWait
kWaitMask = kPreWait-1
kAnyWaiterMask = kEpoch-1
kPreWaitMask = kAnyWaiterMask xor kWaitMask # 0x0000FF00 on 32-bit
func initialize*(ec: var EventCount) {.inline.} = func initialize*(ec: var EventCount) {.inline.} =
ec.state.store(0, moRelaxed) ec.waitset.store(0, moRelaxed)
ec.futex.initialize() ec.events.initialize()
func `=destroy`*(ec: var EventCount) {.inline.} = func `=destroy`*(ec: var EventCount) {.inline.} =
ec.futex.teardown() ec.events.teardown()
proc sleepy*(ec: var Eventcount): ParkingTicket {.noInit, inline.} = proc sleepy*(ec: var Eventcount): ParkingTicket {.noInit, inline.} =
## To be called before checking if the condition to not sleep is met. ## To be called before checking if the condition to not sleep is met.
## Returns a ticket to be used when committing to sleep ## Returns a ticket to be used when committing to sleep
let prevState = ec.state.fetchAdd(kPreWait, moAcquireRelease) discard ec.waitset.fetchAdd(kPreWait, moRelease)
result.epoch = uint32(prevState shr kEpochShift) result.epoch = ec.events.load(moAcquire)
proc sleep*(ec: var Eventcount, ticket: ParkingTicket) {.inline.} = proc sleep*(ec: var Eventcount, ticket: ParkingTicket) {.inline.} =
## Put a thread to sleep until notified. ## Put a thread to sleep until notified.
## If the ticket becomes invalid (a notfication has been received) ## If the ticket becomes invalid (a notification has been received)
## by the time sleep is called, the thread won't enter sleep ## by the time sleep is called, the thread won't enter sleep
discard ec.state.fetchAdd(kTransitionToWait, moAcquireRelease) discard ec.waitset.fetchAdd(kCommitToWait, moRelease)
while ec.state.load(moAcquire) shr kEpochShift == ticket.epoch: while ec.events.load(moAcquire) == ticket.epoch:
ec.futex.wait(ticket.epoch) # We don't use the futex internal value ec.events.wait(ticket.epoch)
let prev {.used.} = ec.state.fetchSub(kWait, moRelaxed) discard ec.waitset.fetchSub(kWait, moRelease)
proc cancelSleep*(ec: var Eventcount) {.inline.} = proc cancelSleep*(ec: var Eventcount) {.inline.} =
## Cancel a sleep that was scheduled. ## Cancel a sleep that was scheduled.
let prev {.used.} = ec.state.fetchSub(kPreWait, moRelaxed) discard ec.waitset.fetchSub(kPreWait, moRelease)
proc wake*(ec: var EventCount) {.inline.} = proc wake*(ec: var EventCount) {.inline.} =
## Wake a thread if at least 1 is parked ## Prevent an idle thread from sleeping
let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease) ## or wait a sleeping one if there wasn't any idle
if (prev and kPreWaitMask) != 0: discard ec.events.increment(1, moRelease)
# Some threads are in prewait and will see the epoch change let waiters = ec.waitset.load(moAcquire)
if (waiters and kPreWaitMask) != 0:
# Some threads are in prewait and will see the event count change
# no need to do an expensive syscall # no need to do an expensive syscall
return return
if (prev and kWaitMask) != 0: if waiters != 0:
ec.futex.wake() ec.events.wake()
proc wakeAll*(ec: var EventCount) {.inline.} = proc wakeAll*(ec: var EventCount) {.inline.} =
## Wake all threads if at least 1 is parked ## Wake all threads if at least 1 is parked
let prev = ec.state.fetchAdd(kEpoch, moAcquireRelease) discard ec.events.increment(1, moRelease)
if (prev and kWaitMask) != 0: let waiters = ec.waitset.load(moAcquire)
ec.futex.wakeAll() if (waiters and kWaitMask) != 0:
ec.events.wakeAll()
proc getNumWaiters*(ec: var EventCount): tuple[preSleep, committedSleep: int32] {.noInit, inline.} = proc getNumWaiters*(ec: var EventCount): tuple[preSleep, committedSleep: int32] {.noInit, inline.} =
## Get the number of idle threads: ## Get the number of idle threads:
## (planningToSleep, committedToSleep) ## (preSleep, committedSleep)
let waiters = ec.state.load(moAcquire) let waiters = ec.waitset.load(moAcquire)
result.preSleep = cast[int32]((waiters and kPreWaitMask) shr kPreWaitShift) result.preSleep = cast[int32]((waiters and kPreWaitMask) shr kPreWaitShift)
result.committedSleep = cast[int32](waiters and kWaitMask) result.committedSleep = cast[int32](waiters and kWaitMask)
{.pop.} # {.push raises:[], checks:off.}

View File

@ -210,4 +210,4 @@ proc steal*(thiefID: int32, tq: var Taskqueue): ptr Task =
if not compareExchange(tq.front, f, f+1, moSequentiallyConsistent, moRelaxed): if not compareExchange(tq.front, f, f+1, moSequentiallyConsistent, moRelaxed):
# Failed race. # Failed race.
return nil return nil
result.thiefID.store(thiefID, moRelease) result.setThief(thiefID)

View File

@ -10,7 +10,7 @@ import
std/atomics, std/atomics,
../instrumentation, ../instrumentation,
../../allocs, ../../allocs,
./backoff ../primitives/futexes
# Tasks have an efficient design so that a single heap allocation # Tasks have an efficient design so that a single heap allocation
# is required per `spawn`. # is required per `spawn`.
@ -26,19 +26,33 @@ import
# (The name future is already used for IO scheduling) # (The name future is already used for IO scheduling)
type type
TaskState = object
## This state allows synchronization between:
## - a waiter that may sleep if no work and task is incomplete
## - a thief that completes the task
## - a waiter that frees task memory
## - a waiter that will pick up the task continuation
##
## Supports up to 2¹⁵ = 32768 threads
completed: Futex
synchro: Atomic[uint32]
# type synchro = object
# canBeFreed {.bitsize: 1.}: uint32 - Transfer ownership from thief to waiter
# pad {.bitsize: 1.}: uint32
# waiterID {.bitsize: 15.}: uint32 - ID of the waiter blocked on the task completion.
# thiefID {.bitsize: 15.}: uint32 - ID of the worker that stole and run the task. For leapfrogging.
Task* = object Task* = object
# Synchronization # Synchronization
# ------------------ # ------------------
parent*: ptr Task # When a task is awaiting, a thread can quickly prioritize the direct child of a task state: TaskState
thiefID*: Atomic[int32] # ID of the worker that stole and run the task. For leapfrogging. parent*: ptr Task # Latency: When a task is awaited, a thread can quickly prioritize its direct children.
hasFuture*: bool # Ownership: if a task has a future, the future deallocates it. Otherwise the worker thread does. hasFuture*: bool # Ownership: if a task has a future, the future deallocates it. Otherwise the worker thread does.
completed*: Atomic[bool]
waiter*: Atomic[ptr EventNotifier]
# Data parallelism # Data parallelism
# ------------------ # ------------------
isFirstIter*: bool # Awaitable for-loops return true for first iter. Loops are split before first iter. isFirstIter*: bool # Load-Balancing: New loops are split before first iter. Split loops are run once before reconsidering split.
envSize*: int32 # This is used for splittable loop envSize*: int32 # Metadata: In splittable loops we need to copy the `env` upon splitting
loopStart*: int loopStart*: int
loopStop*: int loopStop*: int
loopStride*: int loopStride*: int
@ -65,10 +79,89 @@ type
task*: ptr Task task*: ptr Task
next*: ptr ReductionDagNode next*: ptr ReductionDagNode
# Tasks # Task State
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
const SentinelThief* = 0xFACADE'i32 # Tasks have the following lifecycle:
# - A task creator that schedule a task on its queue
# - A task runner, task creator or thief, that runs the task
# - Once the task is finished:
# - if the task has no future, the task runner frees the task
# - if the task has a future,
# - the task runner can immediately pick up new work
# - the awaiting thread frees the task
# - the awaiting thread might be sleeping and need to be woken up.
#
# There is a delicate dance as we are need to prevent 2 issues:
#
# 1. A deadlock: if the waiter is never woken up after the thief completes the task
# 2. A use-after-free: if the thief tries to access the task after the waiter frees it.
#
# To solve 1, we need to set a `completed` flag, then check again if the waiter parked before.
# To solve 2, we either need to ensure that after the `completed` flag is set, the task runner
# doesn't access the task anymore which is impossible due to 1;
# or we have the waiter spinlock on another flag `canBeFreed`.
const # bitfield setup
kCanBeFreedShift = 31
kCanBeFreed = 1'u32 shl kCanBeFreedShift
kCanBeFreedMask = kCanBeFreed # 0x80000000
kWaiterShift = 15
kThiefMask = (1'u32 shl kWaiterShift) - 1 # 0x00007FFF
kWaiterMask = kThiefMask shl kWaiterShift # 0x3FFF8000
SentinelWaiter = high(uint32) and kWaiterMask
SentinelThief* = high(uint32) and kThiefMask
proc initSynchroState*(task: ptr Task) {.inline.} =
task.state.completed.store(0, moRelaxed)
task.state.synchro.store(SentinelWaiter or SentinelThief, moRelaxed)
# Flowvar synchronization
# -----------------------
proc isGcReady*(task: ptr Task): bool {.inline.} =
## Check if task can be freed by the waiter if it was stolen
(task.state.synchro.load(moAcquire) and kCanBeFreedMask) != 0
proc setGcReady*(task: ptr Task) {.inline.} =
## Thief transfers full task ownership to waiter
discard task.state.synchro.fetchAdd(kCanBeFreed, moRelease)
proc isCompleted*(task: ptr Task): bool {.inline.} =
## Check task completion
task.state.completed.load(moAcquire) != 0
proc setCompleted*(task: ptr Task) {.inline.} =
## Set a task to `complete`
## Wake a waiter thread if there is one
task.state.completed.store(1, moRelaxed)
fence(moSequentiallyConsistent)
let waiter = task.state.synchro.load(moRelaxed)
if (waiter and kWaiterMask) != SentinelWaiter:
task.state.completed.wake()
proc sleepUntilComplete*(task: ptr Task, waiterID: int32) {.inline.} =
## Sleep while waiting for task completion
let waiter = (cast[uint32](waiterID) shl kWaiterShift) - SentinelWaiter
discard task.state.synchro.fetchAdd(waiter, moRelaxed)
fence(moAcquire)
while task.state.completed.load(moRelaxed) == 0:
task.state.completed.wait(0)
# Leapfrogging synchronization
# ----------------------------
proc getThief*(task: ptr Task): uint32 {.inline.} =
task.state.synchro.load(moAcquire) and kThiefMask
proc setThief*(task: ptr Task, thiefID: int32) {.inline.} =
let thief = cast[uint32](thiefID) - SentinelThief
discard task.state.synchro.fetchAdd(thief, moRelease)
# Tasks
# -------------------------------------------------------------------------
proc newSpawn*( proc newSpawn*(
T: typedesc[Task], T: typedesc[Task],
@ -79,11 +172,9 @@ proc newSpawn*(
const size = sizeof(T) const size = sizeof(T)
result = allocHeapUnchecked(T, size) result = allocHeapUnchecked(T, size)
result.initSynchroState()
result.parent = parent result.parent = parent
result.thiefID.store(SentinelThief, moRelaxed)
result.hasFuture = false result.hasFuture = false
result.completed.store(false, moRelaxed)
result.waiter.store(nil, moRelaxed)
result.fn = fn result.fn = fn
proc newSpawn*( proc newSpawn*(
@ -96,11 +187,9 @@ proc newSpawn*(
sizeof(env) sizeof(env)
result = allocHeapUnchecked(T, size) result = allocHeapUnchecked(T, size)
result.initSynchroState()
result.parent = parent result.parent = parent
result.thiefID.store(SentinelThief, moRelaxed)
result.hasFuture = false result.hasFuture = false
result.completed.store(false, moRelaxed)
result.waiter.store(nil, moRelaxed)
result.fn = fn result.fn = fn
cast[ptr[type env]](result.env)[] = env cast[ptr[type env]](result.env)[] = env
@ -120,11 +209,9 @@ proc newLoop*(
preCondition: start < stop preCondition: start < stop
result = allocHeapUnchecked(T, size) result = allocHeapUnchecked(T, size)
result.initSynchroState()
result.parent = parent result.parent = parent
result.thiefID.store(SentinelThief, moRelaxed)
result.hasFuture = false result.hasFuture = false
result.completed.store(false, moRelaxed)
result.waiter.store(nil, moRelaxed)
result.fn = fn result.fn = fn
result.envSize = 0 result.envSize = 0
@ -148,11 +235,9 @@ proc newLoop*(
preCondition: start < stop preCondition: start < stop
result = allocHeapUnchecked(T, size) result = allocHeapUnchecked(T, size)
result.initSynchroState()
result.parent = parent result.parent = parent
result.thiefID.store(SentinelThief, moRelaxed)
result.hasFuture = false result.hasFuture = false
result.completed.store(false, moRelaxed)
result.waiter.store(nil, moRelaxed)
result.fn = fn result.fn = fn
result.envSize = int32(sizeof(env)) result.envSize = int32(sizeof(env))
cast[ptr[type env]](result.env)[] = env cast[ptr[type env]](result.env)[] = env
@ -180,6 +265,8 @@ proc newFlowVar*(T: typedesc, task: ptr Task): Flowvar[T] {.inline.} =
cast[ptr ptr Task](task.env.addr)[] = task cast[ptr ptr Task](task.env.addr)[] = task
proc cleanup*(fv: var Flowvar) {.inline.} = proc cleanup*(fv: var Flowvar) {.inline.} =
while not fv.task.isGcReady():
cpuRelax()
fv.task.freeHeap() fv.task.freeHeap()
fv.task = nil fv.task = nil
@ -195,14 +282,13 @@ func isReady*[T](fv: Flowvar[T]): bool {.inline.} =
## In that case `sync` will not block. ## In that case `sync` will not block.
## Otherwise the current will block to help on all the pending tasks ## Otherwise the current will block to help on all the pending tasks
## until the Flowvar is ready. ## until the Flowvar is ready.
fv.task.completed.load(moAcquire) fv.task.isCompleted()
func readyWith*[T](task: ptr Task, childResult: T) {.inline.} = func readyWith*[T](task: ptr Task, childResult: T) {.inline.} =
## Send the Flowvar result from the child thread processing the task ## Send the Flowvar result from the child thread processing the task
## to its parent thread. ## to its parent thread.
precondition: not task.completed.load(moAcquire) precondition: not task.isCompleted()
cast[ptr (ptr Task, T)](task.env.addr)[1] = childResult cast[ptr (ptr Task, T)](task.env.addr)[1] = childResult
task.completed.store(true, moRelease)
proc sync*[T](fv: sink Flowvar[T]): T {.noInit, inline, gcsafe.} = proc sync*[T](fv: sink Flowvar[T]): T {.noInit, inline, gcsafe.} =
## Blocks the current thread until the flowvar is available ## Blocks the current thread until the flowvar is available

View File

@ -113,7 +113,8 @@ Instead we can have each thread start working and use backpressure to lazily eva
### Backoff workers when awaiting a future ### Backoff workers when awaiting a future
This problem is quite tricky: This problem is quite tricky:
- For latency we want the worker to continue as soon as the future is completed. This might also create more work and expose more parallelism opportunities (in recursive divide-and-conquer algorithms for example) - For latency we want the worker to continue as soon as the future is completed. This might also create more work and expose more parallelism opportunities (in recursive divide-and-conquer algorithms for example).\
Note that with hyperthreading, the sibling thread(s) can still use the core fully so throughput might not be impacted.
- For throughput, and because a scheduler is optimal only when greedy (i.e. within 2x of the best schedule, see Cilk paper), we want an idle thread to take any available work ASAP. - For throughput, and because a scheduler is optimal only when greedy (i.e. within 2x of the best schedule, see Cilk paper), we want an idle thread to take any available work ASAP.
- but what if that worker ends up with work that blocks it for a long-time? It may lead to work starvation. - but what if that worker ends up with work that blocks it for a long-time? It may lead to work starvation.
- There is no robust, cross-platform API, to wake a specific thread awaiting on a futex or condition variable. - There is no robust, cross-platform API, to wake a specific thread awaiting on a futex or condition variable.
@ -136,12 +137,13 @@ This problem is quite tricky:
- Using continuations: - Using continuations:
We could just store a continuation in the future so the thread that completes the future picks up the continuation. We could just store a continuation in the future so the thread that completes the future picks up the continuation.
The workaround for now is just to work on any available tasks to maximize throughput then sleep. Besides design issues, there are also engineering issues as we can't wake a specific thread on a common futex or condition variable.
This has 2 disadvantages: - Either a thread sleeps on a locally owned one, but how to communicate its address to the thief?
- For coarse-grain parallelism, if the waiting thread steals a long task. And how to synchronize freeing the task memory?
However, coarse-grain parallelism is easier to split into multiple tasks, In particular, if we use the task as the medium, how to avoid race condition where:
while exposing fine-grain parallelism and properly handling with its overhead is the usual problem. task is completed by thief, task memory is freed by waiter, thief tries to get the waiter futex/condition variable
- As we can't wake a specific thread on a common futex or condition variable, and triggers a use-after-free.
when awaiting a future, a thread sleeps on a local one. - or its sleeps on the global and each stolen completed task triggers a wakeAll
Hence if no work can be stolen, the waiter sleeps. But if work is then recreated, it stays sleeping as only the global futex is notified. - or we create a backoff data structure where specific waiters can be woken up.
Note that with hyperthreading, the sibling thread(s) can still use the core fully so throughput might not be impacted.
Our solution is to embed the backoff structure in the task and add an additional flag to notify when the task can be freed safely.

View File

@ -12,21 +12,32 @@
import std/atomics import std/atomics
export MemoryOrder export MemoryOrder
type # OS primitives
Futex* = object # ------------------------------------------------------------------------
value: Atomic[uint32]
FutexOp = distinct cint
var NR_Futex {.importc: "__NR_futex", header: "<sys/syscall.h>".}: cint
var FutexWaitPrivate {.importc:"FUTEX_WAIT_PRIVATE", header: "<linux/futex.h>".}: FutexOp
var FutexWakePrivate {.importc:"FUTEX_WAKE_PRIVATE", header: "<linux/futex.h>".}: FutexOp const
NR_Futex = 202
FUTEX_WAIT_PRIVATE = 128
FUTEX_WAKE_PRIVATE = 129
proc syscall(sysno: clong): cint {.header:"<unistd.h>", varargs.} proc syscall(sysno: clong): cint {.header:"<unistd.h>", varargs.}
proc sysFutex( proc sysFutex(
futex: var Futex, op: FutexOp, val1: cuint or cint, futexAddr: pointer, operation: uint32, expected: uint32 or int32,
timeout: pointer = nil, val2: pointer = nil, val3: cint = 0): cint {.inline.} = timeout: pointer = nil, val2: pointer = nil, val3: cint = 0): cint {.inline.} =
syscall(NR_Futex, futex.value.addr, op, val1, timeout, val2, val3) ## See https://web.archive.org/web/20230208151430/http://locklessinc.com/articles/futex_cheat_sheet/
## and https://www.akkadia.org/drepper/futex.pdf
syscall(NR_Futex, futexAddr, operation, expected, timeout, val2, val3)
# Futex API
# ------------------------------------------------------------------------
type
Futex* = object
value: Atomic[uint32]
proc initialize*(futex: var Futex) {.inline.} = proc initialize*(futex: var Futex) {.inline.} =
futex.value.store(0, moRelaxed) futex.value.store(0, moRelaxed)
@ -37,19 +48,20 @@ proc teardown*(futex: var Futex) {.inline.} =
proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} = proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} =
futex.value.load(order) futex.value.load(order)
proc loadMut*(futex: var Futex): var Atomic[uint32] {.inline.} =
futex.value
proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} = proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} =
futex.value.store(value, order) futex.value.store(value, order)
proc wait*(futex: var Futex, refVal: uint32) {.inline.} = proc increment*(futex: var Futex, value: uint32, order: MemoryOrder): uint32 {.inline.} =
## Suspend a thread if the value of the futex is the same as refVal. ## Increment a futex value, returns the previous one.
futex.value.fetchAdd(value, order)
proc wait*(futex: var Futex, expected: uint32) {.inline.} =
## Suspend a thread if the value of the futex is the same as expected.
# Returns 0 in case of a successful suspend # Returns 0 in case of a successful suspend
# If value are different, it returns EWOULDBLOCK # If value are different, it returns EWOULDBLOCK
# We discard as this is not needed and simplifies compat with Windows futex # We discard as this is not needed and simplifies compat with Windows futex
discard sysFutex(futex, FutexWaitPrivate, refVal) discard sysFutex(futex.value.addr, FutexWaitPrivate, expected)
proc wake*(futex: var Futex) {.inline.} = proc wake*(futex: var Futex) {.inline.} =
## Wake one thread (from the same process) ## Wake one thread (from the same process)
@ -57,7 +69,7 @@ proc wake*(futex: var Futex) {.inline.} =
# Returns the number of actually woken threads # Returns the number of actually woken threads
# or a Posix error code (if negative) # or a Posix error code (if negative)
# We discard as this is not needed and simplifies compat with Windows futex # We discard as this is not needed and simplifies compat with Windows futex
discard sysFutex(futex, FutexWakePrivate, 1) discard sysFutex(futex.value.addr, FutexWakePrivate, 1)
proc wakeAll*(futex: var Futex) {.inline.} = proc wakeAll*(futex: var Futex) {.inline.} =
## Wake all threads (from the same process) ## Wake all threads (from the same process)
@ -65,4 +77,4 @@ proc wakeAll*(futex: var Futex) {.inline.} =
# Returns the number of actually woken threads # Returns the number of actually woken threads
# or a Posix error code (if negative) # or a Posix error code (if negative)
# We discard as this is not needed and simplifies compat with Windows futex # We discard as this is not needed and simplifies compat with Windows futex
discard sysFutex(futex, FutexWakePrivate, high(int32)) discard sysFutex(futex.value.addr, FutexWakePrivate, high(int32))

View File

@ -7,7 +7,10 @@
import std/atomics import std/atomics
# A wrapper for Darwin futex. # OS primitives
# ------------------------------------------------------------------------
# Darwin futexes.
# They are used in libc++ so likely to be very stable. # They are used in libc++ so likely to be very stable.
# A new API appeared in OSX Big Sur (Jan 2021) ulock_wait2 and macOS pthread_cond_t has been migrated to it # A new API appeared in OSX Big Sur (Jan 2021) ulock_wait2 and macOS pthread_cond_t has been migrated to it
# - https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6 # - https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
@ -73,10 +76,13 @@ const ULF_WAKE_MASK = ULF_NO_ERRNO or
ULF_WAKE_THREAD or ULF_WAKE_THREAD or
ULF_WAKE_ALLOW_NON_OWNER ULF_WAKE_ALLOW_NON_OWNER
proc ulock_wait(operation: uint32, address: pointer, value: uint64, timeout: uint32): cint {.importc:"__ulock_wait", cdecl.} proc ulock_wait(operation: uint32, address: pointer, expected: uint64, timeout: uint32): cint {.importc:"__ulock_wait", cdecl.}
proc ulock_wait2(operation: uint32, address: pointer, value: uint64, timeout, value2: uint64): cint {.importc:"__ulock_wait2", cdecl.} proc ulock_wait2(operation: uint32, address: pointer, expected: uint64, timeout, value2: uint64): cint {.importc:"__ulock_wait2", cdecl.}
proc ulock_wake(operation: uint32, address: pointer, wake_value: uint64): cint {.importc:"__ulock_wake", cdecl.} proc ulock_wake(operation: uint32, address: pointer, wake_value: uint64): cint {.importc:"__ulock_wake", cdecl.}
# Futex API
# ------------------------------------------------------------------------
type type
Futex* = object Futex* = object
value: Atomic[uint32] value: Atomic[uint32]
@ -90,15 +96,16 @@ proc teardown*(futex: var Futex) {.inline.} =
proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} = proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} =
futex.value.load(order) futex.value.load(order)
proc loadMut*(futex: var Futex): var Atomic[uint32] {.inline.} =
futex.value
proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} = proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} =
futex.value.store(value, order) futex.value.store(value, order)
proc wait*(futex: var Futex, refVal: uint32) {.inline.} = proc increment*(futex: var Futex, value: uint32, order: MemoryOrder): uint32 {.inline.} =
## Suspend a thread if the value of the futex is the same as refVal. ## Increment a futex value, returns the previous one.
discard ulock_wait(UL_UNFAIR_LOCK64_SHARED or ULF_NO_ERRNO, futex.value.addr, uint64 refVal, 0) futex.value.fetchAdd(value, order)
proc wait*(futex: var Futex, expected: uint32) {.inline.} =
## Suspend a thread if the value of the futex is the same as expected.
discard ulock_wait(UL_UNFAIR_LOCK64_SHARED or ULF_NO_ERRNO, futex.value.addr, uint64 expected, 0)
proc wake*(futex: var Futex) {.inline.} = proc wake*(futex: var Futex) {.inline.} =
## Wake one thread (from the same process) ## Wake one thread (from the same process)

View File

@ -8,21 +8,12 @@
# An implementation of futex using Windows primitives # An implementation of futex using Windows primitives
import std/atomics, winlean import std/atomics, winlean
export MemoryOrder
type # OS primitives
Futex* = object # ------------------------------------------------------------------------
value: Atomic[uint32]
# Contrary to the documentation, the futex related primitives are NOT in kernel32.dll # Contrary to the documentation, the futex related primitives are NOT in kernel32.dll
# but in API-MS-Win-Core-Synch-l1-2-0.dll ¯\_(ツ)_/¯ # but in API-MS-Win-Core-Synch-l1-2-0.dll ¯\_(ツ)_/¯
proc initialize*(futex: var Futex) {.inline.} =
futex.value.store(0, moRelaxed)
proc teardown*(futex: var Futex) {.inline.} =
futex.value.store(0, moRelaxed)
proc WaitOnAddress( proc WaitOnAddress(
Address: pointer, CompareAddress: pointer, Address: pointer, CompareAddress: pointer,
AddressSize: csize_t, dwMilliseconds: DWORD AddressSize: csize_t, dwMilliseconds: DWORD
@ -32,23 +23,37 @@ proc WaitOnAddress(
proc WakeByAddressSingle(Address: pointer) {.importc, stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0.dll".} proc WakeByAddressSingle(Address: pointer) {.importc, stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0.dll".}
proc WakeByAddressAll(Address: pointer) {.importc, stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0.dll".} proc WakeByAddressAll(Address: pointer) {.importc, stdcall, dynlib: "API-MS-Win-Core-Synch-l1-2-0.dll".}
# Futex API
# ------------------------------------------------------------------------
type
Futex* = object
value: Atomic[uint32]
proc initialize*(futex: var Futex) {.inline.} =
futex.value.store(0, moRelaxed)
proc teardown*(futex: var Futex) {.inline.} =
futex.value.store(0, moRelaxed)
proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} = proc load*(futex: var Futex, order: MemoryOrder): uint32 {.inline.} =
futex.value.load(order) futex.value.load(order)
proc loadMut*(futex: var Futex): var Atomic[uint32] {.inline.} =
futex.value
proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} = proc store*(futex: var Futex, value: uint32, order: MemoryOrder) {.inline.} =
futex.value.store(value, order) futex.value.store(value, order)
proc wait*(futex: var Futex, refVal: uint32) {.inline.} = proc increment*(futex: var Futex, value: uint32, order: MemoryOrder): uint32 {.inline.} =
## Suspend a thread if the value of the futex is the same as refVal. ## Increment a futex value, returns the previous one.
futex.value.fetchAdd(value, order)
proc wait*(futex: var Futex, expected: uint32) {.inline.} =
## Suspend a thread if the value of the futex is the same as expected.
# Returns TRUE if the wait succeeds or FALSE if not. # Returns TRUE if the wait succeeds or FALSE if not.
# getLastError() will contain the error information, for example # getLastError() will contain the error information, for example
# if it failed due to a timeout. # if it failed due to a timeout.
# We discard as this is not needed and simplifies compat with Linux futex # We discard as this is not needed and simplifies compat with Linux futex
discard WaitOnAddress(futex.value.addr, refVal.unsafeAddr, csize_t sizeof(refVal), INFINITE) discard WaitOnAddress(futex.value.addr, expected.unsafeAddr, csize_t sizeof(expected), INFINITE)
proc wake*(futex: var Futex) {.inline.} = proc wake*(futex: var Futex) {.inline.} =
## Wake one thread (from the same process) ## Wake one thread (from the same process)

View File

@ -150,7 +150,6 @@ type
currentTask: ptr Task currentTask: ptr Task
# Synchronization # Synchronization
localBackoff: EventNotifier # Multi-Producer Single-Consumer backoff
signal: ptr Signal # owned signal signal: ptr Signal # owned signal
# Thefts # Thefts
@ -184,6 +183,7 @@ var workerContext {.threadvar.}: WorkerContext
## and use a Minimal Perfect Hash Function. ## and use a Minimal Perfect Hash Function.
## We can approximate a threadID by retrieving the address of a dummy thread-local variable. ## We can approximate a threadID by retrieving the address of a dummy thread-local variable.
## - Or we sort threadID and use binary search ## - Or we sort threadID and use binary search
## The FlowVar would also need to store the Threadpool
proc setupWorker(ctx: var WorkerContext) = proc setupWorker(ctx: var WorkerContext) =
## Initialize the thread-local context of a worker ## Initialize the thread-local context of a worker
@ -197,7 +197,6 @@ proc setupWorker(ctx: var WorkerContext) =
ctx.rng.seed(0xEFFACED + ctx.id) ctx.rng.seed(0xEFFACED + ctx.id)
# Synchronization # Synchronization
ctx.localBackoff.initialize()
ctx.signal = addr ctx.threadpool.workerSignals[ctx.id] ctx.signal = addr ctx.threadpool.workerSignals[ctx.id]
ctx.signal.terminate.store(false, moRelaxed) ctx.signal.terminate.store(false, moRelaxed)
@ -210,7 +209,6 @@ proc setupWorker(ctx: var WorkerContext) =
proc teardownWorker(ctx: var WorkerContext) = proc teardownWorker(ctx: var WorkerContext) =
## Cleanup the thread-local context of a worker ## Cleanup the thread-local context of a worker
ctx.localBackoff.`=destroy`()
ctx.taskqueue[].teardown() ctx.taskqueue[].teardown()
proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.} proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.}
@ -249,32 +247,34 @@ proc workerEntryFn(params: tuple[threadpool: Threadpool, id: WorkerID]) {.raises
# ############################################################ # ############################################################
# Sentinel values # Sentinel values
const ReadyFuture = cast[ptr EventNotifier](0xCA11AB1E)
const RootTask = cast[ptr Task](0xEFFACED0) const RootTask = cast[ptr Task](0xEFFACED0)
proc run*(ctx: var WorkerContext, task: ptr Task) {.raises:[].} = proc run*(ctx: var WorkerContext, task: ptr Task) {.raises:[].} =
## Run a task, frees it if it is not owned by a Flowvar ## Run a task, frees it if it is not owned by a Flowvar
let suspendedTask = ctx.currentTask let suspendedTask = ctx.currentTask
ctx.currentTask = task ctx.currentTask = task
debug: log("Worker %3d: running task 0x%.08x (previous: 0x%.08x, %d pending, thiefID %d)\n", ctx.id, task, suspendedTask, ctx.taskqueue[].peek(), task.thiefID) debug: log("Worker %3d: running task 0x%.08x (previous: 0x%.08x, %d pending, thiefID %d)\n", ctx.id, task, suspendedTask, ctx.taskqueue[].peek(), task.getThief())
task.fn(task.env.addr) task.fn(task.env.addr)
debug: log("Worker %3d: completed task 0x%.08x (%d pending)\n", ctx.id, task, ctx.taskqueue[].peek()) debug: log("Worker %3d: completed task 0x%.08x (%d pending)\n", ctx.id, task, ctx.taskqueue[].peek())
ctx.currentTask = suspendedTask ctx.currentTask = suspendedTask
if not task.hasFuture:
if not task.hasFuture: # Are we the final owner?
debug: log("Worker %3d: freeing task 0x%.08x with no future\n", ctx.id, task)
freeHeap(task) freeHeap(task)
return return
# Sync with an awaiting thread in completeFuture that didn't find work # Sync with an awaiting thread in completeFuture that didn't find work
var expected = (ptr EventNotifier)(nil) # and transfer ownership of the task to it.
if not compareExchange(task.waiter, expected, desired = ReadyFuture, moAcquireRelease): debug: log("Worker %3d: transfering task 0x%.08x to future holder\n", ctx.id, task)
debug: log("Worker %3d: completed task 0x%.08x, notifying waiter 0x%.08x\n", ctx.id, task, expected) task.setCompleted()
expected[].notify() task.setGcReady()
proc schedule(ctx: var WorkerContext, tn: ptr Task, forceWake = false) {.inline.} = proc schedule(ctx: var WorkerContext, tn: ptr Task, forceWake = false) {.inline.} =
## Schedule a task in the threadpool ## Schedule a task in the threadpool
## This wakes a sibling thread if our local queue is empty ## This wakes a sibling thread if our local queue is empty
## or forceWake is true. ## or forceWake is true.
debug: log("Worker %3d: schedule task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, tn, tn.parent, ctx.currentTask) debug: log("Worker %3d: schedule task 0x%.08x (parent/current task 0x%.08x)\n", ctx.id, tn, tn.parent)
# Instead of notifying every time a task is scheduled, we notify # Instead of notifying every time a task is scheduled, we notify
# only when the worker queue is empty. This is a good approximation # only when the worker queue is empty. This is a good approximation
@ -308,7 +308,7 @@ proc schedule(ctx: var WorkerContext, tn: ptr Task, forceWake = false) {.inline.
iterator splitUpperRanges( iterator splitUpperRanges(
ctx: WorkerContext, task: ptr Task, ctx: WorkerContext, task: ptr Task,
curLoopIndex: int, numIdle: int32 curLoopIndex: int, approxIdle: int32
): tuple[start, size: int] = ): tuple[start, size: int] =
## Split the iteration range based on the number of idle threads ## Split the iteration range based on the number of idle threads
## returns chunks with parameters (start, stopEx, len) ## returns chunks with parameters (start, stopEx, len)
@ -338,10 +338,10 @@ iterator splitUpperRanges(
debugSplit: debugSplit:
log("Worker %3d: task 0x%.08x - %8d step(s) left (current: %3d, start: %3d, stop: %3d, stride: %3d, %3d idle worker(s))\n", log("Worker %3d: task 0x%.08x - %8d step(s) left (current: %3d, start: %3d, stop: %3d, stride: %3d, %3d idle worker(s))\n",
ctx.id, task, task.loopStepsLeft, curLoopIndex, task.loopStart, task.loopStop, task.loopStride, numIdle) ctx.id, task, task.loopStepsLeft, curLoopIndex, task.loopStart, task.loopStop, task.loopStride, approxIdle)
# Send a chunk of work to all idle workers + ourselves # Send a chunk of work to all idle workers + ourselves
let availableWorkers = cast[int](numIdle + 1) let availableWorkers = cast[int](approxIdle + 1)
let baseChunkSize = task.loopStepsLeft div availableWorkers let baseChunkSize = task.loopStepsLeft div availableWorkers
let cutoff = task.loopStepsLeft mod availableWorkers let cutoff = task.loopStepsLeft mod availableWorkers
@ -405,19 +405,18 @@ func decrease(backoff: var BalancerBackoff) {.inline.} =
if backoff.windowLogSize < 0: if backoff.windowLogSize < 0:
backoff.windowLogSize = 0 backoff.windowLogSize = 0
proc splitAndDispatchLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int, numIdle: int32) = proc splitAndDispatchLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int, approxIdle: int32) =
# The iterator mutates the task with the first chunk metadata # The iterator mutates the task with the first chunk metadata
let stop = task.loopStop let stop = task.loopStop
for (offset, numSteps) in ctx.splitUpperRanges(task, curLoopIndex, numIdle): for (offset, numSteps) in ctx.splitUpperRanges(task, curLoopIndex, approxIdle):
if numSteps == 0: if numSteps == 0:
break break
let upperSplit = allocHeapUnchecked(Task, sizeof(Task) + task.envSize) let upperSplit = allocHeapUnchecked(Task, sizeof(Task) + task.envSize)
copyMem(upperSplit, task, sizeof(Task) + task.envSize) copyMem(upperSplit, task, sizeof(Task) + task.envSize)
upperSplit.initSynchroState()
upperSplit.parent = task upperSplit.parent = task
upperSplit.thiefID.store(SentinelThief, moRelaxed)
upperSplit.waiter.store(nil, moRelaxed)
upperSplit.isFirstIter = false upperSplit.isFirstIter = false
upperSplit.loopStart = offset upperSplit.loopStart = offset
@ -446,9 +445,9 @@ proc loadBalanceLoop(ctx: var WorkerContext, task: ptr Task, curLoopIndex: int,
if ctx.taskqueue[].peek() == 0: if ctx.taskqueue[].peek() == 0:
let waiters = ctx.threadpool.globalBackoff.getNumWaiters() let waiters = ctx.threadpool.globalBackoff.getNumWaiters()
# We assume that the worker that scheduled the task will work on it. I.e. idleness is underestimated. # We assume that the worker that scheduled the task will work on it. I.e. idleness is underestimated.
let numIdle = waiters.preSleep + waiters.committedSleep + cast[int32](task.isFirstIter) let approxIdle = waiters.preSleep + waiters.committedSleep + cast[int32](task.isFirstIter)
if numIdle > 0: if approxIdle > 0:
ctx.splitAndDispatchLoop(task, curLoopIndex, numIdle) ctx.splitAndDispatchLoop(task, curLoopIndex, approxIdle)
backoff.decrease() backoff.decrease()
else: else:
backoff.increase() backoff.increase()
@ -547,15 +546,13 @@ proc tryLeapfrog(ctx: var WorkerContext, awaitedTask: ptr Task): ptr Task =
var thiefID = SentinelThief var thiefID = SentinelThief
while true: while true:
debug: log("Worker %3d: leapfrogging - waiting for thief of task 0x%.08x to publish their ID\n", ctx.id, awaitedTask) debug: log("Worker %3d: leapfrogging - waiting for thief of task 0x%.08x to publish their ID (thiefID read %d)\n", ctx.id, awaitedTask, thiefID)
thiefID = awaitedTask.thiefID.load(moAcquire) thiefID = awaitedTask.getThief()
if thiefID != SentinelThief: if thiefID != SentinelThief:
break break
cpuRelax() cpuRelax()
ascertain: 0 <= thiefID and thiefID < ctx.threadpool.numThreads ascertain: 0 <= thiefID and thiefID < ctx.threadpool.numThreads
# Leapfrogging is used when completing a future, so steal only one task
# and don't leave tasks stranded in our queue.
let leapTask = ctx.id.steal(ctx.threadpool.workerQueues[thiefID]) let leapTask = ctx.id.steal(ctx.threadpool.workerQueues[thiefID])
if not leapTask.isNil(): if not leapTask.isNil():
# Theft successful, there might be more work for idle threads, wake one # Theft successful, there might be more work for idle threads, wake one
@ -588,9 +585,9 @@ proc eventLoop(ctx: var WorkerContext) {.raises:[], gcsafe.} =
break break
else: else:
# 2.c Park the thread until a new task enters the threadpool # 2.c Park the thread until a new task enters the threadpool
debug: log("Worker %3d: eventLoop 2.b - sleeping\n", ctx.id) debugTermination: log("Worker %3d: eventLoop 2.b - sleeping\n", ctx.id)
ctx.threadpool.globalBackoff.sleep(ticket) ctx.threadpool.globalBackoff.sleep(ticket)
debug: log("Worker %3d: eventLoop 2.b - waking\n", ctx.id) debugTermination: log("Worker %3d: eventLoop 2.b - waking\n", ctx.id)
# ############################################################ # ############################################################
# # # #
@ -603,7 +600,7 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
template ctx: untyped = workerContext template ctx: untyped = workerContext
template isFutReady(): untyped = template isFutReady(): untyped =
let isReady = fv.task.completed.load(moAcquire) let isReady = fv.task.isCompleted()
if isReady: if isReady:
parentResult = cast[ptr (ptr Task, T)](fv.task.env.addr)[1] parentResult = cast[ptr (ptr Task, T)](fv.task.env.addr)[1]
isReady isReady
@ -611,9 +608,8 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
if isFutReady(): if isFutReady():
return return
## 1. Process all the children of the current tasks. ## 1. Process all the children of the current tasks first, ignoring the rest.
## This ensures that we can give control back ASAP. debug: log("Worker %3d: sync 1 - searching task from local queue (awaitedTask 0x%.08x)\n", ctx.id, fv.task)
debug: log("Worker %3d: sync 1 - searching task from local queue\n", ctx.id)
while (let task = ctx.taskqueue[].pop(); not task.isNil): while (let task = ctx.taskqueue[].pop(); not task.isNil):
if task.parent != ctx.currentTask: if task.parent != ctx.currentTask:
debug: log("Worker %3d: sync 1 - skipping non-direct descendant task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask) debug: log("Worker %3d: sync 1 - skipping non-direct descendant task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask)
@ -622,7 +618,7 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
debug: log("Worker %3d: sync 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask) debug: log("Worker %3d: sync 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, task, task.parent, ctx.currentTask)
ctx.run(task) ctx.run(task)
if isFutReady(): if isFutReady():
debug: log("Worker %3d: sync 1 - future ready, exiting\n", ctx.id) debug: log("Worker %3d: sync 1 - future ready, exiting (awaitedTask 0x%.08x)\n", ctx.id, fv.task)
return return
# 2. We run out-of-tasks or out-of-direct-child of our current awaited task # 2. We run out-of-tasks or out-of-direct-child of our current awaited task
@ -636,6 +632,9 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
# #
# Design tradeoffs # Design tradeoffs
# ---------------- # ----------------
# see ./docs/design.md
#
# Rundown:
# #
# At this point, we have significant design decisions: # At this point, we have significant design decisions:
# - Do we steal from other workers in hope we advance our awaited task? # - Do we steal from other workers in hope we advance our awaited task?
@ -646,6 +645,10 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
# Note: With hyperthreading, real hardware resources are 2x less than the reported number of cores. # Note: With hyperthreading, real hardware resources are 2x less than the reported number of cores.
# Hence parking might free contended memory bandwitdh or execution ports. # Hence parking might free contended memory bandwitdh or execution ports.
# - Do we just not sleep, potentially wasting energy? # - Do we just not sleep, potentially wasting energy?
# Note: on "empty tasks" (like Fibonacci), the constant hammering of threads
# actually slows down performance by 2x.
# This is a realistic scenario for IO (waiting to copy a network buffer for example)
# but the second almost-empty benchmark, depth-first-search is actually faster by 17%.
# #
# - If we work, we maximize throughput, but we increase latency to handle the future's continuation. # - If we work, we maximize throughput, but we increase latency to handle the future's continuation.
# If that continuation would have created more parallel work, we would actually have restricted parallelism. # If that continuation would have created more parallel work, we would actually have restricted parallelism.
@ -657,31 +660,28 @@ proc completeFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
# in theory maintains throughput and minimize the latency of the future's continuation # in theory maintains throughput and minimize the latency of the future's continuation
# but in practice, performance can worsen significantly on fine-grained parallelism. # but in practice, performance can worsen significantly on fine-grained parallelism.
debug: log("Worker %3d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x)\n", ctx.id, ctx.currentTask) debug: log("Worker %3d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x, awaitedTask 0x%.08x)\n", ctx.id, ctx.currentTask, fv.task)
while not isFutReady(): while not isFutReady():
if (let leapTask = ctx.tryLeapfrog(fv.task); not leapTask.isNil): if (let leapTask = ctx.tryLeapfrog(fv.task); not leapTask.isNil):
# Leapfrogging, the thief had an empty queue, hence if there are tasks in its queue, it's generated by our blocked task. # Leapfrogging, the thief had an empty queue, hence if there are tasks in its queue, it's generated by our blocked task.
# Help the thief clear those, as if it did not finish, it's likely blocked on those children tasks. # Help the thief clear those, as if it did not finish, it's likely blocked on those children tasks.
debug: log("Worker %3d: sync 2.1 - leapfrog task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, leapTask, leapTask.parent, ctx.currentTask) debug: log("Worker %3d: sync 2.1 - leapfrog task 0x%.08x (parent 0x%.08x, current 0x%.08x, awaitedTask 0x%.08x)\n", ctx.id, leapTask, leapTask.parent, ctx.currentTask, fv.task)
ctx.run(leapTask) ctx.run(leapTask)
elif (let stolenTask = ctx.tryStealOne(); not stolenTask.isNil): elif (let stolenTask = ctx.tryStealOne(); not stolenTask.isNil):
# We stole a task, we hope we advance our awaited task. # We stole a task, we hope we advance our awaited task.
debug: log("Worker %3d: sync 2.2 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) debug: log("Worker %3d: sync 2.2 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x, awaitedTask 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask, fv.task)
ctx.run(stolenTask) ctx.run(stolenTask)
elif (let ownTask = ctx.taskqueue[].pop(); not ownTask.isNil): elif (let ownTask = ctx.taskqueue[].pop(); not ownTask.isNil):
# We advance our own queue, this increases global throughput but may impact latency on the awaited task. # We advance our own queue, this increases global throughput but may impact latency on the awaited task.
debug: log("Worker %3d: sync 2.3 - couldn't steal, running own task\n", ctx.id) debug: log("Worker %3d: sync 2.3 - couldn't steal, running own task (awaitedTask 0x%.08x)\n", ctx.id, fv.task)
ctx.run(ownTask) ctx.run(ownTask)
else: else:
# Nothing to do, we park. # Nothing to do, we park.
# - On today's hyperthreaded systems, this might reduce contention on a core resources like memory caches and execution ports # - On today's hyperthreaded systems, this might reduce contention on a core resources like memory caches and execution ports
# - If more work is created, we won't be notified as we need to park on a dedicated notifier for precise wakeup when future is ready # - If more work is created, we won't be notified as we need to park on a dedicated notifier for precise wakeup when future is ready
ctx.localBackoff.prepareToPark() debugTermination: log("Worker %3d: sync 2.4 - Empty runtime, parking (awaitedTask 0x%.08x)\n", ctx.id, fv.task)
fv.task.sleepUntilComplete(ctx.id)
var expected = (ptr EventNotifier)(nil) debugTermination: log("Worker %3d: sync 2.4 - signaled, waking (awaitedTask 0x%.08x)\n", ctx.id, fv.task)
if compareExchange(fv.task.waiter, expected, desired = ctx.localBackoff.addr, moAcquireRelease):
ctx.localBackoff.park()
proc syncAll*(tp: Threadpool) {.raises: [].} = proc syncAll*(tp: Threadpool) {.raises: [].} =
## Blocks until all pending tasks are completed ## Blocks until all pending tasks are completed