initial commit

Mamy André-Ratsimbazafy 2021-06-28 16:47:06 +02:00
commit 216aabe629
No known key found for this signature in database
GPG Key ID: 7B88AD1FE79492E1
41 changed files with 3976 additions and 0 deletions

.gitignore (new file, +4)

@@ -0,0 +1,4 @@
nimcache/
# Executables shall be put in an ignored build/ directory
build/

README.md (new file, +35)

@@ -0,0 +1,35 @@
# Taskpools
## API
The API spec follows https://github.com/nim-lang/RFCs/issues/347#task-parallelism-api
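For illustration, a minimal usage sketch of that API, mirroring the tests later in this commit (`addSquared` is an arbitrary example proc):

```nim
import taskpools

proc addSquared(a, b: int): int = a*a + b*b

proc main() =
  var tp = Taskpool.new(numThreads = 4) # Taskpool.new() also works, with a default thread count
  let fut = tp.spawn addSquared(3, 4)   # returns a Flowvar[int]
  echo sync(fut)                        # blocks (and helps with work) until ready: 25
  tp.shutdown()

main()
```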
## Overview
This implements a lightweight, energy-efficient, easily auditable multithreaded taskpool.
This taskpool will be used in a highly security-sensitive blockchain application
targeted at resource-restricted devices, hence the desirable properties are:
- Ease of auditing and maintenance.
- Formally verified synchronization primitives are highly sought after.
- Otherwise, primitives are implemented from papers or ported from proven codebases
that can serve as a reference for auditors.
- Resource-efficient. Threads spin down to save power; memory use stays low.
- Decent performance and scalability. The workloads to parallelize are cryptography-related
and require at least 1 ms of runtime per task.
This means that only a simple scheduler is required.
Non-goals:
- Supporting task priorities
- Being distributed
- Supporting GC-ed memory on Nim default GC (sequences and strings)
- Having async-awaitable tasks
In particular compared to [Weave](https://github.com/mratsim/weave), here are the tradeoffs:
- Taskpools only provides spawn/sync (task parallelism).\
There is no parallel-for (data parallelism)\
nor precise in/out dependencies (dataflow parallelism).
- Weave can handle trillions of small tasks that require only 10 µs per task (load-balancing overhead).
- Weave maintains an adaptive memory pool to reduce memory allocation overhead,
while Taskpools allocates as needed (memory allocator overhead).

@@ -0,0 +1,11 @@
# BPC (Bouncing Producer-Consumer)
From [tasking-2.0](https://github.com/aprell/tasking-2.0) description
> **BPC**, short for **B**ouncing **P**roducer-**C**onsumer benchmark, as far
> as I know, first described by [Dinan et al][1]. There are two types of
> tasks, producer and consumer tasks. Each producer task creates another
> producer task followed by *n* consumer tasks, until a certain depth *d* is
> reached. Consumer tasks run for *t* microseconds. The smaller the values of
> *n* and *t*, the harder it becomes to exploit the available parallelism. A
> solid contender for the most antagonistic microbenchmark.
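
The recursive task structure, as a hedged sketch (the names `produce` and `consume` are illustrative only; the actual Taskpool port follows):

```nim
# Shape of the BPC task graph; illustrative, not the benchmark code below
import taskpools
var tp: Taskpool

proc consume(usec: int32) = discard # stand-in: would spin for ~usec microseconds

proc produce(n, d, usec: int32) =
  if d == 0:
    return
  tp.spawn produce(n, d - 1, usec)  # one child producer per depth level...
  for _ in 0 ..< n:
    tp.spawn consume(usec)          # ...followed by n consumer tasks
```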

@@ -0,0 +1,156 @@
import
# STD lib
os, strutils, system/ansi_c, cpuinfo, strformat, math,
# Library
../../taskpools,
# bench
../wtime, ../resources
var
Depth: int32 # For example 10000
NumTasksPerDepth: int32 # For example 9
# The total number of tasks in the BPC benchmark is
# (NumTasksPerDepth + 1) * Depth
NumTasksTotal: int32
TaskGranularity: int32 # in microseconds
PollInterval: float64 # in microseconds
tp: Taskpool
var global_poll_elapsed {.threadvar.}: float64
template dummy_cpt(): untyped =
# Dummy computation
# Calculate fib(30) iteratively
var
fib = 0
f2 = 0
f1 = 1
for i in 2 .. 30:
fib = f1 + f2
f2 = f1
f1 = fib
proc bpc_consume(usec: int32) =
var pollElapsed = 0'f64
let start = wtime_usec()
let stop = usec.float64
global_poll_elapsed = PollInterval
while true:
var elapsed = wtime_usec() - start
elapsed -= pollElapsed
if elapsed >= stop:
break
dummy_cpt()
# if elapsed >= global_poll_elapsed:
# let pollStart = wtime_usec()
# loadBalance(Weave)
# pollElapsed += wtime_usec() - pollStart
# global_poll_elapsed += PollInterval
proc bpc_consume_nopoll(usec: int32) =
let start = wtime_usec()
let stop = usec.float64
while true:
var elapsed = wtime_usec() - start
if elapsed >= stop:
break
dummy_cpt()
proc bpc_produce(n, d: int32) =
if d > 0:
# Create producer task
tp.spawn bpc_produce(n, d-1)
else:
return
# Followed by n consumer tasks
for i in 0 ..< n:
tp.spawn bpc_consume(TaskGranularity)
proc main() =
Depth = 10000
NumTasksPerDepth = 999
TaskGranularity = 1
if paramCount() == 0:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <depth: {Depth}> " &
&"<# of tasks per depth: {NumTasksPerDepth}> " &
&"[task granularity (us): {TaskGranularity}] " &
&"[polling interval (us): task granularity]"
echo &"Running with default config Depth = {Depth}, NumTasksPerDepth = {NumTasksPerDepth}, granularity (us) = {TaskGranularity}, polling (us) = {PollInterval}"
if paramCount() >= 1:
Depth = paramStr(1).parseInt.int32
if paramCount() >= 2:
NumTasksPerDepth = paramStr(2).parseInt.int32
if paramCount() >= 3:
TaskGranularity = paramStr(3).parseInt.int32
if paramCount() == 4:
PollInterval = paramStr(4).parseInt.float64
else:
PollInterval = TaskGranularity.float64
if paramCount() > 4:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <depth: {Depth}> " &
&"<# of tasks per depth: {NumTasksPerDepth}> " &
&"[task granularity (us): {TaskGranularity}] " &
&"[polling interval (us): task granularity]"
quit 1
NumTasksTotal = (NumTasksPerDepth + 1) * Depth
var nthreads: int
if existsEnv"TASKPOOL_NUM_THREADS":
nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()
tp = Taskpool.new(numThreads = nthreads)
# measure overhead during tasking
var ru: Rusage
getrusage(RusageSelf, ru)
var
rss = ru.ru_maxrss
flt = ru.ru_minflt
let start = wtime_msec()
bpc_produce(NumTasksPerDepth, Depth)
tp.syncAll()
let stop = wtime_msec()
getrusage(RusageSelf, ru)
rss = ru.ru_maxrss - rss
flt = ru.ru_minflt - flt
tp.shutdown()
echo "--------------------------------------------------------------------------"
echo "Scheduler: Taskpool"
echo "Benchmark: BPC (Bouncing Producer-Consumer)"
echo "Threads: ", nthreads
echo "Time(ms) ", round(stop - start, 3)
echo "Max RSS (KB): ", ru.ru_maxrss
echo "Runtime RSS (KB): ", rss
echo "# of page faults: ", flt
echo "--------------------------------------------------------------------------"
echo "# of tasks: ", NumTasksTotal
echo "# of tasks/depth: ", NumTasksPerDepth
echo "Depth: ", Depth
echo "Task granularity (us): ", TaskGranularity
echo "Polling / manual load balancing interval (us): ", PollInterval
quit 0
main()

@@ -0,0 +1,85 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
# Stdlib
system/ansi_c, strformat, os, strutils, cpuinfo,
# Weave
../../weave
when not defined(windows):
# bench
import ../wtime
proc dfs(depth, breadth: int): uint32 =
if depth == 0:
return 1
# We could use alloca to avoid heap allocation here
var sums = newSeq[Flowvar[uint32]](breadth)
for i in 0 ..< breadth:
sums[i] = spawn dfs(depth - 1, breadth)
for i in 0 ..< breadth:
result += sync(sums[i])
proc test(depth, breadth: int): uint32 =
result = sync spawn dfs(depth, breadth)
proc main() =
var
depth = 8
breadth = 8
answer: uint32
nthreads: int
if existsEnv"WEAVE_NUM_THREADS":
nthreads = getEnv"WEAVE_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()
if paramCount() == 0:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <depth:{depth}> <breadth:{breadth}>"
echo &"Running with default config depth = {depth} and breadth = {breadth}"
if paramCount() >= 1:
depth = paramStr(1).parseInt()
if paramCount() == 2:
breadth = paramStr(2).parseInt()
if paramCount() > 2:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <depth:{depth}> <breadth:{breadth}>"
echo &"Up to 2 parameters are valid. Received {paramCount()}"
quit 1
# Staccato benches runtime init and exit as well
when not defined(windows):
let start = wtime_usec()
init(Weave)
answer = test(depth, breadth)
exit(Weave)
when not defined(windows):
let stop = wtime_usec()
const lazy = defined(WV_LazyFlowvar)
const config = if lazy: " (lazy flowvars)"
else: " (eager flowvars)"
echo "Scheduler: Weave", config
echo "Benchmark: dfs"
echo "Threads: ", nthreads
when not defined(windows):
echo "Time(us) ", stop - start
echo "Output: ", answer
quit 0
main()

@@ -0,0 +1,300 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# From fibril
#
# Original license
#
# /*
# * Heat diffusion (Jacobi-type iteration)
# *
# * Volker Strumpen, Boston August 1996
# *
# * Copyright (c) 1996 Massachusetts Institute of Technology
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# */
import
# Stdlib
strformat, os, strutils, math, system/ansi_c,
cpuinfo, threadpool,
# bench
../wtime, ../resources
# This deadlocks :/
# Helpers
# -------------------------------------------------------
# We need a thin wrapper around raw pointers for matrices,
# we can't pass "var seq[seq[float64]]" to other threads
# nor "var" for that matter
type
Matrix[T] = object
buffer: ptr UncheckedArray[T]
m, n: int
Row[T] = object
buffer: ptr UncheckedArray[T]
len: int
func newMatrix[T](m, n: int): Matrix[T] {.inline.} =
result.buffer = cast[ptr UncheckedArray[T]](c_malloc(csize_t m*n*sizeof(T)))
result.m = m
result.n = n
template `[]`[T](mat: Matrix[T], row, col: Natural): T =
# row-major storage
assert row < mat.m
assert col < mat.n
mat.buffer[row * mat.n + col]
template `[]=`[T](mat: Matrix[T], row, col: Natural, value: T) =
assert row < mat.m
assert col < mat.n
mat.buffer[row * mat.n + col] = value
func getRow[T](mat: Matrix[T], rowIdx: Natural): Row[T] {.inline.} =
# row-major storage, there are n columns between each row
assert rowIdx < mat.m
result.buffer = cast[ptr UncheckedArray[T]](mat.buffer[rowIdx * mat.n].addr)
result.len = mat.n
template `[]`[T](row: Row[T], idx: Natural): T =
assert idx < row.len
row.buffer[idx]
template `[]=`[T](row: Row[T], idx: Natural, value: T) =
assert idx < row.len
row.buffer[idx] = value
func delete[T](mat: sink Matrix[T]) =
c_free(mat.buffer)
# And an auto converter for int32 -> float64 so we don't have to convert
# all i, j indices manually
converter i32toF64(x: int32): float64 {.inline.} =
float64(x)
# -------------------------------------------------------
template f(x, y: SomeFloat): SomeFloat =
sin(x) * sin(y)
template randa[T: SomeFloat](x, t: T): T =
T(0.0)
proc randb(x, t: SomeFloat): SomeFloat {.inline.} =
# proc instead of template to avoid Nim constant folding bug:
# https://github.com/nim-lang/Nim/issues/12783
exp(-2 * t) * sin(x)
template randc[T: SomeFloat](y, t: T): T =
T(0.0)
proc randd(y, t: SomeFloat): SomeFloat {.inline.} =
# proc instead of template to avoid Nim constant folding bug:
# https://github.com/nim-lang/Nim/issues/12783
exp(-2 * t) * sin(y)
template solu(x, y, t: SomeFloat): SomeFloat =
exp(-2 * t) * sin(x) * sin(y)
const n = 4096'i32
var
nx, ny, nt: int32
xu, xo, yu, yo, tu, to: float64
dx, dy, dt: float64
dtdxsq, dtdysq: float64
odd: Matrix[float64]
even: Matrix[float64]
proc heat(m: Matrix[float64], il, iu: int32): bool {.discardable.} =
# TODO: we return a dummy bool so that `heat` can be awaited;
# without awaiting, the parallel spawns would race on the same matrix cells
if iu - il > 1:
let im = (il + iu) div 2
let h = spawn heat(m, il, im)
heat(m, im, iu)
discard ^h
return true
# ------------------------
let i = il
let row = m.getRow(i)
if i == 0:
for j in 0 ..< ny:
row[j] = randc(yu + j*dy, 0)
elif i == nx - 1:
for j in 0 ..< ny:
row[j] = randd(yu + j*dy, 0)
else:
row[0] = randa(xu + i*dx, 0)
for j in 1 ..< ny - 1:
row[j] = f(xu + i*dx, yu + j*dy)
row[ny - 1] = randb(xu + i*dx, 0)
proc diffuse(output: Matrix[float64], input: Matrix[float64], il, iu: int32, t: float64): bool {.discardable.} =
# TODO: we return a dummy bool so that `diffuse` can be awaited;
# without awaiting, the parallel spawns would race on the same matrix cells
if iu - il > 1:
let im = (il + iu) div 2
let d = spawn diffuse(output, input, il, im, t)
diffuse(output, input, im, iu, t)
discard ^d
return true
# ------------------------
let i = il
let row = output.getRow(i)
if i == 0:
for j in 0 ..< ny:
row[j] = randc(yu + j*dy, t)
elif i == nx - 1:
for j in 0 ..< ny:
row[j] = randd(yu + j*dy, t)
else:
row[0] = randa(xu + i*dx, t)
for j in 1 ..< ny - 1:
row[j] = input[i, j] + # The use of nested sequences here is a bad idea ...
dtdysq * (input[i, j+1] - 2 * input[i, j] + input[i, j-1]) +
dtdxsq * (input[i+1, j] - 2 * input[i, j] + input[i-1, j])
row[ny - 1] = randb(xu + i*dx, t)
proc initTest() =
nx = n
ny = 1024
nt = 100
xu = 0.0
xo = 1.570796326794896558
yu = 0.0
yo = 1.570796326794896558
tu = 0.0
to = 0.0000001
dx = (xo - xu) / float64(nx - 1)
dy = (yo - yu) / float64(ny - 1)
dt = (to - tu) / float64(nt)
dtdxsq = dt / (dx * dx)
dtdysq = dt / (dy * dy)
even = newMatrix[float64](nx, ny)
odd = newMatrix[float64](nx, ny)
proc prep() =
heat(even, 0, nx)
proc test() =
var t = tu
for _ in countup(1, nt.int, 2):
# nt included
t += dt
diffuse(odd, even, 0, nx, t)
t += dt
diffuse(even, odd, 0, nx, t)
if nt mod 2 != 0:
t += dt
diffuse(odd, even, 0, nx, t)
proc verify() =
var
mat: Matrix[float64]
mae: float64
mre: float64
me: float64
mat = if nt mod 2 != 0: odd else: even
for a in 0 ..< nx:
for b in 0 ..< ny:
var tmp = abs(mat[a, b] - solu(xu + a*dx, yu + b*dy, to))
if tmp > 1e-3:
echo "nx: ", nx, " - ny: ", ny
echo "mat[", a, ", ", b, "] = ", mat[a, b], ", expected sol = ", solu(xu + a*dx, yu + b*dy, to)
quit 1
me += tmp
if tmp > mae: mae = tmp
if mat[a, b] != 0.0: tmp /= mat[a, b]
if tmp > mre: mre = tmp
me /= nx * ny
if mae > 1e-12:
echo &"Local maximal absolute error {mae:1.3e}"
quit 1
if mre > 1e-12:
echo &"Local maximal relative error {mre:1.3e}"
quit 1
if me > 1e-12:
echo &"Global mean absolute error {me:1.3e}"
quit 1
echo "Verification successful"
proc main() =
var nthreads: int
nthreads = countProcessors()
var ru: Rusage
getrusage(RusageSelf, ru)
var
rss = ru.ru_maxrss
flt = ru.ru_minflt
initTest()
prep()
let start = wtime_usec()
test()
let stop = wtime_usec()
getrusage(RusageSelf, ru)
rss = ru.ru_maxrss - rss
flt = ru.ru_minflt - flt
sync()
verify()
delete(even)
delete(odd)
echo "Scheduler: Nim threadpool (standard lib)"
echo "Benchmark: heat"
echo "Threads: ", nthreads
echo "Time(us) ", stop - start
echo "Max RSS (KB): ", ru.ru_maxrss
echo "Runtime RSS (KB): ", rss
echo "# of page faults: ", flt
quit 0
main()

@@ -0,0 +1,313 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# From fibril
#
# Original license
#
# /*
# * Heat diffusion (Jacobi-type iteration)
# *
# * Volker Strumpen, Boston August 1996
# *
# * Copyright (c) 1996 Massachusetts Institute of Technology
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# */
import
# Stdlib
strformat, os, strutils, math, system/ansi_c,
cpuinfo,
# Taskpools
../../taskpools
when not defined(windows):
# bench
import ../wtime, ../resources
# Helpers
# -------------------------------------------------------
# We need a thin wrapper around raw pointers for matrices,
# we can't pass "var seq[seq[float64]]" to other threads
# nor "var" for that matter
type
Matrix[T] = object
buffer: ptr UncheckedArray[T]
m, n: int
Row[T] = object
buffer: ptr UncheckedArray[T]
len: int
var tp: Taskpool
func newMatrix[T](m, n: int): Matrix[T] {.inline.} =
result.buffer = cast[ptr UncheckedArray[T]](c_malloc(csize_t m*n*sizeof(T)))
result.m = m
result.n = n
template `[]`[T](mat: Matrix[T], row, col: Natural): T =
# row-major storage
assert row < mat.m
assert col < mat.n
mat.buffer[row * mat.n + col]
template `[]=`[T](mat: Matrix[T], row, col: Natural, value: T) =
assert row < mat.m
assert col < mat.n
mat.buffer[row * mat.n + col] = value
func getRow[T](mat: Matrix[T], rowIdx: Natural): Row[T] {.inline.} =
# row-major storage, there are n columns between each row
assert rowIdx < mat.m
result.buffer = cast[ptr UncheckedArray[T]](mat.buffer[rowIdx * mat.n].addr)
result.len = mat.n
template `[]`[T](row: Row[T], idx: Natural): T =
assert idx < row.len
row.buffer[idx]
template `[]=`[T](row: Row[T], idx: Natural, value: T) =
assert idx < row.len
row.buffer[idx] = value
func delete[T](mat: sink Matrix[T]) =
c_free(mat.buffer)
# And an auto converter for int32 -> float64 so we don't have to convert
# all i, j indices manually
converter i32toF64(x: int32): float64 {.inline.} =
float64(x)
# -------------------------------------------------------
template f(x, y: SomeFloat): SomeFloat =
sin(x) * sin(y)
template randa[T: SomeFloat](x, t: T): T =
T(0.0)
proc randb(x, t: SomeFloat): SomeFloat {.inline.} =
# proc instead of template to avoid Nim constant folding bug:
# https://github.com/nim-lang/Nim/issues/12783
exp(-2 * t) * sin(x)
template randc[T: SomeFloat](y, t: T): T =
T(0.0)
proc randd(y, t: SomeFloat): SomeFloat {.inline.} =
# proc instead of template to avoid Nim constant folding bug:
# https://github.com/nim-lang/Nim/issues/12783
exp(-2 * t) * sin(y)
template solu(x, y, t: SomeFloat): SomeFloat =
exp(-2 * t) * sin(x) * sin(y)
const n = 4096'i32
var
nx, ny, nt: int32
xu, xo, yu, yo, tu, to: float64
dx, dy, dt: float64
dtdxsq, dtdysq: float64
odd: Matrix[float64]
even: Matrix[float64]
proc heat(m: Matrix[float64], il, iu: int32): bool {.discardable.} =
# TODO: we return a dummy bool so that `heat` can be awaited;
# without awaiting, the parallel spawns would race on the same matrix cells
if iu - il > 1:
let im = (il + iu) div 2
let h = tp.spawn heat(m, il, im)
heat(m, im, iu)
discard sync(h)
return true
# ------------------------
let i = il
let row = m.getRow(i)
if i == 0:
for j in 0 ..< ny:
row[j] = randc(yu + j*dy, 0)
elif i == nx - 1:
for j in 0 ..< ny:
row[j] = randd(yu + j*dy, 0)
else:
row[0] = randa(xu + i*dx, 0)
for j in 1 ..< ny - 1:
row[j] = f(xu + i*dx, yu + j*dy)
row[ny - 1] = randb(xu + i*dx, 0)
proc diffuse(output: Matrix[float64], input: Matrix[float64], il, iu: int32, t: float64): bool {.discardable.} =
# TODO: we return a dummy bool so that `diffuse` can be awaited;
# without awaiting, the parallel spawns would race on the same matrix cells
if iu - il > 1:
let im = (il + iu) div 2
let d = tp.spawn diffuse(output, input, il, im, t)
diffuse(output, input, im, iu, t)
discard sync(d)
return true
# ------------------------
let i = il
let row = output.getRow(i)
if i == 0:
for j in 0 ..< ny:
row[j] = randc(yu + j*dy, t)
elif i == nx - 1:
for j in 0 ..< ny:
row[j] = randd(yu + j*dy, t)
else:
row[0] = randa(xu + i*dx, t)
for j in 1 ..< ny - 1:
row[j] = input[i, j] + # The use of nested sequences here is a bad idea ...
dtdysq * (input[i, j+1] - 2 * input[i, j] + input[i, j-1]) +
dtdxsq * (input[i+1, j] - 2 * input[i, j] + input[i-1, j])
row[ny - 1] = randb(xu + i*dx, t)
proc initTest() =
nx = n
ny = 1024
nt = 100
xu = 0.0
xo = 1.570796326794896558
yu = 0.0
yo = 1.570796326794896558
tu = 0.0
to = 0.0000001
dx = (xo - xu) / float64(nx - 1)
dy = (yo - yu) / float64(ny - 1)
dt = (to - tu) / float64(nt)
dtdxsq = dt / (dx * dx)
dtdysq = dt / (dy * dy)
even = newMatrix[float64](nx, ny)
odd = newMatrix[float64](nx, ny)
proc prep() =
heat(even, 0, nx)
proc test() =
var t = tu
for _ in countup(1, nt.int, 2):
# nt included
t += dt
diffuse(odd, even, 0, nx, t)
t += dt
diffuse(even, odd, 0, nx, t)
if nt mod 2 != 0:
t += dt
diffuse(odd, even, 0, nx, t)
proc verify() =
var
mat: Matrix[float64]
mae: float64
mre: float64
me: float64
mat = if nt mod 2 != 0: odd else: even
for a in 0 ..< nx:
for b in 0 ..< ny:
var tmp = abs(mat[a, b] - solu(xu + a*dx, yu + b*dy, to))
if tmp > 1e-3:
echo "nx: ", nx, " - ny: ", ny
echo "mat[", a, ", ", b, "] = ", mat[a, b], ", expected sol = ", solu(xu + a*dx, yu + b*dy, to)
quit 1
me += tmp
if tmp > mae: mae = tmp
if mat[a, b] != 0.0: tmp /= mat[a, b]
if tmp > mre: mre = tmp
me /= nx * ny
if mae > 1e-12:
echo &"Local maximal absolute error {mae:1.3e}"
quit 1
if mre > 1e-12:
echo &"Local maximal relative error {mre:1.3e}"
quit 1
if me > 1e-12:
echo &"Global mean absolute error {me:1.3e}"
quit 1
echo "Verification successful"
proc main() =
var nthreads: int
if existsEnv"TASKPOOL_NUM_THREADS":
nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()
when not defined(windows):
var ru: Rusage
getrusage(RusageSelf, ru)
var
rss = ru.ru_maxrss
flt = ru.ru_minflt
initTest()
# Fibril initializes before benching
tp = Taskpool.new(numThreads = nthreads)
prep()
when not defined(windows):
let start = wtime_usec()
test()
when not defined(windows):
let stop = wtime_usec()
getrusage(RusageSelf, ru)
rss = ru.ru_maxrss - rss
flt = ru.ru_minflt - flt
tp.shutdown()
verify()
delete(even)
delete(odd)
echo "Scheduler: Taskpools"
echo "Benchmark: heat"
echo "Threads: ", nthreads
when not defined(windows):
echo "Time(us) ", stop - start
echo "Max RSS (KB): ", ru.ru_maxrss
echo "Runtime RSS (KB): ", rss
echo "# of page faults: ", flt
quit 0
main()

@@ -0,0 +1,12 @@
# Cache-Oblivious Matrix Multiplication
From Staccato and Cilk
https://bradley.csail.mit.edu/svn/repos/cilk/5.4.3/examples/matmul.cilk
See the paper "Cache-Oblivious Algorithms" by
Matteo Frigo, Charles E. Leiserson, Harald Prokop, and
Sridhar Ramachandran, FOCS 1999, for an explanation of
why this algorithm is good for caches.
Note that the benchmarks output incorrect matrix traces
according to the check ...
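
For intuition, a hedged serial sketch of the recursion (illustrative only, not the benchmark code; the parallel version below differs in details such as its exact split conditions, and `C` must be zero-initialized):

```nim
# Cache-oblivious matmul sketch on row-major seq-backed matrices with
# leading dimension `ld`: always halve the largest of m, n, p, so every
# subproblem eventually fits in cache, whatever the cache size is.
proc matmulRec(A, B: seq[float64]; C: var seq[float64];
               ai, bi, ci, m, n, p, ld: int) =
  if m + n + p <= 64:          # base case: naive triple loop
    for i in 0 ..< m:
      for k in 0 ..< p:
        var acc = 0.0
        for j in 0 ..< n:
          acc += A[ai + i*ld + j] * B[bi + j*ld + k]
        C[ci + i*ld + k] += acc
  elif m >= n and m >= p:      # halve the rows of A and C
    matmulRec(A, B, C, ai, bi, ci, m div 2, n, p, ld)
    matmulRec(A, B, C, ai + m div 2 * ld, bi, ci + m div 2 * ld, m - m div 2, n, p, ld)
  elif n >= p:                 # halve the inner dimension; both halves accumulate
    matmulRec(A, B, C, ai, bi, ci, m, n div 2, p, ld)
    matmulRec(A, B, C, ai + n div 2, bi + n div 2 * ld, ci, m, n - n div 2, p, ld)
  else:                        # halve the columns of B and C
    matmulRec(A, B, C, ai, bi, ci, m, n, p div 2, ld)
    matmulRec(A, B, C, ai, bi + p div 2, ci + p div 2, m, n, p - p div 2, ld)
```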

@@ -0,0 +1,213 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# Rectangular matrix multiplication.
#
# Adapted from Cilk 5.4.3 example
#
# https://bradley.csail.mit.edu/svn/repos/cilk/5.4.3/examples/matmul.cilk;
# See the paper ``Cache-Oblivious Algorithms'', by
# Matteo Frigo, Charles E. Leiserson, Harald Prokop, and
# Sridhar Ramachandran, FOCS 1999, for an explanation of
# why this algorithm is good for caches.
import
# Stdlib
strformat, os, strutils, math, system/ansi_c,
cpuinfo,
# Taskpool
../../taskpools,
# bench
../wtime, ../resources
# Helpers
# -------------------------------------------------------
# We need a thin wrapper around raw pointers for matrices,
# we can't pass "var" to other threads
type
Matrix[T: SomeFloat] = object
buffer: ptr UncheckedArray[T]
ld: int
var tp: Taskpool
func newMatrixNxN[T](n: int): Matrix[T] {.inline.} =
result.buffer = cast[ptr UncheckedArray[T]](c_malloc(csize_t n*n*sizeof(T)))
result.ld = n
template `[]`[T](mat: Matrix[T], row, col: Natural): T =
# row-major storage
assert row < mat.ld, $row & " < " & $mat.ld
assert col < mat.ld, $col & " < " & $mat.ld
mat.buffer[row * mat.ld + col]
template `[]=`[T](mat: Matrix[T], row, col: Natural, value: T) =
assert row < mat.ld, $row & " < " & $mat.ld
assert col < mat.ld, $col & " < " & $mat.ld
mat.buffer[row * mat.ld + col] = value
func stride*[T](mat: Matrix[T], row, col: Natural): Matrix[T] {.inline.} =
## Returns a new view offset by the row and column stride
result.buffer = cast[ptr UncheckedArray[T]](
addr mat.buffer[row*mat.ld + col]
)
result.ld = mat.ld
func delete[T](mat: sink Matrix[T]) =
c_free(mat.buffer)
# -------------------------------------------------------
proc xorshiftRand(): uint32 =
var x {.global.} = uint32(2463534242)
x = x xor (x shr 13)
x = x xor (x shl 17)
x = x xor (x shr 5)
return x
func zero[T](A: Matrix[T]) =
# zeroing is not timed
zeroMem(A.buffer, A.ld * A.ld * sizeof(T))
proc fill[T](A: Matrix[T]) =
for i in 0 ..< A.ld:
for j in 0 ..< A.ld:
A[i, j] = T(xorshiftRand() mod A.ld.uint32)
func maxError(A, B: Matrix): float64 =
assert A.ld == B.ld
for i in 0 ..< A.ld:
for j in 0 ..< A.ld:
var diff = (A[i, j] - B[i, j]) / A[i, j]
if diff < 0:
diff = -diff
if diff > result:
result = diff
func check[T](A, B, C: Matrix[T], n: int): bool =
var
tr_C = 0.T
tr_AB = 0.T
for i in 0 ..< n:
for j in 0 ..< n:
tr_AB += A[i, j] * B[j, i]
tr_C += C[i, i]
# Note, all benchmarks return false ¯\_(ツ)_/¯
return abs(tr_AB - tr_C) < 1e-3
proc matmul[T](A, B, C: Matrix[T], m, n, p: int, add: bool): bool =
# The original bench passes around a ``ld`` parameter (leading dimension?),
# we store it in the matrices
# We return a dummy bool to allow waiting on the matmul
# Threshold
if (m + n + p) <= 64:
if add:
for i in 0 ..< m:
for k in 0 ..< p:
var c = 0.T
for j in 0 ..< n:
c += A[i, j] * B[j, k]
C[i, k] += c
else:
for i in 0 ..< m:
for k in 0 ..< p:
var c = 0.T
for j in 0 ..< n:
c += A[i, j] * B[j, k]
C[i, k] = c
return
var h0, h1: Flowvar[bool]
## Each half of the computation
# matrix is larger than threshold
if m >= n and n >= p:
let m1 = m shr 1 # divide by 2
h0 = tp.spawn matmul(A, B, C, m1, n, p, add)
h1 = tp.spawn matmul(A.stride(m1, 0), B, C.stride(m1, 0), m - m1, n, p, add)
elif n >= m and n >= p:
let n1 = n shr 1 # divide by 2
h0 = tp.spawn matmul(A, B, C, m, n1, p, add)
h1 = tp.spawn matmul(A.stride(0, n1), B.stride(n1, 0), C, m, n - n1, p, add = true)
else:
let p1 = p shr 1
h0 = tp.spawn matmul(A, B, C, m, n, p1, add)
h1 = tp.spawn matmul(A, B.stride(0, p1), C.stride(0, p1), m, n, p - p1, add)
discard sync(h0)
discard sync(h1)
proc main() =
echo "Warning the benchmark seems to not be correct."
var
n = 3000
nthreads: int
if existsEnv"TASKPOOL_NUM_THREADS":
nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()
if paramCount() == 0:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <n (matrix size):{n}>"
echo &"Running with default config n = {n}"
elif paramCount() == 1:
n = paramStr(1).parseInt()
else:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <n (matrix size):{n}>"
echo &"Up to 1 parameter is valid. Received {paramCount()}"
quit 1
var A = newMatrixNxN[float32](n)
var B = newMatrixNxN[float32](n)
var C = newMatrixNxN[float32](n)
fill(A)
fill(B)
zero(C)
var ru: Rusage
getrusage(RusageSelf, ru)
var
rss = ru.ru_maxrss
flt = ru.ru_minflt
# Staccato benches runtime init and exit as well
let start = wtime_msec()
tp = Taskpool.new(numThreads = nthreads)
discard sync tp.spawn matmul(A, B, C, n, n, n, add = false)
tp.shutdown()
let stop = wtime_msec()
getrusage(RusageSelf, ru)
rss = ru.ru_maxrss - rss
flt = ru.ru_minflt - flt
echo "Scheduler: Taskpool"
echo "Benchmark: Matrix Multiplication (cache oblivious)"
echo "Threads: ", nthreads
echo "Time(ms) ", stop - start
echo "Max RSS (KB): ", ru.ru_maxrss
echo "Runtime RSS (KB): ", rss
echo "# of page faults: ", flt
echo "Input: ", n
echo "Error: ", check(A, B, C, n)
delete A
delete B
delete C
quit 0
main()

@@ -0,0 +1,187 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
#
# Original code licenses
# ------------------------------------------------------------------------------------------------
#
# /**********************************************************************************************/
# /* This program is part of the Barcelona OpenMP Tasks Suite */
# /* Copyright (C) 2009 Barcelona Supercomputing Center - Centro Nacional de Supercomputacion */
# /* Copyright (C) 2009 Universitat Politecnica de Catalunya */
# /* */
# /* This program is free software; you can redistribute it and/or modify */
# /* it under the terms of the GNU General Public License as published by */
# /* the Free Software Foundation; either version 2 of the License, or */
# /* (at your option) any later version. */
# /* */
# /* This program is distributed in the hope that it will be useful, */
# /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
# /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
# /* GNU General Public License for more details. */
# /* */
# /* You should have received a copy of the GNU General Public License */
# /* along with this program; if not, write to the Free Software */
# /* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */
# /**********************************************************************************************/
#
# /*
# * Original code from the Cilk project (by Keith Randall)
# *
# * Copyright (c) 2000 Massachusetts Institute of Technology
# * Copyright (c) 2000 Matteo Frigo
# */
import
# Stdlib
system/ansi_c, strformat, os, strutils,
threadpool,
# bench
../wtime
# This deadlocks :/
# Nim helpers
# -------------------------------------------------
when defined(windows):
proc alloca(size: int): pointer {.header: "<malloc.h>".}
else:
proc alloca(size: int): pointer {.header: "<alloca.h>".}
template alloca*(T: typedesc): ptr T =
cast[ptr T](alloca(sizeof(T)))
template alloca*(T: typedesc, len: Natural): ptr UncheckedArray[T] =
cast[ptr UncheckedArray[T]](alloca(sizeof(T) * len))
proc wv_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} =
when defined(WV_useNimAlloc):
cast[type result](createSharedU(T, len))
else:
cast[type result](c_malloc(csize_t len*sizeof(T)))
proc wv_free*[T: ptr](p: T) {.inline.} =
when defined(WV_useNimAlloc):
freeShared(p)
else:
c_free(p)
# We assume that Nim zeroMem vs C memset
# and Nim copyMem vs C memcpy have no difference
# Nim does have extra checks to handle GC-ed types
# but they should be eliminated by the Nim compiler.
# -------------------------------------------------
type CharArray = ptr UncheckedArray[char]
var example_solution: ptr UncheckedArray[char]
func isValid(n: int32, a: CharArray): bool =
## `a` contains an array of `n` queen positions.
## Returns true if none of the queens conflict, false otherwise.
for i in 0'i32 ..< n:
let p = cast[int32](a[i])
for j in i+1 ..< n:
let q = cast[int32](a[j])
if q == p or q == p - (j-i) or q == p + (j-i):
return false
return true
proc nqueens_ser(n, j: int32, a: CharArray): int32 =
# Serial nqueens
if n == j:
# Good solution, count it
if example_solution.isNil:
example_solution = wv_alloc(char, n)
copyMem(example_solution, a, n * sizeof(char))
return 1
# Try each possible position for queen `j`
for i in 0 ..< n:
a[j] = cast[char](i)
if isValid(j+1, a):
result += nqueens_ser(n, j+1, a)
proc nqueens_par(n, j: int32, a: CharArray): int32 =
if n == j:
# Good solution, count it
return 1
var localCounts = alloca(Flowvar[int32], n)
zeroMem(localCounts, n * sizeof(Flowvar[int32]))
# Try each position for queen `j`
for i in 0 ..< n:
var b = alloca(char, j+1)
copyMem(b, a, j * sizeof(char))
b[j] = cast[char](i)
if isValid(j+1, b):
localCounts[i] = spawn nqueens_par(n, j+1, b)
for i in 0 ..< n:
if not localCounts[i].isNil():
result += ^localCounts[i]
const solutions = [
1,
0,
0,
2,
10, # 5x5
4,
10,
92, # 8x8
352,
724, # 10x10
2680,
14200,
73712,
365596,
2279184, # 15x15
14772512
]
proc verifyQueens(n, res: int32) =
if n > solutions.len:
echo &"Cannot verify result: {n} is out of range [1,{solutions.len}]"
return
if res != solutions[n-1]:
echo &"N-Queens failure: {res} is different from expected {solutions[n-1]}"
proc main() =
if paramCount() != 1:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <n: number of queens on a nxn board>"
quit 0
let n = paramStr(1).parseInt.int32
if n notin 1 .. solutions.len:
echo &"The number of queens N (on a NxN board) must be in the range [1, {solutions.len}]"
quit 1
let start = wtime_msec()
let count = nqueens_par(n, 0, alloca(char, n))
let stop = wtime_msec()
verifyQueens(n, count)
if not example_solution.isNil:
stdout.write("Example solution: ")
for i in 0 ..< n:
c_printf("%2d ", example_solution[i])
stdout.write('\n')
echo &"Elapsed wall time: {stop-start:2.4f} ms"
main()

@@ -0,0 +1,229 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
#
# Original code licenses
# ------------------------------------------------------------------------------------------------
#
# /**********************************************************************************************/
# /* This program is part of the Barcelona OpenMP Tasks Suite */
# /* Copyright (C) 2009 Barcelona Supercomputing Center - Centro Nacional de Supercomputacion */
# /* Copyright (C) 2009 Universitat Politecnica de Catalunya */
# /* */
# /* This program is free software; you can redistribute it and/or modify */
# /* it under the terms of the GNU General Public License as published by */
# /* the Free Software Foundation; either version 2 of the License, or */
# /* (at your option) any later version. */
# /* */
# /* This program is distributed in the hope that it will be useful, */
# /* but WITHOUT ANY WARRANTY; without even the implied warranty of */
# /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */
# /* GNU General Public License for more details. */
# /* */
# /* You should have received a copy of the GNU General Public License */
# /* along with this program; if not, write to the Free Software */
# /* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */
# /**********************************************************************************************/
#
# /*
# * Original code from the Cilk project (by Keith Randall)
# *
# * Copyright (c) 2000 Massachusetts Institute of Technology
# * Copyright (c) 2000 Matteo Frigo
# */
import
# Stdlib
system/ansi_c, strformat, os, strutils, cpuinfo,
# Taskpools
../../taskpools
when not defined(windows):
# bench
import ../wtime, ../resources
# Nim helpers
# -------------------------------------------------
when defined(windows):
proc alloca(size: int): pointer {.header: "<malloc.h>".}
else:
proc alloca(size: int): pointer {.header: "<alloca.h>".}
template alloca*(T: typedesc): ptr T =
cast[ptr T](alloca(sizeof(T)))
template alloca*(T: typedesc, len: Natural): ptr UncheckedArray[T] =
cast[ptr UncheckedArray[T]](alloca(sizeof(T) * len))
proc tp_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} =
when defined(TP_useNimAlloc):
cast[type result](createSharedU(T, len))
else:
cast[type result](c_malloc(csize_t len*sizeof(T)))
proc tp_free*[T: ptr](p: T) {.inline.} =
when defined(TP_useNimAlloc):
freeShared(p)
else:
c_free(p)
# We assume that Nim zeroMem vs C memset
# and Nim copyMem vs C memcpy have no difference
# Nim does have extra checks to handle GC-ed types
# but they should be eliminated by the Nim compiler.
# -------------------------------------------------
type CharArray = ptr UncheckedArray[char]
var tp: Taskpool
var example_solution: ptr UncheckedArray[char]
func isValid(n: int32, a: CharArray): bool =
## `a` contains an array of `n` queen positions.
## Returns true if none of the queens conflict, false otherwise.
for i in 0'i32 ..< n:
let p = cast[int32](a[i])
for j in i+1 ..< n:
let q = cast[int32](a[j])
if q == p or q == p - (j-i) or q == p + (j-i):
return false
return true
proc nqueens_ser(n, j: int32, a: CharArray): int32 =
# Serial nqueens
if n == j:
# Good solution, count it
if example_solution.isNil:
example_solution = tp_alloc(char, n)
copyMem(example_solution, a, n * sizeof(char))
return 1
# Try each possible position for queen `j`
for i in 0 ..< n:
a[j] = cast[char](i)
if isValid(j+1, a):
result += nqueens_ser(n, j+1, a)
proc nqueens_par(n, j: int32, a: CharArray): int32 =
if n == j:
# Good solution, count it
return 1
var localCounts = alloca(Flowvar[int32], n)
zeroMem(localCounts, n * sizeof(Flowvar[int32]))
# Try each position for queen `j`
for i in 0 ..< n:
var b = alloca(char, j+1)
copyMem(b, a, j * sizeof(char))
b[j] = cast[char](i)
if isValid(j+1, b):
localCounts[i] = tp.spawn nqueens_par(n, j+1, b)
for i in 0 ..< n:
if localCounts[i].isSpawned():
result += sync(localCounts[i])
const solutions = [
1,
0,
0,
2,
10, # 5x5
4,
10,
92, # 8x8
352,
724, # 10x10
2680,
14200,
73712,
365596,
2279184, # 15x15
14772512
]
proc verifyQueens(n, res: int32) =
if n > solutions.len:
echo &"Cannot verify result: {n} is out of range [1,{solutions.len}]"
return
if res != solutions[n-1]:
echo &"N-Queens failure: {res} is different from expected {solutions[n-1]}"
proc main() =
var
n = 11'i32
nthreads: int
if existsEnv"TASKPOOL_NUM_THREADS":
nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()
if paramCount() == 0:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <N:{n}>"
echo &"Running with default config N = {n}\n"
if paramCount() >= 1:
n = paramStr(1).parseInt.int32
if n notin 1 .. solutions.len:
echo &"The number of queens N (on a NxN board) must be in the range [1, {solutions.len}]"
quit 1
when not defined(windows):
var ru: Rusage
getrusage(RusageSelf, ru)
var
rss = ru.ru_maxrss
flt = ru.ru_minflt
tp = Taskpool.new(numThreads = nthreads)
when not defined(windows):
let start = wtime_msec()
let count = nqueens_par(n, 0, alloca(char, n))
when not defined(windows):
let stop = wtime_msec()
when not defined(windows):
getrusage(RusageSelf, ru)
rss = ru.ru_maxrss - rss
flt = ru.ru_minflt - flt
tp.shutdown()
verifyQueens(n, count)
if not example_solution.isNil:
stdout.write("Example solution: ")
for i in 0 ..< n:
c_printf("%2d ", example_solution[i])
stdout.write('\n')
echo "Scheduler: Taskpool"
echo "Benchmark: N-queens"
echo "Threads: ", nthreads
when not defined(windows):
echo "Time(us) ", stop - start
echo "Max RSS (KB): ", ru.ru_maxrss
echo "Runtime RSS (KB): ", rss
echo "# of page faults: ", flt
echo "Problem size: ", n,"x",n, " board with ",n, " queens"
echo "Solutions found: ", count
quit 0
main()

benchmarks/resources.nim (new file, +24)

@@ -0,0 +1,24 @@
type
Timeval {.importc: "timeval", header:"<sys/time.h>", bycopy.} = object
Rusage* {.importc: "struct rusage", header:"<sys/resource.h>", bycopy.} = object
ru_utime {.importc.}: Timeval
ru_stime {.importc.}: Timeval
ru_maxrss* {.importc.}: int32 # Maximum resident set size
# ...
ru_minflt* {.importc.}: int32 # page reclaims (soft page faults)
RusageWho* {.size: sizeof(cint).} = enum
RusageChildren = -1
RusageSelf = 0
RusageThread = 1
when defined(debug):
var H_RUSAGE_SELF{.importc, header:"<sys/resource.h>".}: cint
var H_RUSAGE_CHILDREN{.importc, header:"<sys/resource.h>".}: cint
var H_RUSAGE_THREAD{.importc, header:"<sys/resource.h>".}: cint
assert H_RUSAGE_SELF == ord(RusageSelf)
assert H_RUSAGE_CHILDREN == ord(RusageChildren)
assert H_RUSAGE_THREAD == ord(RusageThread)
proc getrusage*(who: RusageWho, usage: var Rusage) {.importc, header: "<sys/resource.h>".}

@@ -0,0 +1,7 @@
# Simple single-producer multiple-consumers benchmark
**SPC**, a Simple Producer-Consumer benchmark.
A single worker produces *n* tasks,
each running for *t* microseconds. This benchmark tests how many
concurrent consumers a single producer can sustain.

@@ -0,0 +1,145 @@
import
# STD lib
os, strutils, system/ansi_c, cpuinfo, strformat, math,
# Library
../../taskpools,
# bench
../wtime, ../resources
var NumTasksTotal: int32
var TaskGranularity: int32 # microsecond
var PollInterval: float64 # microsecond
var tp: Taskpool
var global_poll_elapsed {.threadvar.}: float64
template dummy_cpt(): untyped =
# Dummy computation
# Calculate fib(30) iteratively
var
fib = 0
f2 = 0
f1 = 1
for i in 2 .. 30:
fib = f1 + f2
f2 = f1
f1 = fib
proc spc_consume(usec: int32) =
var pollElapsed = 0'f64
let start = wtime_usec()
let stop = usec.float64
global_poll_elapsed = PollInterval
while true:
var elapsed = wtime_usec() - start
elapsed = elapsed - pollElapsed
if elapsed >= stop:
break
dummy_cpt()
# if elapsed >= global_poll_elapsed:
# let pollStart = wtime_usec()
# loadBalance(Weave)
# pollElapsed += wtime_usec() - pollStart
# global_poll_elapsed += PollInterval
# c_printf("Elapsed: %.2lfus\n", elapsed)
proc spc_consume_nopoll(usec: int32) =
let start = wtime_usec()
let stop = usec.float64
while true:
var elapsed = wtime_usec() - start
if elapsed >= stop:
break
dummy_cpt()
# c_printf("Elapsed: %.2lfus\n", elapsed)
proc spc_produce(n: int32) =
for i in 0 ..< n:
tp.spawn spc_consume(TaskGranularity)
proc spc_produce_seq(n: int32) =
for i in 0 ..< n:
spc_consume_nopoll(TaskGranularity)
proc main() =
NumTasksTotal = 1000000
TaskGranularity = 10
PollInterval = 10
if paramCount() == 0:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <# of tasks:{NumTasksTotal}> " &
&"<task granularity (us): {TaskGranularity}> " &
&"[polling interval (us): task granularity]"
echo &"Running with default config tasks = {NumTasksTotal}, granularity (us) = {TaskGranularity}, polling (us) = {PollInterval}"
if paramCount() >= 1:
NumTasksTotal = paramStr(1).parseInt.int32
if paramCount() >= 2:
TaskGranularity = paramStr(2).parseInt.int32
if paramCount() == 3:
PollInterval = paramStr(3).parseInt.float64
else:
PollInterval = TaskGranularity.float64
if paramCount() > 3:
let exeName = getAppFilename().extractFilename()
echo &"Usage: {exeName} <# of tasks:{NumTasksTotal}> " &
&"<task granularity (us): {TaskGranularity}> " &
&"[polling interval (us): task granularity]"
quit 1
var nthreads: int
if existsEnv"WEAVE_NUM_THREADS":
nthreads = getEnv"WEAVE_NUM_THREADS".parseInt()
else:
nthreads = countProcessors()
tp = Taskpool.new(numThreads = nthreads)
# measure overhead during tasking
var ru: Rusage
getrusage(RusageSelf, ru)
var
rss = ru.ru_maxrss
flt = ru.ru_minflt
let start = wtime_msec()
# spc_produce_seq(NumTasksTotal)
spc_produce(NumTasksTotal)
tp.syncAll()
let stop = wtime_msec()
getrusage(RusageSelf, ru)
rss = ru.ru_maxrss - rss
flt = ru.ru_minflt - flt
tp.shutdown()
echo "--------------------------------------------------------------------------"
echo "Scheduler: Taskpool"
echo "Benchmark: SPC (Single task Producer - multi Consumer)"
echo "Threads: ", nthreads
echo "Time(ms) ", round(stop - start, 3)
echo "Max RSS (KB): ", ru.ru_maxrss
echo "Runtime RSS (KB): ", rss
echo "# of page faults: ", flt
echo "--------------------------------------------------------------------------"
echo "# of tasks: ", NumTasksTotal
echo "Task granularity (us): ", TaskGranularity
echo "Polling / manual load balancing interval (us): ", PollInterval
quit 0
main()

benchmarks/wtime.h (new file, +53)

@@ -0,0 +1,53 @@
#ifndef WTIME_H
#define WTIME_H
#include <sys/time.h>
#include <time.h>
// Number of seconds since the Epoch
static inline double Wtime_sec(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec + tv.tv_usec / 1e6;
}
// Number of milliseconds since the Epoch
static inline double Wtime_msec(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1e3 + tv.tv_usec / 1e3;
}
// Number of microseconds since the Epoch
static inline double Wtime_usec(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1e6 + tv.tv_usec;
}
// Read time stamp counter on x86
static inline unsigned long long readtsc(void)
{
unsigned int lo, hi;
// RDTSC copies contents of 64-bit TSC into EDX:EAX
asm volatile ("rdtsc" : "=a" (lo), "=d" (hi));
return (unsigned long long)hi << 32 | lo;
}
#define WTIME_unique_var_name_paste(id, n) id ## n
#define WTIME_unique_var_name(id, n) WTIME_unique_var_name_paste(id, n)
#define WTIME_unique_var(id) WTIME_unique_var_name(id, __LINE__)
// Convenience macro for time measurement
#define WTIME(unit) \
double WTIME_unique_var(_start_##unit##_) = Wtime_##unit##ec(); \
int WTIME_unique_var(_i_) = 0; \
for (; WTIME_unique_var(_i_) == 0 || \
(printf("Elapsed wall time: %.2lf "#unit"\n", \
Wtime_##unit##ec() - WTIME_unique_var(_start_##unit##_)), 0); \
WTIME_unique_var(_i_)++)
#endif // WTIME_H

benchmarks/wtime.nim (new file, +10)

@@ -0,0 +1,10 @@
import strutils, os
const cSourcesPath = currentSourcePath.rsplit(DirSep, 1)[0]
const cHeader = cSourcesPath / "wtime.h"
{.passC: "-I" & cSourcesPath.}
proc wtime_usec*: float64 {.importc: "Wtime_usec", header: cHeader.}
proc wtime_msec*: float64 {.importc: "Wtime_msec", header: cHeader.}

doc/README.md (new file, +17)

@@ -0,0 +1,17 @@
# Taskpools architecture
The taskpools architecture is a simple threadpool with work-stealing to handle unbalanced workloads.
## Architecture
### Processing steps
1. On a `spawn` expression, thread i packages the function call in a task.
2. It enqueues the task in its own deque.
3. It calls `notify_one` on a condition variable that all sleeping threads wait on.
4. A notified thread wakes up and
5. randomly tries to steal a task from another worker.
6. If no task is found, it goes back to sleep.
7. Otherwise, it runs the stolen task.
8. On a `sync` statement, a thread runs tasks from its own deque or steals tasks from other workers.
9. Once the awaited task is ready, `sync` returns and the following statements (the continuation) can run.
A sketch of the worker loop these steps describe is shown below.
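
A hedged, illustrative toy with placeholder names, not the actual implementation:

```nim
# Illustrative worker loop; `Deque` here is a stand-in for the
# lock-free work-stealing deque used by the real scheduler.
import std/deques

type Task = proc () {.gcsafe.}

proc workerLoop(own: var Deque[Task];
                trySteal: proc (): Task;
                shuttingDown: proc (): bool) =
  while not shuttingDown():
    if own.len > 0:
      own.popLast()()      # run a task from our own deque (step 8)
    else:
      let t = trySteal()   # steal from a random victim (step 5)
      if t.isNil:
        discard            # step 6: the real code sleeps on the condition variable
      else:
        t()                # step 7: run the stolen task
```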

@@ -0,0 +1,43 @@
import ../taskpools/taskpools
import std/macros
block: # Async without result
proc display_int(x: int) =
stdout.write(x)
stdout.write(" - SUCCESS\n")
proc main() =
echo "\nSanity check 1: Printing 123456 654321 in parallel"
var tp = Taskpool.new(numThreads = 4)
tp.spawn display_int(123456)
tp.spawn display_int(654321)
tp.shutdown()
main()
block: # Async/Await
var tp: Taskpool
proc async_fib(n: int): int =
if n < 2:
return n
let x = tp.spawn async_fib(n-1)
let y = async_fib(n-2)
result = sync(x) + y
proc main2() =
echo "\nSanity check 2: fib(20)"
tp = Taskpool.new()
let f = async_fib(20)
tp.shutdown()
doAssert f == 6765
main2()

taskpools.nim (new file, +9)

@@ -0,0 +1,9 @@
# Nim-Taskpools
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import taskpools/taskpools
export taskpools

taskpools/ast_utils.nim (new file, +33)

@@ -0,0 +1,33 @@
# Nim-Taskpools
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import macros
template letsGoDeeper =
var rTree = node.kind.newTree()
for child in node:
rTree.add inspect(child)
return rTree
proc replaceSymsByIdents*(ast: NimNode): NimNode =
proc inspect(node: NimNode): NimNode =
case node.kind:
of {nnkIdent, nnkSym}:
return ident($node)
of nnkEmpty:
return node
of nnkLiterals:
return node
of nnkHiddenStdConv:
if node[1].kind == nnkIntLit:
return node[1]
else:
expectKind(node[1], nnkSym)
return ident($node[1])
else:
letsGoDeeper()
result = inspect(ast)
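# Usage sketch (illustrative; `detyped` is a hypothetical macro, not part of
# this file): stripping symbol bindings lets a typed AST be sem-checked
# again in a new context.
when isMainModule:
  macro detyped(x: typed): untyped =
    result = replaceSymsByIdents(x)
  echo detyped(40 + 2) # the re-checked expression still evaluates to 42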

@@ -0,0 +1,178 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/atomics,
./instrumentation/[contracts, loggers]
type
ChannelSPSCSingle* = object
## A type-erased SPSC channel.
##
## Wait-free bounded single-producer single-consumer channel
## that can only buffer a single item
## Properties:
## - wait-free
## - supports weak memory models
## - buffers a single item
## - Padded to avoid false sharing in collections
## - No extra indirection to access the item, the buffer is inline the channel
## - Linearizable
## - Default usable size is 254 bytes (WV_MemBlockSize - 2).
## If used in an intrusive manner, it's 126 bytes due to the default 128 bytes padding.
##
## The channel should be the last field of an object if used in an intrusive manner
##
## Motivations for type erasure
## - when a LazyFlowvar needs to be converted
## from stack-allocated memory to the heap to extend its lifetime,
## we have no type information at all, as the whole runtime,
## and especially tasks, does not retain it.
##
## - When a task depends on a future that was generated from lazy loop-splitting
## we don't have type information either.
##
## - An extra benefit is probably easier embedding, or calling
## from C or JIT code.
full{.align: 64.}: Atomic[bool]
itemSize*: uint8
buffer*{.align: 8.}: UncheckedArray[byte]
proc `=`(
dest: var ChannelSPSCSingle,
source: ChannelSPSCSingle
) {.error: "A channel cannot be copied".}
proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} =
## If ChannelSPSCSingle is used intrusively in another data structure,
## be aware that it should be the last field, since it ends with an UncheckedArray
preCondition: itemsize.int in 0 .. int high(uint8)
chan.itemSize = uint8 itemsize
chan.full.store(false, moRelaxed)
func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} =
not chan.full.load(moAcquire)
func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} =
## Try receiving the item buffered in the channel
## Returns true if successful (channel was not empty)
##
## ⚠ Use only in the consumer thread that reads from the channel.
preCondition: (sizeof(T) == chan.itemsize.int) or
# Support dummy object
(sizeof(T) == 0 and chan.itemsize == 1)
let full = chan.full.load(moAcquire)
if not full:
return false
dst = cast[ptr T](chan.buffer.addr)[]
chan.full.store(false, moRelease)
return true
func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
## Try sending an item into the channel
## Returns true if successful (channel was empty)
##
## ⚠ Use only in the producer thread that writes to the channel.
preCondition: (sizeof(T) == chan.itemsize.int) or
# Support dummy object
(sizeof(T) == 0 and chan.itemsize == 1)
let full = chan.full.load(moAcquire)
if full:
return false
cast[ptr T](chan.buffer.addr)[] = src
chan.full.store(true, moRelease)
return true
# Sanity checks
# ------------------------------------------------------------------------------
when isMainModule:
import ../memory/memory_pools
when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
template sendLoop[T](chan: var ChannelSPSCSingle,
data: sink T,
body: untyped): untyped =
while not chan.trySend(data):
body
template recvLoop[T](chan: var ChannelSPSCSingle,
data: var T,
body: untyped): untyped =
while not chan.tryRecv(data):
body
type
ThreadArgs = object
ID: WorkerKind
chan: ptr ChannelSPSCSingle
WorkerKind = enum
Sender
Receiver
template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} =
if args.ID == id:
body
proc thread_func(args: ThreadArgs) =
# Worker RECEIVER:
# ---------
# <- chan
# <- chan
# <- chan
#
# Worker SENDER:
# ---------
# chan <- 42
# chan <- 53
# chan <- 64
Worker(Receiver):
var val: int
for j in 0 ..< 10:
args.chan[].recvLoop(val):
# Busy loop, in prod we might want to yield the core/thread timeslice
discard
echo " Receiver got: ", val
doAssert val == 42 + j*11
Worker(Sender):
doAssert args.chan.full.load(moRelaxed) == false
for j in 0 ..< 10:
let val = 42 + j*11
args.chan[].sendLoop(val):
# Busy loop, in prod we might want to yield the core/thread timeslice
discard
echo "Sender sent: ", val
proc main() =
echo "Testing if 2 threads can send data"
echo "-----------------------------------"
var threads: array[2, Thread[ThreadArgs]]
var pool: TLPoolAllocator
pool.initialize()
var chan = pool.borrow(ChannelSPSCSingle)
chan[].initialize(itemSize = sizeof(int))
createThread(threads[0], thread_func, ThreadArgs(ID: Receiver, chan: chan))
createThread(threads[1], thread_func, ThreadArgs(ID: Sender, chan: chan))
joinThread(threads[0])
joinThread(threads[1])
recycle(chan)
echo "-----------------------------------"
echo "Success"
main()

@@ -0,0 +1,181 @@
# Nim-Taskpools
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# chase_lev_deques.nim
# --------------------
# This file implements a Chase-Lev deque
# This is a single-producer, multi-consumer concurrent queue
# for work-stealing schedulers.
#
# Papers:
# - Dynamic Circular Work-Stealing Deque
# David Chase, Yossi Lev, 2005
# https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf
#
# - Correct and Efficient Work-Stealing for Weak Memory Models
# Nhat Minh Lê, Antoniu Pop, Albert Cohen, Francesco Zappa Nardelli, 2013
# https://fzn.fr/readings/ppopp13.pdf
#
# We directly translate the second paper, which includes formal proofs of correctness
# and uses modern C++11 code.
#
# A Chase-Lev deque supports the following operations: push, pop, steal.
#
# top bottom
# ---------------------------------
# | | | | <- push()
# steal() <- | Task 0 | Task 1 | Task 2 | -> pop()
# any thread | | | | owner-only
# ---------------------------------
#
# To reduce contention, stealing is done on the opposite end from push/pop
# so that there is a race only for the very last task.
{.push raises: [].}
import
system/ansi_c,
std/[locks, typetraits, atomics],
./instrumentation/[contracts, loggers]
type
Buf[T] = object
## Backend buffer of a ChaseLevDeque
## `capacity` MUST be a power of 2
capacity: int
mask: int # == capacity-1 implies (i and mask) == (i mod capacity)
rawBuffer: UncheckedArray[Atomic[T]]
ChaseLevDeque*[T] = object
## This implements a lock-free, growable, work-stealing deque.
## The owning thread enqueues and dequeues at the bottom
## Foreign threads steal at the top.
##
## Default queue size is 8
## Queue can grow to handle up to 34 359 738 368 tasks in flight
## TODO:
## with --gc:arc / --gc:orc, use a seq instead of a fixed max size.
top {.align: 64.}: Atomic[int]
bottom: Atomic[int]
buf: Atomic[ptr Buf[T]]
garbage: array[32, ptr Buf[T]] # up to 34 359 738 368 sized buffer
garbageUsed: uint8
func isPowerOfTwo(n: int): bool {.inline.} =
(n and (n - 1)) == 0 and (n != 0)
proc newBuf(T: typedesc, capacity: int): ptr Buf[T] =
# Tasks have a destructor
# static:
# doAssert supportsCopyMem(T), $T & " must be a (POD) plain-old-data type: no seq, string, ref."
preCondition: capacity.isPowerOfTwo()
result = cast[ptr Buf[T]](
c_malloc(csize_t 2*sizeof(int) + sizeof(T)*capacity)
)
result.capacity = capacity
result.mask = capacity - 1
result.rawBuffer.addr.zeroMem(sizeof(T)*capacity)
proc `[]=`[T](buf: var Buf[T], index: int, item: T) {.inline.} =
buf.rawBuffer[index and buf.mask].store(item, moRelaxed)
proc `[]`[T](buf: var Buf[T], index: int): T {.inline.} =
result = buf.rawBuffer[index and buf.mask].load(moRelaxed)
proc grow[T](deque: var ChaseLevDeque[T], buf: var ptr Buf[T], top, bottom: int) {.inline.} =
  ## Double the buffer size.
  ## `bottom` is one past the last item index.
  ##
  ## To handle race conditions, the current "top", "bottom" and "buf"
  ## have to be saved before calling this procedure.
  ## It reads and writes "deque.buf", "deque.garbage" and "deque.garbageUsed".
# Read -> Copy -> Update
var tmp = newBuf(T, buf.capacity*2)
for i in top ..< bottom:
tmp[][i] = buf[][i]
  # Overflowing the garbage list requires 68+ billion tasks in flight (per thread)
  ascertain: deque.garbageUsed.int < deque.garbage.len
  # Delay freeing the old buffer until teardown, a thief might still be reading it
  deque.garbage[deque.garbageUsed] = buf
  deque.garbageUsed += 1
swap(buf, tmp)
deque.buf.store(buf, moRelaxed)
# Public API
# ---------------------------------------------------
proc init*[T](deque: var ChaseLevDeque[T]) =
## Initializes a new Chase-lev work-stealing deque.
deque.reset()
deque.buf.store(newBuf(T, 8), moRelaxed)
proc teardown*[T](deque: var ChaseLevDeque[T]) =
## Teardown a Chase-lev work-stealing deque.
for i in 0 ..< deque.garbageUsed.int:
c_free(deque.garbage[i])
c_free(deque.buf.load(moRelaxed))
proc push*[T](deque: var ChaseLevDeque[T], item: T) =
## Enqueue an item at the bottom
## The item should not be used afterwards.
let # Handle race conditions
b = deque.bottom.load(moRelaxed)
t = deque.top.load(moAcquire)
var a = deque.buf.load(moRelaxed)
if b-t > a.capacity - 1:
# Full queue
deque.grow(a, t, b)
a[][b] = item
fence(moRelease)
deque.bottom.store(b+1, moRelaxed)
proc pop*[T](deque: var ChaseLevDeque[T]): T =
  ## Dequeue an item at the bottom
let # Handle race conditions
b = deque.bottom.load(moRelaxed) - 1
a = deque.buf.load(moRelaxed)
deque.bottom.store(b, moRelaxed)
fence(moSequentiallyConsistent)
var t = deque.top.load(moRelaxed)
if t <= b:
# Non-empty queue.
result = a[][b]
if t == b:
# Single last element in queue.
if not compare_exchange(deque.top, t, t+1, moSequentiallyConsistent, moRelaxed):
# Failed race.
result = default(T)
deque.bottom.store(b+1, moRelaxed)
else:
# Empty queue.
result = default(T)
deque.bottom.store(b+1, moRelaxed)
proc steal*[T](deque: var ChaseLevDeque[T]): T =
  ## Dequeue an item at the top
var t = deque.top.load(moAcquire)
fence(moSequentiallyConsistent)
let b = deque.bottom.load(moAcquire)
result = default(T)
if t < b:
# Non-empty queue.
let a = deque.buf.load(moConsume)
result = a[][t]
if not compare_exchange(deque.top, t, t+1, moSequentiallyConsistent, moRelaxed):
# Failed race.
return default(T)
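# Sanity checks
# ---------------------------------------------------
when isMainModule:
  # Minimal single-threaded sketch of the owner-side API (an illustrative
  # addition, not the upstream test-suite). `steal` is only exercised on the
  # empty deque here, since stealing is meaningful only from another thread.
  var dq: ChaseLevDeque[int]
  dq.init()
  for i in 1 .. 20:            # more than 8 pushes forces a grow()
    dq.push(i)
  for i in countdown(20, 1):   # the owner pops LIFO from the bottom
    doAssert dq.pop() == i
  doAssert dq.steal() == 0     # an empty deque returns default(T)
  dq.teardown()
  echo "chase_lev_deques: single-threaded sanity check passed"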

View File

@ -0,0 +1,82 @@
# Nim-Taskpools
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# event_notifier.nim
# ------------------
# This file implements an event notifier.
# It allows putting idle threads to sleep or waking them up.
# Design
# Currently it is a shared lock + condition variable (akin to a semaphore).
#
# In the future an eventcount might be considered; an eventcount significantly
# reduces scheduler overhead by removing lock acquisition from the critical path.
# See overview and implementations at
# https://gist.github.com/mratsim/04a29bdd98d6295acda4d0677c4d0041
#
# Weave's "one event-notifier per thread" design further reduces overhead
# but requires the threadpool to be message-passing based.
# https://github.com/mratsim/weave/blob/a230cce98a8524b2680011e496ec17de3c1039f2/weave/cross_thread_com/event_notifiers.nim
import
std/locks,
./instrumentation/contracts
type
EventNotifier* = object
## This data structure allows threads to be parked when no events are pending
## and woken up when a new event is.
# Lock must be aligned to a cache-line to avoid false-sharing.
lock{.align: 64.}: Lock
cond: Cond
parked: int
signals: int
func initialize*(en: var EventNotifier) {.inline.} =
## Initialize the event notifier
en.lock.initLock()
en.cond.initCond()
en.parked = 0
en.signals = 0
func `=destroy`*(en: var EventNotifier) {.inline.} =
en.cond.deinitCond()
en.lock.deinitLock()
func `=`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be copied".}
func `=sink`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be moved".}
proc park*(en: var EventNotifier) {.inline.} =
## Wait until we are signaled of an event
## Thread is parked and does not consume CPU resources
en.lock.acquire()
preCondition: en.signals == 0
en.parked += 1
while en.signals == 0: # handle spurious wakeups
en.cond.wait(en.lock)
en.parked -= 1
en.signals -= 1
postCondition: en.signals >= 0
en.lock.release()
proc notify*(en: var EventNotifier) {.inline.} =
## Unpark a thread if any is available
en.lock.acquire()
if en.parked > 0:
en.signals += 1
en.cond.signal()
en.lock.release()
proc getParked*(en: var EventNotifier): int {.inline.} =
  ## Get the number of parked threads
en.lock.acquire()
result = en.parked
en.lock.release()
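# Sanity checks
# ----------------------------------------------------------------------------------
when isMainModule:
  import std/os  # for sleep
  # Minimal sketch (an illustrative addition, assumes --threads:on):
  # a worker thread parks until the main thread notifies it.
  var en: EventNotifier
  en.initialize()
  proc waiter() {.thread.} =
    en.park()
    echo "worker: woken up"
  var worker: Thread[void]
  createThread(worker, waiter)
  while en.getParked() == 0:   # wait until the worker is actually parked
    sleep(1)
  en.notify()
  joinThread(worker)
  echo "event_notifiers: park/notify sanity check passed"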

71
taskpools/flowvars.nim Normal file
View File

@ -0,0 +1,71 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
./channels_spsc_single,
system/ansi_c,
./instrumentation/contracts,
std/os
{.push gcsafe.}
type
Flowvar*[T] = object
## A Flowvar is a placeholder for a future result that may be computed in parallel
  # Flowvars are optimized when containing a ptr type.
  # They take less space in memory by testing isNil
  # instead of carrying an extra atomic bool.
  # They also use type erasure to avoid duplicated code
  # due to generic monomorphization.
chan: ptr ChannelSPSCSingle
# proc `=copy`*[T](dst: var Flowvar[T], src: Flowvar[T]) {.error: "Futures/Flowvars cannot be copied".}
#
# Unfortunately we cannot prevent this easily as internally
# we need a copy:
# - nim-taskpools level when doing toTask(fnCall(args, fut)) and then returning fut. (Can be worked around with copyMem)
# - in std/tasks (need upstream workaround)
proc newFlowVar*(T: typedesc): Flowvar[T] {.inline.} =
let size = 2 + sizeof(T) # full flag + item size + buffer
result.chan = cast[ptr ChannelSPSCSingle](c_calloc(1, csize_t size))
result.chan[].initialize(sizeof(T))
proc cleanup(fv: sink Flowvar) {.inline.} =
if not fv.chan.isNil:
c_free(fv.chan)
func isSpawned*(fv: Flowvar): bool {.inline.} =
## Returns true if a flowvar is spawned
## This may be useful for recursive algorithms that
## may or may not spawn a flowvar depending on a condition.
## This is similar to Option or Maybe types
return not fv.chan.isNil
proc readyWith*[T](fv: Flowvar[T], childResult: T) {.inline.} =
## Send the Flowvar result from the child thread processing the task
## to its parent thread.
let resultSent {.used.} = fv.chan[].trySend(childResult)
postCondition: resultSent
template tryComplete*[T](fv: Flowvar, parentResult: var T): bool =
fv.chan[].tryRecv(parentResult)
func isReady*[T](fv: Flowvar[T]): bool {.inline.} =
## Returns true if the result of a Flowvar is ready.
## In that case `sync` will not block.
  ## Otherwise the current thread will block and help with all the pending tasks
## until the Flowvar is ready.
not fv.chan[].isEmpty()
proc sync*[T](fv: sink Flowvar[T]): T {.inline, gcsafe.} =
## Blocks the current thread until the flowvar is available
## and returned.
## The thread is not idle and will complete pending tasks.
mixin forceFuture
forceFuture(fv, result)
cleanup(fv)
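# Sanity checks
# ----------------------------------------------------------------------------------
when isMainModule:
  # Minimal single-threaded sketch (an illustrative addition): complete a
  # Flowvar from the child side with readyWith, then read it back from the
  # parent side with tryComplete, bypassing the scheduler-driven `sync`.
  var fv = newFlowVar(int)
  doAssert fv.isSpawned
  doAssert not fv.isReady
  fv.readyWith(42)              # child thread side
  doAssert fv.isReady
  var res: int
  doAssert fv.tryComplete(res)  # parent thread side
  doAssert res == 42
  cleanup(fv)
  echo "flowvars: readyWith/tryComplete sanity check passed"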

View File

@ -0,0 +1,113 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import macros, os, strutils
{.used.}
# A simple design-by-contract API
# ----------------------------------------------------------------------------------
# Everything should be a template that doesn't produce any code
# when WV_Asserts is not defined.
# Those checks are controlled by a custom flag instead of
# "--boundsChecks" or "--nilChecks" to decouple them from user code checks.
# Furthermore, we want them to be very lightweight on performance
# TODO auto-add documentation
proc inspectInfix(node: NimNode): NimNode =
  ## Inspect an expression and return its AST as a string,
  ## with the runtime values of infix operands inlined.
  # TODO: pointers and custom types need a default repr,
  # otherwise we can only resolve simple expressions
proc inspect(node: NimNode): NimNode =
case node.kind:
of nnkInfix:
return newCall(
bindSym"&",
newCall(
bindSym"&",
newCall(ident"$", inspect(node[1])),
newLit(" " & $node[0] & " ")
),
newCall(ident"$", inspect(node[2]))
)
of {nnkIdent, nnkSym}:
return node
of nnkDotExpr:
return quote do:
when `node` is pointer or
`node` is ptr or
`node` is (proc):
toHex(cast[ByteAddress](`node`) and 0xffff_ffff)
else:
$(`node`)
of nnkPar:
result = nnkPar.newTree()
for sub in node:
result.add inspect(sub)
else:
return node.toStrLit()
return inspect(node)
macro assertContract(
checkName: static string,
predicate: untyped) =
let lineinfo = lineinfoObj(predicate)
let file = extractFilename(lineinfo.filename)
var strippedPredicate: NimNode
if predicate.kind == nnkStmtList:
assert predicate.len == 1, "Only one-liner conditions are supported"
strippedPredicate = predicate[0]
else:
strippedPredicate = predicate
let debug = "\n Contract violated for " & checkName & " at " & file & ":" & $lineinfo.line &
"\n " & $strippedPredicate.toStrLit &
"\n The following values are contrary to expectations:" &
"\n "
let values = inspectInfix(strippedPredicate)
let myID = quote do:
when declared(myID):
$myID()
else:
"N/A"
result = quote do:
{.noSideEffect.}:
when compileOption("assertions"):
assert(`predicate`, `debug` & $`values` & " [Worker " & `myID` & "]\n")
elif defined(WV_Asserts):
if unlikely(not(`predicate`)):
raise newException(AssertionError, `debug` & $`values` & '\n')
# A way to get the caller function would be nice.
template preCondition*(require: untyped) =
  ## Optional runtime check at the start of a function
  assertContract("pre-condition", require)
template postCondition*(ensure: untyped) =
  ## Optional runtime check before returning from a function
  assertContract("post-condition", ensure)
template ascertain*(check: untyped) =
## Optional runtime check in the middle of processing
assertContract("transient condition", check)
# Sanity checks
# ----------------------------------------------------------------------------------
when isMainModule:
proc assertGreater(x, y: int) =
    postCondition(x > y)
# We should get a nicely formatted exception
assertGreater(10, 12)

View File

@ -0,0 +1,22 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import system/ansi_c
{.used.}
template log*(args: varargs[untyped]): untyped =
c_printf(args)
flushFile(stdout)
template debugTermination*(body: untyped): untyped =
when defined(TP_DebugTermination) or defined(TP_Debug):
{.noSideEffect, gcsafe.}: body
template debug*(body: untyped): untyped =
when defined(TP_Debug):
{.noSideEffect, gcsafe.}: body
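# Sanity checks
# ----------------------------------------------------------------------------------
when isMainModule:
  # Minimal sketch (an illustrative addition): `log` is printf-style and
  # always prints; the debug templates compile to nothing unless -d:TP_Debug
  # (or -d:TP_DebugTermination for debugTermination) is passed.
  log("loggers: %d + %d = %d\n", 1, 2, 3)
  debug:
    log("visible only with -d:TP_Debug\n")
  debugTermination:
    log("visible only with -d:TP_Debug or -d:TP_DebugTermination\n")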

View File

@ -0,0 +1,52 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# Thread primitives
# ----------------------------------------------------------------------------------
type
Pthread {.importc: "pthread_t", header: "<sys/types.h>".} = distinct culong
CpuSet {.byref, importc: "cpu_set_t", header: "<sched.h>".} = object
proc pthread_self(): Pthread {.header: "<pthread.h>".}
proc pthread_setaffinity_np(
thread: Pthread,
cpuset_size: int,
cpuset: CpuSet
) {.header: "<pthread.h>".}
## Limit specified `thread` to run only on the processors
## represented in `cpuset`
# Note CpuSet is always passed by (hidden) pointer
proc cpu_zero(cpuset: var CpuSet) {.importc: "CPU_ZERO", header: "<sched.h>".}
## Clears the set so that it contains no CPU
proc cpu_set(cpu: cint, cpuset: var CpuSet) {.importc: "CPU_SET", header: "<sched.h>".}
## Add CPU to set
# Affinity
# ----------------------------------------------------------------------------------
# Nim doesn't allow the main thread to set its own affinity
proc set_thread_affinity(t: Pthread, cpu: int32) {.inline.}=
when defined(osx) or defined(android):
{.warning: "To improve performance we should pin threads to cores.\n" &
"This is not possible with MacOS or Android.".}
    # Note: on Android it's even more complex due to the big.LITTLE architecture
# with cores with different performance profiles to save on battery
else:
var cpuset {.noinit.}: CpuSet
cpu_zero(cpuset)
cpu_set(cpu, cpuset)
pthread_setaffinity_np(t, sizeof(CpuSet), cpuset)
proc pinToCpu*(cpu: int32) {.inline.} =
## Set the affinity of the main thread (the calling thread)
set_thread_affinity(pthread_self(), cpu)
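# Sanity checks
# ----------------------------------------------------------------------------------
when isMainModule:
  # Minimal sketch (an illustrative addition): pin the calling (main) thread
  # to CPU 0. On OSX and Android this is a no-op, see the warning above.
  pinToCpu(0)
  echo "affinity_posix: main thread pinned to CPU 0"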

View File

@ -0,0 +1,18 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import winlean
when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
proc setThreadAffinityMask(hThread: Handle, dwThreadAffinityMask: uint) {.
importc: "SetThreadAffinityMask", stdcall, header: "<windows.h>".}
proc pinToCpu*(cpu: int32) {.inline.} =
## Set the affinity of the main thread (the calling thread)
setThreadAffinityMask(getThreadID(), uint(1 shl cpu))

View File

@ -0,0 +1,53 @@
# Synchronization Barriers
OSX does not implement pthread_barrier as it's an optional part
of the POSIX standard, and Apple probably wants to drive people to libdispatch/Grand Central Dispatch.
So we need to roll our own with a POSIX-compatible API.
## Glibc barriers, design bug and implementation
> Note: due to GPL licensing, do not lift the code.
> Not that we could, as it is heavily dependent on futexes,
> which are not available on OSX.
We need to make sure that we don't hit the same bug
as glibc: https://sourceware.org/bugzilla/show_bug.cgi?id=13065
which seems to be an issue in some of the barrier implementations
in the wild.
The design of Glibc barriers is here:
https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/DESIGN-barrier.txt;h=23463c6b7e77231697db3e13933b36ce295365b1;hb=HEAD
And implementation:
- https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/pthread_barrier_destroy.c;h=76957adef3ee751e5b0cfa429fcf4dd3cfd80b2b;hb=HEAD
- https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/pthread_barrier_init.c;h=c8ebab3a3cb5cbbe469c0d05fb8d9ca0c365b2bb;hb=HEAD
- https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/pthread_barrier_wait.c;h=49fcfd370c1c4929fdabdf420f2f19720362e4a0;hb=HEAD
## Synchronization barrier techniques
This article goes over the techniques of
"pool barrier" and "ticket barrier"
https://locklessinc.com/articles/barriers/
to reach 2x to 20x the speed of pthreads barrier
This course https://cs.anu.edu.au/courses/comp8320/lectures/aux/comp422-Lecture21-Barriers.pdf
goes over
- centralized barrier with sense reversal
- combining tree barrier
- dissemination barrier
- tournament barrier
- scalable tree barrier
More courses:
- http://www.cs.rochester.edu/u/sandhya/csc458/seminars/jb_Barrier_Methods.pdf
These fast barrier implementations, however, require lightweight mutexes
like Linux futexes, which OSX lacks.
This post goes over lightweight mutexes like Benaphores (from BeOS)
https://preshing.com/20120226/roll-your-own-lightweight-mutex/
This gives a few barrier implementations
http://gallium.inria.fr/~maranget/MPRI/02.pdf
and refers to the Cubicle paper on formally verifying synchronization barriers
http://cubicle.lri.fr/papers/jfla2014.pdf (in French)

View File

@ -0,0 +1,69 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
when defined(windows):
import ./barriers_windows
when compileOption("assertions"):
import os
type SyncBarrier* = SynchronizationBarrier
proc init*(syncBarrier: var SyncBarrier, threadCount: range[0'i32..high(int32)]) {.inline.} =
## Initialize a synchronization barrier that will block ``threadCount`` threads
## before release.
let err {.used.} = InitializeSynchronizationBarrier(syncBarrier, threadCount, -1)
when compileOption("assertions"):
if err != 1:
assert err == 0
raiseOSError(osLastError())
proc wait*(syncBarrier: var SyncBarrier): bool {.inline.} =
## Blocks thread at a synchronization barrier.
## Returns true for one of the threads (the last one on Windows, undefined on Posix)
## and false for the others.
result = bool EnterSynchronizationBarrier(syncBarrier, SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE)
proc delete*(syncBarrier: sink SyncBarrier) {.inline.} =
## Deletes a synchronization barrier.
## This assumes no race between waiting at a barrier and deleting it,
## and reuse of the barrier requires initialization.
DeleteSynchronizationBarrier(syncBarrier.addr)
else:
import ./barriers_posix
when compileOption("assertions"):
import os
type SyncBarrier* = PthreadBarrier
proc init*(syncBarrier: var SyncBarrier, threadCount: range[0'i32..high(int32)]) {.inline.} =
## Initialize a synchronization barrier that will block ``threadCount`` threads
## before release.
let err {.used.} = pthread_barrier_init(syncBarrier, nil, threadCount)
when compileOption("assertions"):
if err != 0:
raiseOSError(OSErrorCode(err))
proc wait*(syncBarrier: var SyncBarrier): bool {.inline.} =
## Blocks thread at a synchronization barrier.
## Returns true for one of the threads (the last one on Windows, undefined on Posix)
## and false for the others.
let err {.used.} = pthread_barrier_wait(syncBarrier)
when compileOption("assertions"):
if err != PTHREAD_BARRIER_SERIAL_THREAD and err < 0:
raiseOSError(OSErrorCode(err))
result = if err == PTHREAD_BARRIER_SERIAL_THREAD: true
else: false
proc delete*(syncBarrier: sink SyncBarrier) {.inline.} =
## Deletes a synchronization barrier.
## This assumes no race between waiting at a barrier and deleting it,
## and reuse of the barrier requires initialization.
let err {.used.} = pthread_barrier_destroy(syncBarrier)
when compileOption("assertions"):
if err < 0:
raiseOSError(OSErrorCode(err))
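# Sanity checks
# ----------------------------------------------------------------------------------
when isMainModule:
  import std/atomics
  # Minimal sketch (an illustrative addition): N threads meet at a
  # SyncBarrier and exactly one of them sees wait() == true.
  const N = 4
  var barrier: SyncBarrier
  barrier.init(N)
  var serialCount: Atomic[int]
  var threads: array[N, Thread[void]]
  proc runner() {.thread.} =
    if barrier.wait():
      discard serialCount.fetchAdd(1)
  for i in 0 ..< N:
    createThread(threads[i], runner)
  joinThreads(threads)
  doAssert serialCount.load() == 1
  barrier.delete()
  echo "barriers: SyncBarrier sanity check passed"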

View File

@ -0,0 +1,88 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# OSX doesn't implement pthread_barrier_t
# It's an optional part of the POSIX standard
#
# This is a manual implementation of a sense reversing barrier
import locks
type
Natural32 = range[0'i32..high(int32)]
Errno* = cint
PthreadAttr* = object
## Dummy
PthreadBarrier* = object
## Implementation of a sense reversing barrier
## (The Art of Multiprocessor Programming by Maurice Herlihy & Nir Shavit)
lock: Lock # Alternatively spinlock on Atomic
cond {.guard: lock.}: Cond
sense {.guard: lock.}: bool # Choose int32 to avoid zero-expansion cost in registers?
left {.guard: lock.}: Natural32 # Number of threads missing at the barrier before opening
count: Natural32 # Total number of threads that need to arrive before opening the barrier
const
PTHREAD_BARRIER_SERIAL_THREAD* = Errno(1)
proc pthread_cond_broadcast(cond: var Cond): Errno {.header:"<pthread.h>".}
  ## Nim's std/locks only signals one thread.
  ## We need to unblock all of them.
proc broadcast(cond: var Cond) {.inline.}=
discard pthread_cond_broadcast(cond)
func pthread_barrier_init*(
barrier: var PthreadBarrier,
attr: ptr PthreadAttr,
count: range[0'i32..high(int32)]
): Errno =
barrier.lock.initLock()
{.locks: [barrier.lock].}:
barrier.cond.initCond()
barrier.left = count
barrier.count = count
# barrier.sense = false
proc pthread_barrier_wait*(barrier: var PthreadBarrier): Errno =
## Wait on `barrier`
## Returns PTHREAD_BARRIER_SERIAL_THREAD for a single arbitrary thread
  ## Returns 0 for the others
## Returns Errno if there is an error
barrier.lock.acquire()
{.locks: [barrier.lock].}:
var local_sense = barrier.sense # Thread local sense
dec barrier.left
if barrier.left == 0:
# Last thread to arrive at the barrier
# Reverse phase and release it
barrier.left = barrier.count
barrier.sense = not barrier.sense
barrier.cond.broadcast()
barrier.lock.release()
return PTHREAD_BARRIER_SERIAL_THREAD
while barrier.sense == local_sense:
# We are waiting for threads
# Wait for the sense to reverse
# while loop because we might have spurious wakeups
barrier.cond.wait(barrier.lock)
# Reversed, we can leave the barrier
barrier.lock.release()
return Errno(0)
proc pthread_barrier_destroy*(barrier: var PthreadBarrier): Errno =
{.locks: [barrier.lock].}:
barrier.cond.deinitCond()
barrier.lock.deinitLock()
# TODO: tests
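when isMainModule:
  # Minimal sketch toward the TODO above (an illustrative addition): two
  # threads rendezvous at the barrier and exactly one of them obtains
  # PTHREAD_BARRIER_SERIAL_THREAD.
  var barrier: PthreadBarrier
  var workerSerial = false
  doAssert pthread_barrier_init(barrier, nil, 2) == Errno(0)
  proc waiter() {.thread.} =
    workerSerial = pthread_barrier_wait(barrier) == PTHREAD_BARRIER_SERIAL_THREAD
  var thr: Thread[void]
  createThread(thr, waiter)
  let mainSerial = pthread_barrier_wait(barrier) == PTHREAD_BARRIER_SERIAL_THREAD
  joinThread(thr)   # joining gives the needed happens-before on workerSerial
  doAssert mainSerial xor workerSerial
  doAssert pthread_barrier_destroy(barrier) == Errno(0)
  echo "barriers_macos: sense-reversing barrier sanity check passed"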

View File

@ -0,0 +1,51 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# Abstractions over POSIX barriers (non-)implementations
when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
# Types
# -------------------------------------------------------
when defined(osx):
import ./barriers_macos
export PthreadAttr, PthreadBarrier, Errno, PTHREAD_BARRIER_SERIAL_THREAD
else:
type
PthreadAttr* {.byref, importc: "pthread_attr_t", header: "<sys/types.h>".} = object
PthreadBarrier* {.byref, importc: "pthread_barrier_t", header: "<sys/types.h>".} = object
Errno* = cint
var PTHREAD_BARRIER_SERIAL_THREAD* {.importc, header:"<pthread.h>".}: Errno
# Pthread
# -------------------------------------------------------
when defined(osx):
export pthread_barrier_init, pthread_barrier_wait, pthread_barrier_destroy
else:
proc pthread_barrier_init*(
barrier: PthreadBarrier,
attr: PthreadAttr or ptr PthreadAttr,
count: range[0'i32..high(int32)]
): Errno {.header: "<pthread.h>".}
## Initialize `barrier` with the attributes `attr`.
## The barrier is opened when `count` waiters arrived.
proc pthread_barrier_destroy*(
barrier: sink PthreadBarrier): Errno {.header: "<pthread.h>".}
## Destroy a previously dynamically initialized `barrier`.
proc pthread_barrier_wait*(
barrier: var PthreadBarrier
): Errno {.header: "<pthread.h>".}
## Wait on `barrier`
## Returns PTHREAD_BARRIER_SERIAL_THREAD for a single arbitrary thread
    ## Returns 0 for the others
## Returns Errno if there is an error

View File

@ -0,0 +1,31 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import winlean
# Technically in <synchapi.h> but MSVC complains with
# @m..@s..@sweave@sscheduler.nim.cpp
# C:\Program Files (x86)\Windows Kits\10\include\10.0.17763.0\um\winnt.h(154): fatal error C1189: #error: "No Target Architecture
type
SynchronizationBarrier*{.importc:"SYNCHRONIZATION_BARRIER", header:"<windows.h>".} = object
var SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE* {.importc, header: "<windows.h>".}: DWORD
## Skip expensive checks on barrier enter if a barrier is never deleted.
proc EnterSynchronizationBarrier*(lpBarrier: var SynchronizationBarrier, dwFlags: DWORD): WINBOOL {.importc, stdcall, header: "<windows.h>".}
proc DeleteSynchronizationBarrier*(lpBarrier: ptr SynchronizationBarrier) {.importc, stdcall, header: "<windows.h>".}
proc InitializeSynchronizationBarrier*(lpBarrier: var SynchronizationBarrier, lTotalThreads: LONG, lSpinCount: LONG): WINBOOL {.importc, stdcall, header: "<windows.h>".}
when isMainModule:
import os
var x{.noinit.}: SynchronizationBarrier
let err = InitializeSynchronizationBarrier(x, 2, -1)
if err != 1:
assert err == 0
raiseOSError(osLastError())

View File

@ -0,0 +1,12 @@
# Versions
## std/tasks
- https://github.com/nim-lang/Nim/blob/3619a5a2aa1c7387ec7df01b195bc683943654ff/lib/std/tasks.nim
We don't support aborting if there is a closure as this requires [#17501](https://github.com/nim-lang/Nim/pull/17501/files)
## std/isolation
- https://github.com/nim-lang/Nim/blob/603af22b7ca46ac566f8c7c15402028f3f976a4e/lib/std/isolation.nim
## std/effecttraits
- https://github.com/nim-lang/Nim/blob/603af22b7ca46ac566f8c7c15402028f3f976a4e/lib/std/effecttraits.nim

View File

@ -0,0 +1,54 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2020 Nim contributors
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module provides access to the inferred .raises effects
## for Nim's macro system.
## **Since**: Version 1.4.
##
## One can test for the existence of this standard module
## via `defined(nimHasEffectTraitsModule)`.
import macros
proc getRaisesListImpl(n: NimNode): NimNode = discard "see compiler/vmops.nim"
proc getTagsListImpl(n: NimNode): NimNode = discard "see compiler/vmops.nim"
proc isGcSafeImpl(n: NimNode): bool = discard "see compiler/vmops.nim"
proc hasNoSideEffectsImpl(n: NimNode): bool = discard "see compiler/vmops.nim"
proc getRaisesList*(fn: NimNode): NimNode =
## Extracts the `.raises` list of the func/proc/etc `fn`.
## `fn` has to be a resolved symbol of kind `nnkSym`. This
## implies that the macro that calls this proc should accept `typed`
## arguments and not `untyped` arguments.
expectKind fn, nnkSym
result = getRaisesListImpl(fn)
proc getTagsList*(fn: NimNode): NimNode =
## Extracts the `.tags` list of the func/proc/etc `fn`.
## `fn` has to be a resolved symbol of kind `nnkSym`. This
## implies that the macro that calls this proc should accept `typed`
## arguments and not `untyped` arguments.
expectKind fn, nnkSym
result = getTagsListImpl(fn)
proc isGcSafe*(fn: NimNode): bool =
## Return true if the func/proc/etc `fn` is `gcsafe`.
## `fn` has to be a resolved symbol of kind `nnkSym`. This
## implies that the macro that calls this proc should accept `typed`
## arguments and not `untyped` arguments.
expectKind fn, nnkSym
result = isGcSafeImpl(fn)
proc hasNoSideEffects*(fn: NimNode): bool =
## Return true if the func/proc/etc `fn` has `noSideEffect`.
## `fn` has to be a resolved symbol of kind `nnkSym`. This
## implies that the macro that calls this proc should accept `typed`
## arguments and not `untyped` arguments.
expectKind fn, nnkSym
result = hasNoSideEffectsImpl(fn)

View File

@ -0,0 +1,50 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2020 Nim contributors
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module implements the `Isolated[T]` type for
## safe construction of isolated subgraphs that can be
## passed efficiently to different channels and threads.
##
## .. warning:: This module is experimental and its interface may change.
##
type
Isolated*[T] = object ## Isolated data can only be moved, not copied.
value: T
proc `=copy`*[T](dest: var Isolated[T]; src: Isolated[T]) {.error.}
proc `=sink`*[T](dest: var Isolated[T]; src: Isolated[T]) {.inline.} =
# delegate to value's sink operation
`=sink`(dest.value, src.value)
proc `=destroy`*[T](dest: var Isolated[T]) {.inline.} =
# delegate to value's destroy operation
`=destroy`(dest.value)
# XXX: removed the {.magic: "Isolate".}
func isolate*[T](value: sink T): Isolated[T] =
## Creates an isolated subgraph from the expression `value`.
## Isolation is checked at compile time.
##
## Please read https://github.com/nim-lang/RFCs/issues/244
## for more details.
Isolated[T](value: value)
func unsafeIsolate*[T](value: sink T): Isolated[T] =
## Creates an isolated subgraph from the expression `value`.
##
## .. warning:: The proc doesn't check whether `value` is isolated.
##
Isolated[T](value: value)
func extract*[T](src: var Isolated[T]): T =
## Returns the internal value of `src`.
## The value is moved from `src`.
result = move(src.value)
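when isMainModule:
  # Minimal sketch (an illustrative addition): an Isolated value is
  # move-only; `extract` moves the payload out. Note that this shim's
  # `isolate` does not compile-time-check isolation (the magic was removed,
  # see above).
  var iso = isolate("hello")
  let s = extract(iso)
  doAssert s == "hello"
  echo "isolation: sanity check passed"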

View File

@ -0,0 +1,284 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2021 Nim contributors
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## This module provides basic primitives for creating parallel programs.
## A `Task` should only be owned by a single thread; it cannot be shared between threads.
import std/[macros, typetraits]
import system/ansi_c
import ./isolation
export isolation
when compileOption("threads"):
from ./effecttraits import isGcSafe
#
# proc hello(a: int, b: string) =
# echo $a & b
#
# let literal = "Nim"
# let t = toTask(hello(521, literal))
#
#
# is roughly converted to
#
# type
# ScratchObj_369098780 = object
# a: int
# b: string
#
# let scratch_369098762 = cast[ptr ScratchObj_369098780](c_calloc(csize_t 1,
# csize_t sizeof(ScratchObj_369098780)))
# if scratch_369098762.isNil:
# raise newException(OutOfMemDefect, "Could not allocate memory")
# block:
# var isolate_369098776 = isolate(521)
# scratch_369098762.a = extract(isolate_369098776)
# var isolate_369098778 = isolate(literal)
# scratch_369098762.b = extract(isolate_369098778)
# proc hello_369098781(args`gensym3: pointer) {.nimcall.} =
# let objTemp_369098775 = cast[ptr ScratchObj_369098780](args`gensym3)
# let :tmp_369098777 = objTemp_369098775.a
# let :tmp_369098779 = objTemp_369098775.b
# hello(a = :tmp_369098777, b = :tmp_369098779)
#
# proc destroyScratch_369098782(args`gensym3: pointer) {.nimcall.} =
# let obj_369098783 = cast[ptr ScratchObj_369098780](args`gensym3)
# =destroy(obj_369098783[])
# let t = Task(callback: hello_369098781, args: scratch_369098762, destroy: destroyScratch_369098782)
#
type
Task* = object ## `Task` contains the callback and its arguments.
callback: proc (args: pointer) {.nimcall, gcsafe.}
args: pointer
destroy: proc (args: pointer) {.nimcall.}
proc `=copy`*(x: var Task, y: Task) {.error.}
proc `=destroy`*(t: var Task) {.inline.} =
## Frees the resources allocated for a `Task`.
if t.args != nil:
if t.destroy != nil:
t.destroy(t.args)
c_free(t.args)
proc invoke*(task: Task) {.inline, gcsafe.} =
## Invokes the `task`.
assert task.callback != nil
task.callback(task.args)
template checkIsolate(scratchAssignList: seq[NimNode], procParam, scratchDotExpr: NimNode) =
# block:
# var isoTempA = isolate(521)
# scratch.a = extract(isolateA)
# var isoTempB = isolate(literal)
# scratch.b = extract(isolateB)
let isolatedTemp = genSym(nskTemp, "isoTemp")
# XXX: Fix sym bindings
# scratchAssignList.add newVarStmt(isolatedTemp, newCall(newidentNode("isolate"), procParam))
# scratchAssignList.add newAssignment(scratchDotExpr,
# newcall(newIdentNode("extract"), isolatedTemp))
scratchAssignList.add newVarStmt(isolatedTemp, newCall(bindSym("isolate"), procParam))
scratchAssignList.add newAssignment(scratchDotExpr,
newcall(bindSym("extract"), isolatedTemp))
template addAllNode(assignParam: NimNode, procParam: NimNode) =
let scratchDotExpr = newDotExpr(scratchIdent, formalParams[i][0])
checkIsolate(scratchAssignList, procParam, scratchDotExpr)
let tempNode = genSym(kind = nskTemp, ident = formalParams[i][0].strVal)
callNode.add nnkExprEqExpr.newTree(formalParams[i][0], tempNode)
tempAssignList.add newLetStmt(tempNode, newDotExpr(objTemp, formalParams[i][0]))
scratchRecList.add newIdentDefs(newIdentNode(formalParams[i][0].strVal), assignParam)
macro toTask*(e: typed{nkCall | nkInfix | nkPrefix | nkPostfix | nkCommand | nkCallStrLit}): Task =
## Converts the call and its arguments to `Task`.
runnableExamples("--gc:orc"):
proc hello(a: int) = echo a
let b = toTask hello(13)
assert b is Task
doAssert getTypeInst(e).typeKind == ntyVoid
# requires 1.6
# when compileOption("threads"):
# if not isGcSafe(e[0]):
# error("'toTask' takes a GC safe call expression")
# TODO
# https://github.com/nim-lang/Nim/pull/17501/files
#
# if hasClosure(e[0]):
# error("closure call is not allowed")
if e.len > 1:
let scratchIdent = genSym(kind = nskTemp, ident = "scratch")
let impl = e[0].getTypeInst
when defined(nimTasksDebug):
echo impl.treeRepr
echo e.treeRepr
let formalParams = impl[0]
var
scratchRecList = newNimNode(nnkRecList)
scratchAssignList: seq[NimNode]
tempAssignList: seq[NimNode]
callNode: seq[NimNode]
let
objTemp = genSym(nskTemp, ident = "objTemp")
for i in 1 ..< formalParams.len:
var param = formalParams[i][1]
if param.kind == nnkBracketExpr and param[0].eqIdent("sink"):
param = param[0]
if param.typeKind in {ntyExpr, ntyStmt}:
error("'toTask'ed function cannot have a 'typed' or 'untyped' parameter")
case param.kind
of nnkVarTy:
error("'toTask'ed function cannot have a 'var' parameter")
of nnkBracketExpr:
if param[0].typeKind == ntyTypeDesc:
callNode.add nnkExprEqExpr.newTree(formalParams[i][0], e[i])
elif param[0].typeKind in {ntyVarargs, ntyOpenArray}:
if param[1].typeKind in {ntyExpr, ntyStmt}:
error("'toTask'ed function cannot have a 'typed' or 'untyped' parameter")
let
seqType = nnkBracketExpr.newTree(newIdentNode("seq"), param[1])
seqCallNode = newcall("@", e[i])
addAllNode(seqType, seqCallNode)
else:
addAllNode(param, e[i])
of nnkBracket, nnkObjConstr:
# passing by static parameters
# so we pass them directly instead of passing by scratchObj
callNode.add nnkExprEqExpr.newTree(formalParams[i][0], e[i])
of nnkSym, nnkPtrTy:
addAllNode(param, e[i])
of nnkCharLit..nnkNilLit:
callNode.add nnkExprEqExpr.newTree(formalParams[i][0], e[i])
else:
error("not supported type kinds")
let scratchObjType = genSym(kind = nskType, ident = "ScratchObj")
let scratchObj = nnkTypeSection.newTree(
nnkTypeDef.newTree(
scratchObjType,
newEmptyNode(),
nnkObjectTy.newTree(
newEmptyNode(),
newEmptyNode(),
scratchRecList
)
)
)
let scratchObjPtrType = quote do:
cast[ptr `scratchObjType`](c_calloc(csize_t 1, csize_t sizeof(`scratchObjType`)))
let scratchLetSection = newLetStmt(
scratchIdent,
scratchObjPtrType
)
let scratchCheck = quote do:
if `scratchIdent`.isNil:
# Renamed in 1.4
# raise newException(OutOfMemDefect, "Could not allocate memory")
raise newException(OutOfMemError, "Could not allocate memory")
var stmtList = newStmtList()
stmtList.add(scratchObj)
stmtList.add(scratchLetSection)
stmtList.add(scratchCheck)
stmtList.add(nnkBlockStmt.newTree(newEmptyNode(), newStmtList(scratchAssignList)))
var functionStmtList = newStmtList()
let funcCall = newCall(e[0], callNode)
functionStmtList.add tempAssignList
functionStmtList.add funcCall
let funcName = genSym(nskProc, e[0].strVal)
let destroyName = genSym(nskProc, "destroyScratch")
let objTemp2 = genSym(ident = "obj")
let tempNode = quote("@") do:
`=destroy`(@objTemp2[])
result = quote do:
`stmtList`
proc `funcName`(args: pointer) {.gcsafe, nimcall.} =
let `objTemp` = cast[ptr `scratchObjType`](args)
`functionStmtList`
proc `destroyName`(args: pointer) {.nimcall.} =
let `objTemp2` = cast[ptr `scratchObjType`](args)
`tempNode`
Task(callback: `funcName`, args: `scratchIdent`, destroy: `destroyName`)
else:
let funcCall = newCall(e[0])
let funcName = genSym(nskProc, e[0].strVal)
result = quote do:
proc `funcName`(args: pointer) {.gcsafe, nimcall.} =
`funcCall`
Task(callback: `funcName`, args: nil)
when defined(nimTasksDebug):
echo result.repr
runnableExamples("--gc:orc"):
block:
var num = 0
proc hello(a: int) = inc num, a
let b = toTask hello(13)
b.invoke()
assert num == 13
# A task can be invoked multiple times
b.invoke()
assert num == 26
block:
type
Runnable = ref object
data: int
var data: int
proc hello(a: Runnable) {.nimcall.} =
a.data += 2
data = a.data
when false:
# the parameters of call must be isolated.
let x = Runnable(data: 12)
let b = toTask hello(x) # error ----> expression cannot be isolated: x
b.invoke()
let b = toTask(hello(Runnable(data: 12)))
b.invoke()
assert data == 14
b.invoke()
assert data == 16

151
taskpools/sparsesets.nim Normal file
View File

@ -0,0 +1,151 @@
# Weave
# Copyright (c) 2019 Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/random,
system/ansi_c,
./instrumentation/contracts
const TP_MaxWorkers = 255
type Setuint = uint8 # We support at most 255 threads (0xFF is kept as special value to signify absence in the set)
const Empty = high(Setuint)
type
SparseSet* = object
## Stores efficiently a set of integers in the range [0 .. Capacity)
## Supports:
## - O(1) inclusion, exclusion and contains
## - O(1) random pick
## - O(1) length
## - O(length) iteration
##
## Space: Capacity * sizeof(words)
##
    ## This is contrary to bitsets, which require:
## - random picking: multiple random "contains" + a fallback to uncompressing the set
## - O(Capacity/sizeof(words)) length (via popcounts)
## - O(capacity) iteration
indices: ptr UncheckedArray[Setuint]
values: ptr UncheckedArray[Setuint]
rawBuffer: ptr UncheckedArray[Setuint]
len*: Setuint
capacity*: Setuint
func allocate*(s: var SparseSet, capacity: SomeInteger) {.inline.} =
preCondition: capacity <= TP_MaxWorkers
s.capacity = Setuint capacity
s.rawBuffer = cast[ptr UncheckedArray[Setuint]](c_calloc(csize_t 2*capacity, csize_t sizeof(Setuint)))
s.indices = s.rawBuffer
s.values = cast[ptr UncheckedArray[Setuint]](s.rawBuffer[capacity].addr)
func delete*(s: var SparseSet) {.inline.} =
s.indices = nil
s.values = nil
c_free(s.rawBuffer)
func refill*(s: var SparseSet) {.inline.} =
## Reset the sparseset by including all integers
## in the range [0 .. Capacity)
preCondition: not s.indices.isNil
preCondition: not s.values.isNil
preCondition: not s.rawBuffer.isNil
preCondition: s.capacity != 0
s.len = s.capacity
for i in Setuint(0) ..< s.len:
s.indices[i] = i
s.values[i] = i
func isEmpty*(s: SparseSet): bool {.inline.} =
s.len == 0
func contains*(s: SparseSet, n: SomeInteger): bool {.inline.} =
assert n.int != Empty.int
s.indices[n] != Empty
func incl*(s: var SparseSet, n: SomeInteger) {.inline.} =
preCondition: n < Empty
if n in s: return
preCondition: s.len < s.capacity
s.indices[n] = s.len
s.values[s.len] = n
s.len += 1
func peek*(s: SparseSet): int32 {.inline.} =
## Returns the last point in the set
## Note: if an item is deleted this is not the last inserted point
preCondition: s.len.int > 0
int32 s.values[s.len - 1]
func excl*(s: var SparseSet, n: SomeInteger) {.inline.} =
if n notin s: return
# We do constant time deletion by replacing the deleted
# integer by the last value in the array of values
let delIdx = s.indices[n]
s.len -= 1
let lastVal = s.values[s.len]
  s.indices[lastVal] = delIdx          # Last value now points to deleted index
  s.values[delIdx] = lastVal           # Deleted slot now holds the last value
# Erase the item
s.indices[n] = Empty
func randomPick*(s: SparseSet, rng: var Rand): int {.inline.} =
## Randomly pick from the set.
# The value is NOT removed from it.
let pickIdx = rng.rand(s.len-1)
result = s.values[pickIdx].int
func `$`*(s: SparseSet): string =
$toOpenArray(s.values, 0, s.len.int - 1)
# Sanity checks
# ------------------------------------------------------------------------------
when isMainModule:
const Size = 10
const Picked = 5
var S: SparseSet
S.allocate(Size)
S.refill()
echo S
var rngState = initRand(123)
var picked: seq[int]
for _ in 0 ..< Picked:
let p = S.randomPick(rngState)
picked.add p
S.excl p
echo "---"
echo "picked: ", p
echo "S indices: ", toOpenArray(S.indices, 0, S.capacity.int - 1)
echo "---"
echo "picked: ", picked
echo "S: ", S
echo "S indices: ", toOpenArray(S.indices, 0, S.capacity.int - 1)
for x in 0 ..< Size:
if x notin picked:
echo x, " notin picked -> in S"
doAssert x in S
else:
echo x, " in picked -> notin S"
doAssert x notin S

530
taskpools/taskpools.nim Normal file
View File

@ -0,0 +1,530 @@
# Nim-Taskpools
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# Taskpools
#
# This file implements a taskpool
#
# Implementation:
#
# It is a simple shared memory based work-stealing threadpool.
# The primary focus is:
# - Delegate compute intensive tasks to the threadpool.
# - Simple to audit by staying close to foundational papers
# and using simple datastructures otherwise.
# - Low energy consumption:
# threads should be put to sleep ASAP
# instead of polling/spinning (energy vs latency tradeoff)
# - Decent performance:
# Work-stealing has optimal asymptotic parallel speedup.
# Work-stealing has significantly reduced contention
# when many tasks are created,
# for example by divide-and-conquer algorithms, compared to a global task queue
#
# Not a priority:
# - Handling trillions of very short tasks (less than 100µs).
# - Advanced task dependencies or events API.
# - Unbalanced parallel-for loops.
# - Handling services that should run for the lifetime of the program.
#
# Doing IO on a compute threadpool should be avoided:
# if a thread blocks on IO, other threads can steal the pending tasks in its queue,
# but if all threads block on IO, the threadpool makes no progress and is soft-locked.
{.push raises: [].}
import
system/ansi_c,
std/[random, cpuinfo, atomics, macros],
./channels_spsc_single,
./chase_lev_deques,
./event_notifiers,
./primitives/barriers,
./instrumentation/[contracts, loggers],
./sparsesets,
./flowvars,
./ast_utils
export
# flowvars
Flowvar, isSpawned, isReady, sync
when defined(windows):
import ./primitives/affinity_windows
else:
import ./primitives/affinity_posix
when (NimMajor,NimMinor,NimPatch) >= (1,6,0):
import std/tasks
else:
import ./shims_pre_1_6/tasks
type
WorkerID = int32
TaskNode = ptr object
# Linked list of tasks
parent: TaskNode
task: Task
Signal = object
terminate {.align: 64.}: Atomic[bool]
WorkerContext = object
## Thread-local worker context
# Params
id: WorkerID
taskpool: Taskpool
# Tasks
taskDeque: ptr ChaseLevDeque[TaskNode] # owned task deque
currentTask: TaskNode
# Synchronization
eventNotifier: ptr EventNotifier # shared event notifier
signal: ptr Signal # owned signal
# Thefts
rng: Rand # RNG state to select victims
numThreads: int
otherDeques: ptr UncheckedArray[ChaseLevDeque[TaskNode]]
victims: SparseSet
Taskpool* = ptr object
barrier: SyncBarrier
## Barrier for initialization and teardown
eventNotifier: EventNotifier
## Puts thread to sleep
numThreads{.align: 64.}: int
workerDeques: ptr UncheckedArray[ChaseLevDeque[TaskNode]]
## Direct access for task stealing
workers: ptr UncheckedArray[Thread[(Taskpool, WorkerID)]]
workerSignals: ptr UncheckedArray[Signal]
## Access signaledTerminate
# Thread-local config
# ---------------------------------------------
var workerContext {.threadvar.}: WorkerContext
## Thread-local Worker context
proc setupWorker() =
## Initialize the thread-local context of a worker
## Requires the ID and taskpool fields to be initialized
template ctx: untyped = workerContext
preCondition: not ctx.taskpool.isNil()
preCondition: 0 <= ctx.id and ctx.id < ctx.taskpool.numThreads
preCondition: not ctx.taskpool.workerDeques.isNil()
preCondition: not ctx.taskpool.workerSignals.isNil()
# Thefts
ctx.rng = initRand(0xEFFACED + ctx.id)
ctx.numThreads = ctx.taskpool.numThreads
ctx.otherDeques = ctx.taskpool.workerDeques
ctx.victims.allocate(ctx.taskpool.numThreads)
# Synchronization
ctx.eventNotifier = addr ctx.taskpool.eventNotifier
ctx.signal = addr ctx.taskpool.workerSignals[ctx.id]
ctx.signal.terminate.store(false, moRelaxed)
# Tasks
ctx.taskDeque = addr ctx.taskpool.workerDeques[ctx.id]
ctx.currentTask = nil
# Init
ctx.taskDeque[].init()
proc teardownWorker() =
## Cleanup the thread-local context of a worker
template ctx: untyped = workerContext
ctx.taskDeque[].teardown()
ctx.victims.delete()
proc eventLoop(ctx: var WorkerContext) {.raises:[Exception].}
proc workerEntryFn(params: tuple[taskpool: Taskpool, id: WorkerID])
{.raises: [Exception].} =
  ## On startup of the threadpool, workers will execute this
  ## until they receive a termination signal.
  # We assume that all thread-local variables start at their binary zero value
preCondition: workerContext == default(WorkerContext)
template ctx: untyped = workerContext
# If the following crashes, you need --tlsEmulation:off
ctx.id = params.id
ctx.taskpool = params.taskpool
setupWorker()
# 1 matching barrier in Taskpool.new() for root thread
discard params.taskpool.barrier.wait()
{.gcsafe.}: # Not GC-safe when multi-threaded due to thread-local variables
ctx.eventLoop()
debugTermination:
log(">>> Worker %2d shutting down <<<\n", ctx.id)
# 1 matching barrier in taskpool.shutdown() for root thread
discard params.taskpool.barrier.wait()
teardownWorker()
# Tasks
# ---------------------------------------------
proc new(T: type TaskNode, parent: TaskNode, task: sink Task): T =
type TaskNodeObj = typeof(default(T)[])
var tn = cast[TaskNode](c_calloc(1, csize_t sizeof(TaskNodeObj)))
tn.parent = parent
tn.task = task
return tn
proc runTask(tn: var TaskNode) {.raises:[Exception], inline.} =
  ## Run a task and consume its TaskNode
tn.task.invoke()
tn.c_free()
proc schedule(ctx: WorkerContext, tn: sink TaskNode) {.inline.} =
## Schedule a task in the taskpool
debug: log("Worker %2d: schedule task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, tn, tn.parent, ctx.currentTask)
ctx.taskDeque[].push(tn)
ctx.taskpool.eventNotifier.notify()
# Scheduler
# ---------------------------------------------
proc trySteal(ctx: var WorkerContext): TaskNode =
## Try to steal a task.
ctx.victims.refill()
ctx.victims.excl(ctx.id)
while not ctx.victims.isEmpty():
let target = ctx.victims.randomPick(ctx.rng)
let stolenTask = ctx.otherDeques[target].steal()
if not stolenTask.isNil:
return stolenTask
ctx.victims.excl(target)
return nil
proc eventLoop(ctx: var WorkerContext) {.raises:[Exception].} =
## Each worker thread executes this loop over and over.
while not ctx.signal.terminate.load(moRelaxed):
# 1. Pick from local deque
debug: log("Worker %2d: eventLoop 1 - searching task from local deque\n", ctx.id)
while (var taskNode = ctx.taskDeque[].pop(); not taskNode.isNil):
debug: log("Worker %2d: eventLoop 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask)
taskNode.runTask()
# 2. Run out of tasks, become a thief
debug: log("Worker %2d: eventLoop 2 - becoming a thief\n", ctx.id)
var stolenTask = ctx.trySteal()
if not stolenTask.isNil:
# 2.a Run task
debug: log("Worker %2d: eventLoop 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask)
stolenTask.runTask()
else:
# 2.b Park the thread until a new task enters the taskpool
debug: log("Worker %2d: eventLoop 2.b - sleeping\n", ctx.id)
ctx.eventNotifier[].park()
debug: log("Worker %2d: eventLoop 2.b - waking\n", ctx.id)
# Tasking
# ---------------------------------------------
const RootTask = default(Task) # TODO: sentinel value different from null task
template isRootTask(task: Task): bool =
task == RootTask
proc forceFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[Exception].} =
## Eagerly complete an awaited FlowVar
template ctx: untyped = workerContext
template isFutReady(): untyped =
fv.chan[].tryRecv(parentResult)
if isFutReady():
return
## 1. Process all the children of the current tasks.
## This ensures that we can give control back ASAP.
debug: log("Worker %2d: sync 1 - searching task from local deque\n", ctx.id)
while (var taskNode = ctx.taskDeque[].pop(); not taskNode.isNil):
if taskNode.parent != ctx.currentTask:
debug: log("Worker %2d: sync 1 - skipping non-direct descendant task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask)
ctx.schedule(taskNode)
break
debug: log("Worker %2d: sync 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask)
taskNode.runTask()
if isFutReady():
debug: log("Worker %2d: sync 1 - future ready, exiting\n", ctx.id)
return
## 2. We run out-of-tasks or out-of-direct-child of our current awaited task
## So the task is bottlenecked by dependencies in other threads,
## hence we abandon our enqueued work and steal in the others' queues
## in hope it advances our awaited task. This prioritizes latency over throughput.
debug: log("Worker %2d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x)\n", ctx.id, ctx.currentTask)
while not isFutReady():
var taskNode = ctx.trySteal()
if not taskNode.isNil:
# We stole some task, we hope we advance our awaited task
debug: log("Worker %2d: sync 2.1 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask)
taskNode.runTask()
# elif (taskNode = ctx.taskDeque[].pop(); not taskNode.isNil):
# # We advance our own queue, this increases throughput but may impact latency on the awaited task
# debug: log("Worker %2d: sync 2.2 - couldn't steal, running own task\n", ctx.id)
# taskNode.runTask()
else:
# We don't park as there is no notif for task completion
cpuRelax()
proc syncAll*(pool: Taskpool) {.raises: [Exception].} =
## Blocks until all pending tasks are completed
## This MUST only be called from
## the root scope that created the taskpool
template ctx: untyped = workerContext
debugTermination:
log(">>> Worker %2d enters barrier <<<\n", ctx.id)
preCondition: ctx.id == 0
preCondition: ctx.currentTask.task.isRootTask()
# Empty all tasks
var foreignThreadsParked = false
while not foreignThreadsParked:
# 1. Empty local tasks
debug: log("Worker %2d: syncAll 1 - searching task from local deque\n", ctx.id)
while (var taskNode = ctx.taskDeque[].pop(); not taskNode.isNil):
debug: log("Worker %2d: syncAll 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask)
taskNode.runTask()
if ctx.numThreads == 1 or foreignThreadsParked:
break
# 2. Help other threads
debug: log("Worker %2d: syncAll 2 - becoming a thief\n", ctx.id)
var taskNode = ctx.trySteal()
if not taskNode.isNil:
# 2.1 We stole some task
debug: log("Worker %2d: syncAll 2.1 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask)
taskNode.runTask()
else:
# 2.2 No task to steal
if pool.eventNotifier.getParked() == pool.numThreads - 1:
# 2.2.1 all threads besides the current are parked
debugTermination:
log("Worker %2d: syncAll 2.2.1 - termination, all other threads sleeping\n", ctx.id)
foreignThreadsParked = true
else:
# 2.2.2 We don't park as there is no notif for task completion
cpuRelax()
debugTermination:
log(">>> Worker %2d leaves barrier <<<\n", ctx.id)
# Runtime
# ---------------------------------------------
proc new*(T: type Taskpool, numThreads = countProcessors()): T {.raises: [Exception].} =
## Initialize a threadpool that manages `numThreads` threads.
  ## Defaults to the number of logical processors available.
var tp = cast[T](c_calloc(1, csize_t sizeof(default(Taskpool)[])))
tp.barrier.init(numThreads.int32)
tp.eventNotifier.initialize()
tp.numThreads = numThreads
tp.workerDeques = cast[ptr UncheckedArray[ChaseLevDeque[TaskNode]]](c_calloc(csize_t numThreads, csize_t sizeof ChaseLevDeque[TaskNode]))
tp.workers = cast[ptr UncheckedArray[Thread[(Taskpool, WorkerID)]]](c_calloc(csize_t numThreads, csize_t sizeof Thread[(Taskpool, WorkerID)]))
tp.workerSignals = cast[ptr UncheckedArray[Signal]](c_calloc(csize_t numThreads, csize_t sizeof Signal))
# Setup master thread
workerContext.id = 0
workerContext.taskpool = tp
when not(defined(cpp) and defined(vcc)):
# TODO: Nim casts between Windows Handles but that requires reinterpret cast for C++
pinToCpu(0)
# Start worker threads
for i in 1 ..< numThreads:
createThread(tp.workers[i], worker_entry_fn, (tp, WorkerID(i)))
# TODO: we might want to take into account Hyper-Threading (HT)
# and allow spawning tasks and pinning to cores that are not HT-siblings.
# This is important for memory-bound workloads (like copy, addition, ...)
# where both sibling cores will compete for L1 and L2 cache, effectively
# halving the memory bandwidth or worse, flushing what the other put in cache.
# Note that while 2x siblings is common, Xeon Phi has 4x Hyper-Threading.
when not(defined(cpp) and defined(vcc)):
# TODO: Nim casts between Windows Handles but that requires reinterpret cast for C++
pinToCpu(tp.workers[i], i)
# Root worker
setupWorker()
# Root task, this is a sentinel task that is never called.
workerContext.currentTask = TaskNode.new(
parent = nil,
task = default(Task) # TODO RootTask, somehow this uses `=copy`
)
# Wait for the child threads
discard tp.barrier.wait()
return tp
proc cleanup(tp: var TaskPool) {.raises: [OSError].} =
## Cleanup all resources allocated by the taskpool
preCondition: workerContext.currentTask.task.isRootTask()
for i in 1 ..< tp.numThreads:
joinThread(tp.workers[i])
tp.workerSignals.c_free()
tp.workers.c_free()
tp.workerDeques.c_free()
`=destroy`(tp.eventNotifier)
tp.barrier.delete()
tp.c_free()
proc shutdown*(tp: var TaskPool) {.raises:[Exception].} =
## Wait until all tasks are processed and then shutdown the taskpool
preCondition: workerContext.currentTask.task.isRootTask()
tp.syncAll()
# Signal termination to all threads
for i in 0 ..< tp.numThreads:
tp.workerSignals[i].terminate.store(true, moRelaxed)
let parked = tp.eventNotifier.getParked()
for i in 0 ..< parked:
tp.eventNotifier.notify()
# 1 matching barrier in worker_entry_fn
discard tp.barrier.wait()
teardownWorker()
tp.cleanup()
# Dealloc dummy task
workerContext.currentTask.c_free()
# Task parallelism
# ---------------------------------------------
{.pop.} # raises:[]
macro spawn*(tp: TaskPool, fnCall: typed): untyped =
## Spawns the input function call asynchronously, potentially on another thread of execution.
##
  ## If the function call returns a result, spawn will wrap it in a Flowvar.
  ## You can use `sync` to block the current thread and extract the asynchronous result from the flowvar.
  ## You can use `isReady` to check if the result is available, so that a
  ## subsequent `sync` returns immediately without blocking.
##
## Tasks are processed approximately in Last-In-First-Out (LIFO) order
result = newStmtList()
let fn = fnCall[0]
let fnName = $fn
# Get the return type if any
let retType = fnCall[0].getImpl[3][0]
let needFuture = retType.kind != nnkEmpty
# Package in a task
let taskNode = ident("taskNode")
let task = ident("task")
if not needFuture:
result.add quote do:
let `task` = toTask(`fnCall`)
let `taskNode` = TaskNode.new(workerContext.currentTask, `task`)
schedule(workerContext, `taskNode`)
else:
    # A `Task` itself carries no return value, so:
# 1. We create a channel/flowvar to transmit the return value to awaiter/sync
# 2. We create a wrapper async_fn without return value that send the return value in the channel
# 3. We package that wrapper function in a task
# 1. Create the channel
let fut = ident("fut")
let futTy = nnkBracketExpr.newTree(
bindSym"FlowVar",
retType
)
result.add quote do:
let `fut` = newFlowVar(type `retType`)
# 2. Create a wrapper function that sends result to the channel
# TODO, upstream "getImpl" doesn't return the generic params
let genericParams = fn.getImpl()[2].replaceSymsByIdents()
let formalParams = fn.getImpl()[3].replaceSymsByIdents()
var asyncParams = nnkFormalParams.newTree(
newEmptyNode()
)
var fnCallIdents = nnkCall.newTree(
fnCall[0]
)
for i in 1 ..< formalParams.len:
let ident = formalParams[i].replaceSymsByIdents()
asyncParams.add ident
for j in 0 ..< ident.len - 2:
# Handle "a, b: int"
fnCallIdents.add ident[j]
let futFnParam = ident("fut")
asyncParams.add newIdentDefs(futFnParam, futTy)
let asyncBody = quote do:
# XXX: can't test that when the RootTask is default(Task) instead of a sentinel value
# preCondition: not isRootTask(workerContext.currentTask.task)
let res = `fnCallIdents`
readyWith(`futFnParam`, res)
let asyncFn = ident("taskpool_" & fnName)
result.add nnkProcDef.newTree(
asyncFn,
newEmptyNode(),
genericParams,
asyncParams,
nnkPragma.newTree(ident("nimcall")),
newEmptyNode(),
asyncBody
)
var asyncCall = newCall(asyncFn)
for i in 1 ..< fnCall.len:
asyncCall.add fnCall[i].replaceSymsByIdents()
asyncCall.add fut
result.add quote do:
let `task` = toTask(`asyncCall`)
let `taskNode` = TaskNode.new(workerContext.currentTask, `task`)
schedule(workerContext, `taskNode`)
# Return the future / flowvar
`fut`
# Wrap in a block for namespacing
result = nnkBlockStmt.newTree(newEmptyNode(), result)
  when defined(TP_Debug):
    echo result.toStrLit()
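# Sanity checks
# ---------------------------------------------
when isMainModule:
  # Minimal usage sketch of the public API (an illustrative addition;
  # `addNums` is a stand-in for any compute task): spawn a call, receive a
  # Flowvar, then block on the result with sync.
  proc addNums(a, b: int): int = a + b

  proc main() =
    var tp = Taskpool.new(numThreads = 4)
    let fut = tp.spawn addNums(40, 2)
    doAssert fut.sync() == 42
    tp.syncAll()   # wait for any outstanding tasks (none left here)
    tp.shutdown()

  main()
  echo "taskpools: spawn/sync sanity check passed"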