* add fibonacci bench https://github.com/status-im/nim-taskpools/issues/5

* unify allocs, don't use a mix of calloc malloc and wv_alloc

* Chase-Lev Deque: "unlimited" growth

* Remove affinity / CPU pinning support: does not work for ARM (Big.Little Arch), macOS, Alder Lake (P and E cores) and multiple instances of a program get the main thread pinned on the same core.

* Remove weave-specific things: WV_NUM_THREADS, the design-by-contract asserts

* avoid running destructors on freshly allocated tasks on Nim 1.6
This commit is contained in:
Mamy Ratsimbazafy 2022-01-16 08:57:06 +01:00 committed by GitHub
parent 26e3b1e15b
commit 79c18d7c94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 319 additions and 206 deletions

View File

@ -0,0 +1,77 @@
# Fibonacci benchmarks
⚠️ Disclaimer:
Please don't use parallel fibonacci in production!
Use the fast doubling method with memoization instead.
Fibonacci benchmark has 3 draws:
1. It's very simple to implement
2. It's unbalanced, and efficiency requires distributing work to avoid idle cores.
3. It's a very effective scheduler overhead benchmark, because the basic task is very trivial and the task spawning grows at 2^n scale.
Want to know the difference between low and high overhead?
Run the following C code (taken from [Oracle OpenMP example](https://docs.oracle.com/cd/E19205-01/820-7883/girtd/index.html))
```C
#include <stdio.h>
#include <omp.h>
int fib(int n)
{
int i, j;
if (n<2)
return n;
else
{
#pragma omp task shared(i) firstprivate(n)
{
i=fib(n-1);
}
j=fib(n-2);
#pragma omp taskwait
return i+j;
}
}
int main()
{
int n = 40;
#pragma omp parallel shared(n)
{
#pragma omp single
printf ("fib(%d) = %d\n", n, fib(n));
}
}
```
First compile with Clang and run it
```
clang -O3 -fopenmp benchmarks/fibonacci/omp_fib.c
time a.out
```
It should be fairly quick
Then compile with GCC and run it
```
gcc -O3 -fopenmp benchmarks/fibonacci/omp_fib.c
time a.out
```
Notice how some cores get idle as time goes on?
Don't forget to kill the benchmark, you'll be there all day.
What's happening?
GCC's OpenMP implementation uses a single queue for all tasks.
That queue gets constantly hammered by all threads and becomes a contention point.
Furthermore, there appears to be no load balancing, or threads end up descheduled
because of the contention on the shared lock.
However Clang implementation uses a work-stealing scheduler with one deque per thread.
The only contention happens when a thread runs out of work and has to look for more
in the deques of other threads. Which victim to check is chosen at random, so
the potential contention is distributed among all threads instead of a single structure.

View File

@ -0,0 +1,35 @@
import
# STD lib
os, strutils, threadpool, strformat,
# bench
../wtime
# Using Nim's standard threadpool
# Compile with "nim c --threads:on -d:release -d:danger --outdir:build benchmarks/fibonacci/stdnim_fib.nim"
#
# Note: it breaks at fib 16.
proc parfib(n: uint64): uint64 =
  ## Naive fork-join Fibonacci on Nim's standard `threadpool`.
  ## The base case returns `n` itself (F(0) = 0, F(1) = 1) instead of
  ## collapsing n < 2 to 1, which would shift the whole sequence.
  if n < 2:
    return n
  # Fork F(n-1) as a pool task, compute F(n-2) on the current thread,
  # then block on `^` to read back the FlowVar result.
  let forked = spawn parfib(n - 1)
  let local = parfib(n - 2)
  result = ^forked + local
# Entry point: parses the requested Fibonacci index from the command line,
# times the parallel computation with the project-local wtime helper and
# prints result + elapsed wall time.
# NOTE(review): indentation was flattened by the diff rendering; code tokens
# are kept exactly as shown.
proc main() =
if paramCount() != 1:
# Exactly one CLI argument (the Fibonacci index) is required.
echo "Usage: fib <n-th fibonacci number requested>"
quit 0
let n = paramStr(1).parseUInt.uint64
# wtime_msec comes from ../wtime (wall-clock time in milliseconds).
let start = wtime_msec()
let f = parfib(n)
let stop = wtime_msec()
echo "Result: ", f
echo &"Elapsed wall time: {stop-start:.2} ms"
main()

View File

@ -0,0 +1,79 @@
import
# STD lib
os, strutils, cpuinfo, strformat, math,
# Library
../../taskpools
when not defined(windows):
# bench
import ../wtime, ../resources
var tp: Taskpool
proc fib(n: int): int =
  ## Naive parallel Fibonacci (F(0) = 0, F(1) = 1) scheduled on the
  ## module-global Taskpool `tp`. `int` is 64-bit on x86-64.
  if n < 2:
    return n
  # Offload F(n-1) to the pool while computing F(n-2) on this thread,
  # then join on the flowvar.
  let pending = tp.spawn fib(n - 1)
  let local = fib(n - 2)
  result = sync(pending) + local
# Benchmark driver: parses `n` (default 40), decides the thread count,
# runs fib(n) on a Taskpool and reports timing and, on non-Windows,
# resource usage (max RSS and minor page faults via getrusage).
# NOTE(review): indentation was flattened by the diff rendering; code tokens
# are kept exactly as shown.
proc main() =
var n = 40
var nthreads: int
if paramCount() == 0:
# No argument: print usage and fall through with the default n.
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <n-th fibonacci number requested:{n}> "
echo &"Running with default n = {n}"
elif paramCount() == 1:
n = paramStr(1).parseInt
else:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <n-th fibonacci number requested:{n}>"
quit 1
# NOTE(review): this still reads WEAVE_NUM_THREADS while another file in
# this same commit switched to TP_NUM_THREADS — confirm which is intended.
if existsEnv"WEAVE_NUM_THREADS":
nthreads = getEnv"WEAVE_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()
# NOTE(review): nthreads is computed but not passed to Taskpool.new(),
# so it only affects the printed report — presumably this should be
# Taskpool.new(numThreads = nthreads); verify.
tp = Taskpool.new()
# measure overhead during tasking
when not defined(windows):
# Snapshot rusage before the run; deltas are computed after shutdown.
var ru: Rusage
getrusage(RusageSelf, ru)
var
rss = ru.ru_maxrss
flt = ru.ru_minflt
let start = wtime_msec()
let f = fib(n)
when not defined(windows):
let stop = wtime_msec()
# Wait for all tasks and stop the worker threads before measuring.
tp.shutdown()
when not defined(windows):
getrusage(RusageSelf, ru)
rss = ru.ru_maxrss - rss
flt = ru.ru_minflt - flt
echo "--------------------------------------------------------------------------"
echo "Scheduler: Taskpool"
echo "Benchmark: Fibonacci"
echo "Threads: ", nthreads
when not defined(windows):
echo "Time(ms) ", round(stop - start, 3)
echo "Max RSS (KB): ", ru.ru_maxrss
echo "Runtime RSS (KB): ", rss
echo "# of page faults: ", flt
echo "--------------------------------------------------------------------------"
echo "n requested: ", n
echo "result: ", f
main()

View File

@ -58,17 +58,11 @@ template alloca*(T: typedesc): ptr T =
template alloca*(T: typedesc, len: Natural): ptr UncheckedArray[T] =
cast[ptr UncheckedArray[T]](alloca(sizeof(T) * len))
proc wv_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} =
when defined(WV_useNimAlloc):
cast[type result](createSharedU(T, len))
else:
cast[type result](c_malloc(csize_t len*sizeof(T)))
proc tp_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} =
cast[type result](c_malloc(csize_t len*sizeof(T)))
proc wv_free*[T: ptr](p: T) {.inline.} =
when defined(WV_useNimAlloc):
freeShared(p)
else:
c_free(p)
proc tp_free*[T: ptr](p: T) {.inline.} =
c_free(p)
# We assume that Nim zeroMem vs C memset
# and Nim copyMem vs C memcpy have no difference
@ -99,7 +93,7 @@ proc nqueens_ser(n, j: int32, a: CharArray): int32 =
if n == j:
# Good solution count it
if example_solution.isNil:
example_solution = wv_alloc(char, n)
example_solution = tp_alloc(char, n)
copyMem(example_solution, a, n * sizeof(char))
return 1

View File

@ -99,8 +99,8 @@ proc main() =
quit 1
var nthreads: int
if existsEnv"WEAVE_NUM_THREADS":
nthreads = getEnv"WEAVE_NUM_THREADS".parseInt()
if existsEnv"TP_NUM_THREADS":
nthreads = getEnv"TP_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()

View File

@ -22,26 +22,17 @@ type
## - Padded to avoid false sharing in collections
## - No extra indirection to access the item, the buffer is inline the channel
## - Linearizable
## - Default usable size is 254 bytes (WV_MemBlockSize - 2).
## If used in an intrusive manner, it's 126 bytes due to the default 128 bytes padding.
##
## The channel should be the last field of an object if used in an intrusive manner
##
## Motivations for type erasure
## - when LazyFlowvar needs to be converted
## from stack-allocated memory to heap to extended their lifetime
## we have no type information at all as the whole runtime
## and especially tasks does not retain it.
##
## - When a task depends on a future that was generated from lazy loop-splitting
## we don't have type information either.
##
## - An extra benefit is probably easier embedding, or calling
## from C or JIT code.
full{.align: 64.}: Atomic[bool]
itemSize*: uint8
buffer*{.align: 8.}: UncheckedArray[byte]
when (NimMajor,NimMinor,NimPatch) <= (1,4,0):
type AssertionDefect = AssertionError
{.push raises: [AssertionDefect].} # Ensure no exceptions can happen
proc `=`(
dest: var ChannelSPSCSingle,
source: ChannelSPSCSingle
@ -90,6 +81,8 @@ func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
chan.full.store(true, moRelease)
return true
{.pop.} # raises: [AssertionDefect]
# Sanity checks
# ------------------------------------------------------------------------------
when isMainModule:

View File

@ -35,17 +35,21 @@
# To reduce contention, stealing is done on the opposite end from push/pop
# so that there is a race only for the very last task.
{.push raises: [].}
import
system/ansi_c,
std/[locks, typetraits, atomics],
./instrumentation/[contracts, loggers]
./instrumentation/[contracts, loggers],
./primitives/allocs
type
Buf[T] = object
## Backend buffer of a ChaseLevDeque
## `capacity` MUST be a power of 2
# Note: update tp_allocUnchecked allocation if any field changes.
# Unused. There is no memory reclamation scheme.
prev: ptr Buf[T]
capacity: int
mask: int # == capacity-1 implies (i and mask) == (i mod capacity)
rawBuffer: UncheckedArray[Atomic[T]]
@ -55,15 +59,19 @@ type
## The owning thread enqueues and dequeues at the bottom
## Foreign threads steal at the top.
##
## Default queue size is 8
## Queue can grow to handle up to 34 359 738 368 tasks in flights
## TODO:
## with --gc:arc / --gc:orc, use a seq instead of a fixed max size.
## There is no memory reclamation scheme for simplicity.
top {.align: 64.}: Atomic[int]
bottom: Atomic[int]
buf: Atomic[ptr Buf[T]]
garbage: array[32, ptr Buf[T]] # up to 34 359 738 368 sized buffer
garbageUsed: uint8
garbage: ptr Buf[T]
when (NimMajor,NimMinor,NimPatch) <= (1,4,0):
type AssertionDefect = AssertionError
{.push raises: [AssertionDefect].} # Ensure no exceptions can happen
{.push overflowChecks: off.} # We don't want exceptions (for Defect) in a multithreaded context
# but we don't want to deal with underflow of unsigned int either
# say "if a < b - c" with c > b
func isPowerOfTwo(n: int): bool {.inline.} =
(n and (n - 1)) == 0 and (n != 0)
@ -75,13 +83,16 @@ proc newBuf(T: typedesc, capacity: int): ptr Buf[T] =
preCondition: capacity.isPowerOfTwo()
result = cast[ptr Buf[T]](
c_malloc(csize_t 2*sizeof(int) + sizeof(T)*capacity)
result = tp_allocUnchecked(
Buf[T],
1*sizeof(pointer) + 2*sizeof(int) + sizeof(T)*capacity,
zero = true
)
# result.prev = nil
result.capacity = capacity
result.mask = capacity - 1
result.rawBuffer.addr.zeroMem(sizeof(T)*capacity)
# result.rawBuffer.addr.zeroMem(sizeof(T)*capacity)
proc `[]=`[T](buf: var Buf[T], index: int, item: T) {.inline.} =
buf.rawBuffer[index and buf.mask].store(item, moRelaxed)
@ -102,25 +113,28 @@ proc grow[T](deque: var ChaseLevDeque[T], buf: var ptr Buf[T], top, bottom: int)
for i in top ..< bottom:
tmp[][i] = buf[][i]
# This requires 68+ billions tasks in flight (per-thread)
ascertain: deque.garbageUsed.int < deque.garbage.len
deque.garbage[deque.garbageUsed] = buf
buf.prev = deque.garbage
deque.garbage = buf
# publish globally
deque.buf.store(tmp, moRelaxed)
# publish locally
swap(buf, tmp)
deque.buf.store(buf, moRelaxed)
# Public API
# ---------------------------------------------------
proc init*[T](deque: var ChaseLevDeque[T]) =
proc init*[T](deque: var ChaseLevDeque[T], initialCapacity: int) =
## Initializes a new Chase-lev work-stealing deque.
deque.reset()
deque.buf.store(newBuf(T, 8), moRelaxed)
deque.buf.store(newBuf(T, initialCapacity), moRelaxed)
proc teardown*[T](deque: var ChaseLevDeque[T]) =
## Teardown a Chase-lev work-stealing deque.
for i in 0 ..< deque.garbageUsed.int:
c_free(deque.garbage[i])
var node = deque.garbage
while node != nil:
let tmp = node.prev
c_free(node)
node = tmp
c_free(deque.buf.load(moRelaxed))
proc push*[T](deque: var ChaseLevDeque[T], item: T) =
@ -179,3 +193,6 @@ proc steal*[T](deque: var ChaseLevDeque[T]): T =
if not compare_exchange(deque.top, t, t+1, moSequentiallyConsistent, moRelaxed):
# Failed race.
return default(T)
{.pop.} # overflowChecks
{.pop.} # raises: [AssertionDefect]

View File

@ -36,6 +36,14 @@ type
parked: int
signals: int
when (NimMajor,NimMinor,NimPatch) <= (1,4,0):
type AssertionDefect = AssertionError
{.push raises: [AssertionDefect].} # Ensure no exceptions can happen
{.push overflowChecks: off.} # We don't want exceptions (for Defect) in a multithreaded context
# but we don't want to deal with underflow of unsigned int either
# say "if a < b - c" with c > b
func initialize*(en: var EventNotifier) {.inline.} =
## Initialize the event notifier
en.lock.initLock()
@ -84,3 +92,6 @@ proc getParked*(en: var EventNotifier): int {.inline.} =
en.lock.acquire()
result = en.parked
en.lock.release()
{.pop.} # overflowChecks
{.pop.} # raises: [AssertionDefect]

View File

@ -33,13 +33,13 @@ type
proc newFlowVar*(T: typedesc): Flowvar[T] {.inline.} =
let size = 2 + sizeof(T) # full flag + item size + buffer
result.chan = wv_allocAligned(ChannelSPSCSingle, size, alignment = 64)
result.chan = tp_allocAligned(ChannelSPSCSingle, size, alignment = 64)
result.chan[].initialize(sizeof(T))
proc cleanup(fv: Flowvar) {.inline.} =
# TODO: Nim v1.4+ can use "sink Flowvar"
if not fv.chan.isNil:
wv_freeAligned(fv.chan)
tp_freeAligned(fv.chan)
func isSpawned*(fv: Flowvar): bool {.inline.} =
## Returns true if a flowvar is spawned

View File

@ -13,7 +13,7 @@ import macros, os, strutils
# ----------------------------------------------------------------------------------
# Everything should be a template that doesn't produce any code
# when WV_Asserts is not defined.
# when TP_Asserts is not defined.
# Those checks are controlled by a custom flag instead of
# "--boundsChecks" or "--nilChecks" to decouple them from user code checks.
# Furthermore, we want them to be very lightweight on performance
@ -74,19 +74,24 @@ macro assertContract(
"\n The following values are contrary to expectations:" &
"\n "
let values = inspectInfix(strippedPredicate)
let myID = quote do:
when declared(myID):
$myID()
let workerID = quote do:
when declared(workerContext):
$workerContext.id
else:
"N/A"
let taskpoolID = quote do:
when declared(workerContext):
"0x" & cast[ByteAddress](workerContext.taskpool).toHex().toLowerAscii()
else:
"N/A"
result = quote do:
{.noSideEffect.}:
when compileOption("assertions"):
assert(`predicate`, `debug` & $`values` & " [Worker " & `myID` & "]\n")
elif defined(WV_Asserts):
assert(`predicate`, `debug` & $`values` & " [Worker " & `workerID` & " on taskpool " & `taskpoolID` & "]\n")
elif defined(TP_Asserts):
if unlikely(not(`predicate`)):
raise newException(AssertionError, `debug` & $`values` & '\n')
raise newException(AssertionError, `debug` & $`values` & " [Worker " & `workerID` & " on taskpool " & `taskpoolID` & "]\n")
# A way to get the caller function would be nice.

View File

@ -1,52 +0,0 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# Thread primitives
# ----------------------------------------------------------------------------------
type
Pthread {.importc: "pthread_t", header: "<sys/types.h>".} = object
CpuSet {.byref, importc: "cpu_set_t", header: "<sched.h>".} = object
proc pthread_self(): Pthread {.header: "<pthread.h>".}
proc pthread_setaffinity_np(
thread: Pthread,
cpuset_size: int,
cpuset: CpuSet
) {.header: "<pthread.h>".}
## Limit specified `thread` to run only on the processors
## represented in `cpuset`
# Note CpuSet is always passed by (hidden) pointer
proc cpu_zero(cpuset: var CpuSet) {.importc: "CPU_ZERO", header: "<sched.h>".}
## Clears the set so that it contains no CPU
proc cpu_set(cpu: cint, cpuset: var CpuSet) {.importc: "CPU_SET", header: "<sched.h>".}
## Add CPU to set
# Affinity
# ----------------------------------------------------------------------------------
# Nim doesn't allow the main thread to set its own affinity
proc set_thread_affinity(t: Pthread, cpu: int32) {.inline.}=
when defined(osx) or defined(android):
{.warning: "To improve performance we should pin threads to cores.\n" &
"This is not possible with MacOS or Android.".}
# Note: on Android it's even more complex due to the Big.Little architecture
# with cores with different performance profiles to save on battery
else:
var cpuset {.noinit.}: CpuSet
cpu_zero(cpuset)
cpu_set(cpu, cpuset)
pthread_setaffinity_np(t, sizeof(CpuSet), cpuset)
proc pinToCpu*(cpu: int32) {.inline.} =
## Set the affinity of the main thread (the calling thread)
set_thread_affinity(pthread_self(), cpu)

View File

@ -1,18 +0,0 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import winlean
when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
proc setThreadAffinityMask(hThread: Handle, dwThreadAffinityMask: uint) {.
importc: "SetThreadAffinityMask", stdcall, header: "<windows.h>".}
proc pinToCpu*(cpu: int32) {.inline.} =
## Set the affinity of the main thread (the calling thread)
setThreadAffinityMask(getThreadID(), uint(1 shl cpu))

View File

@ -5,14 +5,13 @@
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
system/ansi_c
import system/ansi_c
# Helpers
# ----------------------------------------------------------------------------------
proc isPowerOfTwo*(n: int): bool {.inline.} =
(n and (n - 1)) == 0
func isPowerOfTwo(n: int): bool {.inline.} =
(n and (n - 1)) == 0 and (n != 0)
# TODO: cannot dispatch at compile-time due to https://github.com/nim-lang/Nim/issues/12726
# but all our use-case are for power of 2
@ -44,44 +43,48 @@ template deref*(T: typedesc): typedesc =
## Return the base object type behind a ptr type
typeof(default(T)[])
proc wv_alloc*(T: typedesc): ptr T {.inline.}=
## Default allocator for the Picasso library
proc tp_alloc*(T: typedesc, zero: static bool = false): ptr T {.inline.}=
## Default allocator for the Taskpools library
## This allocates memory to hold the type T
## and returns a pointer to it
##
## Can use Nim allocator to measure the overhead of its lock
## Memory is not zeroed
when defined(WV_useNimAlloc):
createSharedU(T)
else:
cast[ptr T](c_malloc(csize_t sizeof(T)))
result = cast[ptr T](c_malloc(csize_t sizeof(T)))
when zero:
zeroMem(result, sizeof(T))
proc wv_allocPtr*(T: typedesc[ptr], zero: static bool = false): T {.inline.}=
## Default allocator for the Picasso library
proc tp_allocPtr*(T: typedesc[ptr], zero: static bool = false): T {.inline.}=
## Default allocator for the Taskpools library
## This allocates memory to hold the
## underlying type of the pointer type T.
## i.e. if T is ptr int, this allocates an int
##
## Can use Nim allocator to measure the overhead of its lock
## Memory is not zeroed
result = wv_alloc(deref(T))
## Memory is zeroed if requested
result = tp_alloc(deref(T))
when zero:
zeroMem(result, sizeof(deref(T)))
proc wv_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} =
## Default allocator for the Picasso library.
proc tp_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} =
## Default allocator for the Taskpools library.
## This allocates a contiguous chunk of memory
## to hold ``len`` elements of type T
## and returns a pointer to it.
##
## Can use Nim allocator to measure the overhead of its lock
## Memory is not zeroed
when defined(WV_useNimAlloc):
cast[type result](createSharedU(T, len))
else:
cast[type result](c_malloc(csize_t len*sizeof(T)))
cast[type result](c_malloc(csize_t len*sizeof(T)))
proc wv_free*[T: ptr](p: T) {.inline.} =
proc tp_allocUnchecked*(T: typedesc, size: SomeInteger, zero: static bool = false): ptr T {.inline.} =
## Default allocator for the Taskpools library.
## This allocates "size" bytes.
## This is for datastructure which contained an UncheckedArray field
result = cast[type result](c_malloc(csize_t size))
when zero:
zeroMem(result, size)
proc tp_free*[T: ptr](p: T) {.inline.} =
when defined(WV_useNimAlloc):
freeShared(p)
else:
@ -101,19 +104,19 @@ template alloca*(T: typedesc, len: Natural): ptr UncheckedArray[T] =
when defined(windows):
proc aligned_alloc_windows(size, alignment: csize_t): pointer {.sideeffect,importc:"_aligned_malloc", header:"<malloc.h>".}
# Beware of the arg order!
proc wv_freeAligned*[T](p: ptr T){.sideeffect,importc:"_aligned_free", header:"<malloc.h>".}
proc tp_freeAligned*[T](p: ptr T){.sideeffect,importc:"_aligned_free", header:"<malloc.h>".}
elif defined(osx):
proc posix_memalign(mem: var pointer, alignment, size: csize_t){.sideeffect,importc, header:"<stdlib.h>".}
proc aligned_alloc(alignment, size: csize_t): pointer {.inline.} =
posix_memalign(result, alignment, size)
proc wv_freeAligned*[T](p: ptr T){.inline.} =
proc tp_freeAligned*[T](p: ptr T){.inline.} =
c_free(p)
else:
proc aligned_alloc(alignment, size: csize_t): pointer {.sideeffect,importc, header:"<stdlib.h>".}
proc wv_freeAligned*[T](p: ptr T){.inline.} =
proc tp_freeAligned*[T](p: ptr T){.inline.} =
c_free(p)
proc wv_allocAligned*(T: typedesc, alignment: static Natural): ptr T {.inline.} =
proc tp_allocAligned*(T: typedesc, alignment: static Natural): ptr T {.inline.} =
## aligned_alloc requires allocating in multiple of the alignment.
static:
assert alignment.isPowerOfTwo()
@ -126,7 +129,7 @@ proc wv_allocAligned*(T: typedesc, alignment: static Natural): ptr T {.inline.}
else:
cast[ptr T](aligned_alloc(csize_t alignment, csize_t requiredMem))
proc wv_allocAligned*(T: typedesc, size: int, alignment: static Natural): ptr T {.inline.} =
proc tp_allocAligned*(T: typedesc, size: int, alignment: static Natural): ptr T {.inline.} =
## aligned_alloc requires allocating in multiple of the alignment.
static:
assert alignment.isPowerOfTwo()
@ -138,7 +141,7 @@ proc wv_allocAligned*(T: typedesc, size: int, alignment: static Natural): ptr T
else:
cast[ptr T](aligned_alloc(csize_t alignment, csize_t requiredMem))
proc wv_allocArrayAligned*(T: typedesc, len: int, alignment: static Natural): ptr UncheckedArray[T] {.inline.} =
proc tp_allocArrayAligned*(T: typedesc, len: int, alignment: static Natural): ptr UncheckedArray[T] {.inline.} =
## aligned_alloc requires allocating in multiple of the alignment.
static:
assert alignment.isPowerOfTwo()

View File

@ -8,7 +8,8 @@
import
std/random,
system/ansi_c,
./instrumentation/contracts
./instrumentation/contracts,
./primitives/allocs
const TP_MaxWorkers = 255
type Setuint = uint8 # We support at most 255 threads (0xFF is kept as special value to signify absence in the set)
@ -40,7 +41,7 @@ func allocate*(s: var SparseSet, capacity: SomeInteger) {.inline.} =
preCondition: capacity <= TP_MaxWorkers
s.capacity = Setuint capacity
s.rawBuffer = cast[ptr UncheckedArray[Setuint]](c_calloc(csize_t 2*capacity, csize_t sizeof(Setuint)))
s.rawBuffer = tp_alloc(Setuint, 2*capacity)
s.indices = s.rawBuffer
s.values = cast[ptr UncheckedArray[Setuint]](s.rawBuffer[capacity].addr)

View File

@ -35,7 +35,10 @@
# In case a thread is blocked for IO, other threads can steal pending tasks in that thread.
# If all threads are pending for IO, the threadpool will not make any progress and be soft-locked.
{.push raises: [].}
when (NimMajor,NimMinor,NimPatch) <= (1,4,0):
type AssertionDefect = AssertionError
{.push raises: [AssertionDefect].} # Ensure no exceptions can happen
import
system/ansi_c,
@ -53,11 +56,6 @@ export
# flowvars
Flowvar, isSpawned, isReady, sync
when defined(windows):
import ./primitives/affinity_windows
else:
import ./primitives/affinity_posix
when (NimMajor,NimMinor,NimPatch) >= (1,6,0):
import std/[isolation, tasks]
export isolation
@ -142,7 +140,7 @@ proc setupWorker() =
ctx.currentTask = nil
# Init
ctx.taskDeque[].init()
ctx.taskDeque[].init(initialCapacity = 32)
proc teardownWorker() =
## Cleanup the thread-local context of a worker
@ -185,9 +183,9 @@ proc workerEntryFn(params: tuple[taskpool: Taskpool, id: WorkerID])
# ---------------------------------------------
proc new(T: type TaskNode, parent: TaskNode, task: sink Task): T =
type TaskNodeObj = typeof(default(T)[])
var tn = cast[TaskNode](c_calloc(1, csize_t sizeof(TaskNodeObj)))
var tn = tp_allocPtr(TaskNode)
tn.parent = parent
wasMoved(tn.task) # tn.task is uninitialized, prevent Nim from running the Task destructor
tn.task = task
return tn
@ -351,59 +349,29 @@ proc syncAll*(pool: Taskpool) {.raises: [Exception].} =
# Runtime
# ---------------------------------------------
proc new*(T: type Taskpool, numThreads = countProcessors(), pinThreadsToCores = false): T {.raises: [Exception].} =
proc new*(T: type Taskpool, numThreads = countProcessors()): T {.raises: [Exception].} =
## Initialize a threadpool that manages `numThreads` threads.
## Default to the number of logical processors available.
##
## If pinToCPU is set, threads spawned will be pinned to the core that spawned them.
## This improves performance of memory-intensive workloads by avoiding
## thrashing and reloading core caches when a thread moves around.
##
## pinThreadsToCores option is ignored in:
## - In C++ compilation with Microsoft Visual Studio Compiler
## - MacOS
## - Android
#
# pinThreadsToCores Status:
# - C++ MSVC: implementation missing (need to wrap reinterpret_cast)
# - Android: API missing and unclear benefits due to Big.Little architecture
# - MacOS: API missing
type TpObj = typeof(default(Taskpool)[])
# Event notifier requires an extra 64 bytes for alignment
var tp = wv_allocAligned(TpObj, sizeof(TpObj) + 64, 64)
var tp = tp_allocAligned(TpObj, sizeof(TpObj) + 64, 64)
tp.barrier.init(numThreads.int32)
tp.eventNotifier.initialize()
tp.numThreads = numThreads
tp.workerDeques = wv_allocArrayAligned(ChaseLevDeque[TaskNode], numThreads, alignment = 64)
tp.workers = wv_allocArrayAligned(Thread[(Taskpool, WorkerID)], numThreads, alignment = 64)
tp.workerSignals = wv_allocArrayAligned(Signal, numThreads, alignment = 64)
tp.workerDeques = tp_allocArrayAligned(ChaseLevDeque[TaskNode], numThreads, alignment = 64)
tp.workers = tp_allocArrayAligned(Thread[(Taskpool, WorkerID)], numThreads, alignment = 64)
tp.workerSignals = tp_allocArrayAligned(Signal, numThreads, alignment = 64)
# Setup master thread
workerContext.id = 0
workerContext.taskpool = tp
if pinThreadsToCores:
when not(defined(cpp) and defined(vcc)):
# TODO: Nim casts between Windows Handles but that requires reinterpret cast for C++
pinToCpu(0)
# Start worker threads
for i in 1 ..< numThreads:
createThread(tp.workers[i], worker_entry_fn, (tp, WorkerID(i)))
if pinThreadsToCores:
# TODO: we might want to take into account Hyper-Threading (HT)
# and allow spawning tasks and pinning to cores that are not HT-siblings.
# This is important for memory-bound workloads (like copy, addition, ...)
# where both sibling cores will compete for L1 and L2 cache, effectively
# halving the memory bandwidth or worse, flushing what the other put in cache.
# Note that while 2x siblings is common, Xeon Phi has 4x Hyper-Threading.
when not(defined(cpp) and defined(vcc)):
# TODO: Nim casts between Windows Handles but that requires reinterpret cast for C++
pinToCpu(tp.workers[i], i)
# Root worker
setupWorker()
@ -417,20 +385,20 @@ proc new*(T: type Taskpool, numThreads = countProcessors(), pinThreadsToCores =
discard tp.barrier.wait()
return tp
proc cleanup(tp: var TaskPool) {.raises: [OSError].} =
proc cleanup(tp: var TaskPool) {.raises: [AssertionDefect, OSError].} =
## Cleanup all resources allocated by the taskpool
preCondition: workerContext.currentTask.task.isRootTask()
for i in 1 ..< tp.numThreads:
joinThread(tp.workers[i])
tp.workerSignals.wv_freeAligned()
tp.workers.wv_freeAligned()
tp.workerDeques.wv_freeAligned()
tp.workerSignals.tp_freeAligned()
tp.workers.tp_freeAligned()
tp.workerDeques.tp_freeAligned()
`=destroy`(tp.eventNotifier)
tp.barrier.delete()
tp.wv_freeAligned()
tp.tp_freeAligned()
proc shutdown*(tp: var TaskPool) {.raises:[Exception].} =
## Wait until all tasks are processed and then shutdown the taskpool