commit 216aabe629b9ecaab56c00a30b8e5ed7a4c988a9 Author: Mamy André-Ratsimbazafy Date: Mon Jun 28 16:47:06 2021 +0200 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..72edbab --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +nimcache/ + +# Executables shall be put in an ignored build/ directory +build/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..64cfd90 --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# Taskpools + +## API + +The API spec follows https://github.com/nim-lang/RFCs/issues/347#task-parallelism-api + +## Overview + +This implements a lightweight, energy-efficient, easily auditable multithreaded taskpools. + +This taskpools will be used in a highly security-sensitive blockchain application +targeted at resource-restricted devices hence desirable properties are: + +- Ease of auditing and maintenance. + - Formally verified synchronization primitives are highly-sought after. + - Otherwise primitives are implemented from papers or ported from proven codebases + that can serve as reference for auditors. +- Resource-efficient. Threads spindown to save power, low memory use. +- Decent performance and scalability. The workload to parallelize are cryptography-related + and require at least 1ms runtime per thread. + This means that only a simple scheduler is required. + +Non-goals: +- Supporting task priorities +- Being distributed +- Supporting GC-ed memory on Nim default GC (sequences and strings) +- Have async-awaitable tasks + +In particular compared to [Weave](https://github.com/mratsim/weave), here are the tradeoffs: +- Taskpools only provide spawn/sync (task parallelism).\ + There is no parallel for (data parallelism)\ + or precise in/out dependencies (dataflow parallelism). +- Weave can handle trillions of small tasks that require only 10µs per task. (Load Balancing overhead) +- Weave maintains an adaptive memory pool to reduce memory allocation overhead, + Taskpools allocations are as-needed. (Scheduler overhead) diff --git a/benchmarks/bouncing_producer_consumer/README.md b/benchmarks/bouncing_producer_consumer/README.md new file mode 100644 index 0000000..99abc9c --- /dev/null +++ b/benchmarks/bouncing_producer_consumer/README.md @@ -0,0 +1,11 @@ +# BPC (Bouncing Producer-Consumer) + +From [tasking-2.0](https://github.com/aprell/tasking-2.0) description + +> **BPC**, short for **B**ouncing **P**roducer-**C**onsumer benchmark, as far +> as I know, first described by [Dinan et al][1]. There are two types of +> tasks, producer and consumer tasks. Each producer task creates another +> producer task followed by *n* consumer tasks, until a certain depth *d* is +> reached. Consumer tasks run for *t* microseconds. The smaller the values of +> *n* and *t*, the harder it becomes to exploit the available parallelism. A +> solid contender for the most antagonistic microbenchmark. diff --git a/benchmarks/bouncing_producer_consumer/taskpool_bpc.nim b/benchmarks/bouncing_producer_consumer/taskpool_bpc.nim new file mode 100644 index 0000000..ee6ca1d --- /dev/null +++ b/benchmarks/bouncing_producer_consumer/taskpool_bpc.nim @@ -0,0 +1,156 @@ +import + # STD lib + os, strutils, system/ansi_c, cpuinfo, strformat, math, + # Library + ../../taskpools, + # bench + ../wtime, ../resources + +var + Depth: int32 # For example 10000 + NumTasksPerDepth: int32 # For example 9 + # The total number of tasks in the BPC benchmark is + # (NumTasksPerDepth + 1) * Depth + NumTasksTotal: int32 + TaskGranularity: int32 # in microseconds + PollInterval: float64 # in microseconds + + tp: Taskpool + +var global_poll_elapsed {.threadvar.}: float64 + +template dummy_cpt(): untyped = + # Dummy computation + # Calculate fib(30) iteratively + var + fib = 0 + f2 = 0 + f1 = 1 + for i in 2 .. 30: + fib = f1 + f2 + f2 = f1 + f1 = fib + +proc bpc_consume(usec: int32) = + + var pollElapsed = 0'f64 + + let start = wtime_usec() + let stop = usec.float64 + global_poll_elapsed = PollInterval + + while true: + var elapsed = wtime_usec() - start + elapsed -= pollElapsed + if elapsed >= stop: + break + + dummy_cpt() + + # if elapsed >= global_poll_elapsed: + # let pollStart = wtime_usec() + # loadBalance(Weave) + # pollElapsed += wtime_usec() - pollStart + # global_poll_elapsed += PollInterval + +proc bpc_consume_nopoll(usec: int32) = + + let start = wtime_usec() + let stop = usec.float64 + + while true: + var elapsed = wtime_usec() - start + if elapsed >= stop: + break + + dummy_cpt() + +proc bpc_produce(n, d: int32) = + if d > 0: + # Create producer task + tp.spawn bpc_produce(n, d-1) + else: + return + + # Followed by n consumer tasks + for i in 0 ..< n: + tp.spawn bpc_consume(TaskGranularity) + +proc main() = + Depth = 10000 + NumTasksPerDepth = 999 + TaskGranularity = 1 + + if paramCount() == 0: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " & + &"<# of tasks per depth: {NumTasksPerDepth}> " & + &"[task granularity (us): {TaskGranularity}] " & + &"[polling interval (us): task granularity]" + echo &"Running with default config Depth = {Depth}, NumTasksPerDepth = {NumTasksPerDepth}, granularity (us) = {TaskGranularity}, polling (us) = {PollInterval}" + if paramCount() >= 1: + Depth = paramStr(1).parseInt.int32 + if paramCount() >= 2: + NumTasksPerDepth = paramStr(2). parseInt.int32 + if paramCount() >= 3: + TaskGranularity = paramStr(3). parseInt.int32 + if paramCount() == 4: + PollInterval = paramStr(4).parseInt.float64 + else: + PollInterval = TaskGranularity.float64 + if paramCount() > 4: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " & + &"<# of tasks per depth: {NumTasksPerDepth}> " & + &"[task granularity (us): {TaskGranularity}] " & + &"[polling interval (us): task granularity]" + quit 1 + + NumTasksTotal = (NumTasksPerDepth + 1) * Depth + + var nthreads: int + if existsEnv"TASKPOOL_NUM_THREADS": + nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt() + else: + nthreads = countProcessors() + + tp = Taskpool.new(numThreads = nthreads) + + # measure overhead during tasking + var ru: Rusage + getrusage(RusageSelf, ru) + var + rss = ru.ru_maxrss + flt = ru.ru_minflt + + let start = wtime_msec() + + bpc_produce(NumTasksPerDepth, Depth) + tp.syncAll() + + let stop = wtime_msec() + + getrusage(RusageSelf, ru) + rss = ru.ru_maxrss - rss + flt = ru.ru_minflt - flt + + tp.shutdown() + + echo "--------------------------------------------------------------------------" + echo "Scheduler: Taskpool" + echo "Benchmark: BPC (Bouncing Producer-Consumer)" + echo "Threads: ", nthreads + echo "Time(ms) ", round(stop - start, 3) + echo "Max RSS (KB): ", ru.ru_maxrss + echo "Runtime RSS (KB): ", rss + echo "# of page faults: ", flt + echo "--------------------------------------------------------------------------" + echo "# of tasks: ", NumTasksTotal + echo "# of tasks/depth: ", NumTasksPerDepth + echo "Depth: ", Depth + echo "Task granularity (us): ", TaskGranularity + echo "Polling / manual load balancing interval (us): ", PollInterval + + quit 0 + +main() diff --git a/benchmarks/dfs/weave_dfs.nim b/benchmarks/dfs/weave_dfs.nim new file mode 100644 index 0000000..8bbca33 --- /dev/null +++ b/benchmarks/dfs/weave_dfs.nim @@ -0,0 +1,85 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + # Stdlib + system/ansi_c, strformat, os, strutils, cpuinfo, + # Weave + ../../weave +when not defined(windows): + # bench + import ../wtime + +proc dfs(depth, breadth: int): uint32 = + if depth == 0: + return 1 + + # We could use alloca to avoid heap allocation here + var sums = newSeq[Flowvar[uint32]](breadth) + + for i in 0 ..< breadth: + sums[i] = spawn dfs(depth - 1, breadth) + + for i in 0 ..< breadth: + result += sync(sums[i]) + +proc test(depth, breadth: int): uint32 = + result = sync spawn dfs(depth, breadth) + +proc main() = + + var + depth = 8 + breadth = 8 + answer: uint32 + nthreads: int + + if existsEnv"WEAVE_NUM_THREADS": + nthreads = getEnv"WEAVE_NUM_THREADS".parseInt() + else: + nthreads = countProcessors() + + if paramCount() == 0: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " + echo &"Running with default config depth = {depth} and breadth = {breadth}" + + if paramCount() >= 1: + depth = paramStr(1).parseInt() + if paramCount() == 2: + breadth = paramStr(2).parseInt() + if paramCount() > 2: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " + echo &"Up to 2 parameters are valid. Received {paramCount()}" + quit 1 + + # Staccato benches runtime init and exit as well + when not defined(windows): + let start = wtime_usec() + + init(Weave) + answer = test(depth, breadth) + exit(Weave) + + when not defined(windows): + let stop = wtime_usec() + + const lazy = defined(WV_LazyFlowvar) + const config = if lazy: " (lazy flowvars)" + else: " (eager flowvars)" + + echo "Scheduler: Weave", config + echo "Benchmark: dfs" + echo "Threads: ", nthreads + when not defined(windows): + echo "Time(us) ", stop - start + echo "Output: ", answer + + quit 0 + +main() diff --git a/benchmarks/heat/stdnim_heat.nim b/benchmarks/heat/stdnim_heat.nim new file mode 100644 index 0000000..50e1661 --- /dev/null +++ b/benchmarks/heat/stdnim_heat.nim @@ -0,0 +1,300 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# From fibril +# +# Original license +# +# /* +# * Heat diffusion (Jacobi-type iteration) +# * +# * Volker Strumpen, Boston August 1996 +# * +# * Copyright (c) 1996 Massachusetts Institute of Technology +# * +# * This program is free software; you can redistribute it and/or modify +# * it under the terms of the GNU General Public License as published by +# * the Free Software Foundation; either version 2 of the License, or +# * (at your option) any later version. +# * +# * This program is distributed in the hope that it will be useful, +# * but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# * GNU General Public License for more details. +# * +# * You should have received a copy of the GNU General Public License +# * along with this program; if not, write to the Free Software +# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# */ + +import + # Stdlib + strformat, os, strutils, math, system/ansi_c, + cpuinfo, threadpool, + # bench + ../wtime, ../resources + +# This deadlocks :/ + +# Helpers +# ------------------------------------------------------- + +# We need a thin wrapper around raw pointers for matrices, +# we can't pass "var seq[seq[float64]]" to other threads +# nor "var" for that matter +type + Matrix[T] = object + buffer: ptr UncheckedArray[T] + m, n: int + + Row[T] = object + buffer: ptr UncheckedArray[T] + len: int + +func newMatrix[T](m, n: int): Matrix[T] {.inline.} = + result.buffer = cast[ptr UncheckedArray[T]](c_malloc(csize_t m*n*sizeof(T))) + result.m = m + result.n = n + +template `[]`[T](mat: Matrix[T], row, col: Natural): T = + # row-major storage + assert row < mat.m + assert col < mat.n + mat.buffer[row * mat.n + col] + +template `[]=`[T](mat: Matrix[T], row, col: Natural, value: T) = + assert row < mat.m + assert col < mat.n + mat.buffer[row * mat.n + col] = value + +func getRow[T](mat: Matrix[T], rowIdx: Natural): Row[T] {.inline.} = + # row-major storage, there are n columns in between each rows + assert rowIdx < mat.m + result.buffer = cast[ptr UncheckedArray[T]](mat.buffer[rowIdx * mat.n].addr) + result.len = mat.m + +template `[]`[T](row: Row[T], idx: Natural): T = + assert idx < row.len + row.buffer[idx] + +template `[]=`[T](row: Row[T], idx: Natural, value: T) = + assert idx < row.len + row.buffer[idx] = value + +func delete[T](mat: sink Matrix[T]) = + c_free(mat.buffer) + +# And an auto converter for int32 -> float64 so we don't have to convert +# all i, j indices manually + +converter i32toF64(x: int32): float64 {.inline.} = + float64(x) + +# ------------------------------------------------------- + +template f(x, y: SomeFloat): SomeFloat = + sin(x) * sin(y) + +template randa[T: SomeFloat](x, t: T): T = + T(0.0) + +proc randb(x, t: SomeFloat): SomeFloat {.inline.} = + # proc instead of template to avoid Nim constant folding bug: + # https://github.com/nim-lang/Nim/issues/12783 + exp(-2 * t) * sin(x) + +template randc[T: SomeFloat](y, t: T): T = + T(0.0) + +proc randd(y, t: SomeFloat): SomeFloat {.inline.} = + # proc instead of template to avoid Nim constant folding bug: + # https://github.com/nim-lang/Nim/issues/12783 + exp(-2 * t) * sin(y) + +template solu(x, y, t: SomeFloat): SomeFloat = + exp(-2 * t) * sin(x) * sin(y) + +const n = 4096'i32 + +var + nx, ny, nt: int32 + xu, xo, yu, yo, tu, to: float64 + + dx, dy, dt: float64 + dtdxsq, dtdysq: float64 + + odd: Matrix[float64] + even: Matrix[float64] + +proc heat(m: Matrix[float64], il, iu: int32): bool {.discardable.}= + # TODO to allow awaiting `heat` we return a dummy bool + # The parallel spawns are updating the same matrix cells otherwise + if iu - il > 1: + let im = (il + iu) div 2 + + let h = spawn heat(m, il, im) + heat(m, im, iu) + discard ^h + return true + # ------------------------ + + let i = il + let row = m.getRow(i) + + if i == 0: + for j in 0 ..< ny: + row[j] = randc(yu + j*dy, 0) + elif i == nx - 1: + for j in 0 ..< ny: + row[j] = randd(yu + j*dy, 0) + else: + row[0] = randa(xu + i*dx, 0) + for j in 1 ..< ny - 1: + row[j] = f(xu + i*dx, yu + j*dy) + row[ny - 1] = randb(xu + i*dx, 0) + +proc diffuse(output: Matrix[float64], input: Matrix[float64], il, iu: int32, t: float64): bool {.discardable.} = + # TODO to allow awaiting `diffuse` we return a dummy bool + # The parallel spawns are updating the same matrix cells otherwise + if iu - il > 1: + let im = (il + iu) div 2 + + let d = spawn diffuse(output, input, il, im, t) + diffuse(output, input, im, iu, t) + discard ^d + return true + # ------------------------ + + let i = il + let row = output.getRow(i) + + if i == 0: + for j in 0 ..< ny: + row[j] = randc(yu + j*dy, t) + elif i == nx - 1: + for j in 0 ..< ny: + row[j] = randd(yu + j*dy, t) + else: + row[0] = randa(xu + i*dx, t) + for j in 1 ..< ny - 1: + row[j] = input[i, j] + # The use of nested sequences here is a bad idea ... + dtdysq * (input[i, j+1] - 2 * input[i, j] + input[i, j-1]) + + dtdxsq * (input[i+1, j] - 2 * input[i, j] + input[i-1, j]) + row[ny - 1] = randb(xu + i*dx, t) + +proc initTest() = + nx = n + ny = 1024 + nt = 100 + xu = 0.0 + xo = 1.570796326794896558 + yu = 0.0 + yo = 1.570796326794896558 + tu = 0.0 + to = 0.0000001 + + dx = (xo - xu) / float64(nx - 1) + dy = (yo - yu) / float64(ny - 1) + dt = (to - tu) / float64(nt) + + dtdxsq = dt / (dx * dx) + dtdysq = dt / (dy * dy) + + even = newMatrix[float64](nx, ny) + odd = newMatrix[float64](nx, ny) + +proc prep() = + heat(even, 0, nx) + +proc test() = + var t = tu + + for _ in countup(1, nt.int, 2): + # nt included + t += dt + diffuse(odd, even, 0, nx, t) + t += dt + diffuse(even, odd, 0, nx, t) + + if nt mod 2 != 0: + t += dt + diffuse(odd, even, 0, nx, t) + +proc verify() = + var + mat: Matrix[float64] + mae: float64 + mre: float64 + me: float64 + + mat = if nt mod 2 != 0: odd else: even + + for a in 0 ..< nx: + for b in 0 ..< ny: + var tmp = abs(mat[a, b] - solu(xu + a*dx, yu + b*dy, to)) + if tmp > 1e-3: + echo "nx: ", nx, " - ny: ", ny + echo "mat[", a, ", ", b, "] = ", mat[a, b], ", expected sol = ", solu(xu + a*dx, yu + b*dy, to) + quit 1 + + me += tmp + if tmp > mae: mae = tmp + if mat[a, b] != 0.0: tmp /= mat[a, b] + if tmp > mre: mre = tmp + + me /= nx * ny + + if mae > 1e-12: + echo &"Local maximal absolute error {mae:1.3e}" + quit 1 + if mre > 1e-12: + echo &"Local maximal relative error {mre:1.3e}" + quit 1 + if me > 1e-12: + echo &"Global mean absolute error {me:1.3e}" + quit 1 + + echo "Verification successful" + +proc main() = + var nthreads: int + nthreads = countProcessors() + + var ru: Rusage + getrusage(RusageSelf, ru) + var + rss = ru.ru_maxrss + flt = ru.ru_minflt + + initTest() + + prep() + let start = wtime_usec() + test() + let stop = wtime_usec() + + getrusage(RusageSelf, ru) + rss = ru.ru_maxrss - rss + flt = ru.ru_minflt - flt + + sync() + + verify() + delete(even) + delete(odd) + + echo "Scheduler: Nim threadpool (standard lib)" + echo "Benchmark: heat" + echo "Threads: ", nthreads + echo "Time(us) ", stop - start + echo "Max RSS (KB): ", ru.ru_maxrss + echo "Runtime RSS (KB): ", rss + echo "# of page faults: ", flt + + quit 0 + +main() diff --git a/benchmarks/heat/taskpool_heat.nim b/benchmarks/heat/taskpool_heat.nim new file mode 100644 index 0000000..8275bfe --- /dev/null +++ b/benchmarks/heat/taskpool_heat.nim @@ -0,0 +1,313 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# From fibril +# +# Original license +# +# /* +# * Heat diffusion (Jacobi-type iteration) +# * +# * Volker Strumpen, Boston August 1996 +# * +# * Copyright (c) 1996 Massachusetts Institute of Technology +# * +# * This program is free software; you can redistribute it and/or modify +# * it under the terms of the GNU General Public License as published by +# * the Free Software Foundation; either version 2 of the License, or +# * (at your option) any later version. +# * +# * This program is distributed in the hope that it will be useful, +# * but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# * GNU General Public License for more details. +# * +# * You should have received a copy of the GNU General Public License +# * along with this program; if not, write to the Free Software +# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# */ + +import + # Stdlib + strformat, os, strutils, math, system/ansi_c, + cpuinfo, + # Taskpools + ../../taskpools +when not defined(windows): + # bench + import ../wtime, ../resources + +# Helpers +# ------------------------------------------------------- + +# We need a thin wrapper around raw pointers for matrices, +# we can't pass "var seq[seq[float64]]" to other threads +# nor "var" for that matter +type + Matrix[T] = object + buffer: ptr UncheckedArray[T] + m, n: int + + Row[T] = object + buffer: ptr UncheckedArray[T] + len: int + +var tp: Taskpool + +func newMatrix[T](m, n: int): Matrix[T] {.inline.} = + result.buffer = cast[ptr UncheckedArray[T]](c_malloc(csize_t m*n*sizeof(T))) + result.m = m + result.n = n + +template `[]`[T](mat: Matrix[T], row, col: Natural): T = + # row-major storage + assert row < mat.m + assert col < mat.n + mat.buffer[row * mat.n + col] + +template `[]=`[T](mat: Matrix[T], row, col: Natural, value: T) = + assert row < mat.m + assert col < mat.n + mat.buffer[row * mat.n + col] = value + +func getRow[T](mat: Matrix[T], rowIdx: Natural): Row[T] {.inline.} = + # row-major storage, there are n columns in between each rows + assert rowIdx < mat.m + result.buffer = cast[ptr UncheckedArray[T]](mat.buffer[rowIdx * mat.n].addr) + result.len = mat.m + +template `[]`[T](row: Row[T], idx: Natural): T = + assert idx < row.len + row.buffer[idx] + +template `[]=`[T](row: Row[T], idx: Natural, value: T) = + assert idx < row.len + row.buffer[idx] = value + +func delete[T](mat: sink Matrix[T]) = + c_free(mat.buffer) + +# And an auto converter for int32 -> float64 so we don't have to convert +# all i, j indices manually + +converter i32toF64(x: int32): float64 {.inline.} = + float64(x) + +# ------------------------------------------------------- + +template f(x, y: SomeFloat): SomeFloat = + sin(x) * sin(y) + +template randa[T: SomeFloat](x, t: T): T = + T(0.0) + +proc randb(x, t: SomeFloat): SomeFloat {.inline.} = + # proc instead of template to avoid Nim constant folding bug: + # https://github.com/nim-lang/Nim/issues/12783 + exp(-2 * t) * sin(x) + +template randc[T: SomeFloat](y, t: T): T = + T(0.0) + +proc randd(y, t: SomeFloat): SomeFloat {.inline.} = + # proc instead of template to avoid Nim constant folding bug: + # https://github.com/nim-lang/Nim/issues/12783 + exp(-2 * t) * sin(y) + +template solu(x, y, t: SomeFloat): SomeFloat = + exp(-2 * t) * sin(x) * sin(y) + +const n = 4096'i32 + +var + nx, ny, nt: int32 + xu, xo, yu, yo, tu, to: float64 + + dx, dy, dt: float64 + dtdxsq, dtdysq: float64 + + odd: Matrix[float64] + even: Matrix[float64] + +proc heat(m: Matrix[float64], il, iu: int32): bool {.discardable.}= + # TODO to allow awaiting `heat` we return a dummy bool + # The parallel spawns are updating the same matrix cells otherwise + if iu - il > 1: + let im = (il + iu) div 2 + + let h = tp.spawn heat(m, il, im) + heat(m, im, iu) + discard sync(h) + return true + # ------------------------ + + let i = il + let row = m.getRow(i) + + if i == 0: + for j in 0 ..< ny: + row[j] = randc(yu + j*dy, 0) + elif i == nx - 1: + for j in 0 ..< ny: + row[j] = randd(yu + j*dy, 0) + else: + row[0] = randa(xu + i*dx, 0) + for j in 1 ..< ny - 1: + row[j] = f(xu + i*dx, yu + j*dy) + row[ny - 1] = randb(xu + i*dx, 0) + +proc diffuse(output: Matrix[float64], input: Matrix[float64], il, iu: int32, t: float64): bool {.discardable.} = + # TODO to allow awaiting `diffuse` we return a dummy bool + # The parallel spawns are updating the same matrix cells otherwise + if iu - il > 1: + let im = (il + iu) div 2 + + let d = tp.spawn diffuse(output, input, il, im, t) + diffuse(output, input, im, iu, t) + discard sync(d) + return true + # ------------------------ + + let i = il + let row = output.getRow(i) + + if i == 0: + for j in 0 ..< ny: + row[j] = randc(yu + j*dy, t) + elif i == nx - 1: + for j in 0 ..< ny: + row[j] = randd(yu + j*dy, t) + else: + row[0] = randa(xu + i*dx, t) + for j in 1 ..< ny - 1: + row[j] = input[i, j] + # The use of nested sequences here is a bad idea ... + dtdysq * (input[i, j+1] - 2 * input[i, j] + input[i, j-1]) + + dtdxsq * (input[i+1, j] - 2 * input[i, j] + input[i-1, j]) + row[ny - 1] = randb(xu + i*dx, t) + +proc initTest() = + nx = n + ny = 1024 + nt = 100 + xu = 0.0 + xo = 1.570796326794896558 + yu = 0.0 + yo = 1.570796326794896558 + tu = 0.0 + to = 0.0000001 + + dx = (xo - xu) / float64(nx - 1) + dy = (yo - yu) / float64(ny - 1) + dt = (to - tu) / float64(nt) + + dtdxsq = dt / (dx * dx) + dtdysq = dt / (dy * dy) + + even = newMatrix[float64](nx, ny) + odd = newMatrix[float64](nx, ny) + +proc prep() = + heat(even, 0, nx) + +proc test() = + var t = tu + + for _ in countup(1, nt.int, 2): + # nt included + t += dt + diffuse(odd, even, 0, nx, t) + t += dt + diffuse(even, odd, 0, nx, t) + + if nt mod 2 != 0: + t += dt + diffuse(odd, even, 0, nx, t) + +proc verify() = + var + mat: Matrix[float64] + mae: float64 + mre: float64 + me: float64 + + mat = if nt mod 2 != 0: odd else: even + + for a in 0 ..< nx: + for b in 0 ..< ny: + var tmp = abs(mat[a, b] - solu(xu + a*dx, yu + b*dy, to)) + if tmp > 1e-3: + echo "nx: ", nx, " - ny: ", ny + echo "mat[", a, ", ", b, "] = ", mat[a, b], ", expected sol = ", solu(xu + a*dx, yu + b*dy, to) + quit 1 + + me += tmp + if tmp > mae: mae = tmp + if mat[a, b] != 0.0: tmp /= mat[a, b] + if tmp > mre: mre = tmp + + me /= nx * ny + + if mae > 1e-12: + echo &"Local maximal absolute error {mae:1.3e}" + quit 1 + if mre > 1e-12: + echo &"Local maximal relative error {mre:1.3e}" + quit 1 + if me > 1e-12: + echo &"Global mean absolute error {me:1.3e}" + quit 1 + + echo "Verification successful" + +proc main() = + var nthreads: int + if existsEnv"TASKPOOL_NUM_THREADS": + nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt() + else: + nthreads = countProcessors() + + when not defined(windows): + var ru: Rusage + getrusage(RusageSelf, ru) + var + rss = ru.ru_maxrss + flt = ru.ru_minflt + + initTest() + + # Fibril initializes before benching + tp = Taskpool.new(numThreads = nthreads) + + prep() + when not defined(windows): + let start = wtime_usec() + test() + when not defined(windows): + let stop = wtime_usec() + + getrusage(RusageSelf, ru) + rss = ru.ru_maxrss - rss + flt = ru.ru_minflt - flt + + tp.shutdown() + + verify() + delete(even) + delete(odd) + + echo "Scheduler: Taskpools" + echo "Benchmark: heat" + echo "Threads: ", nthreads + when not defined(windows): + echo "Time(us) ", stop - start + echo "Max RSS (KB): ", ru.ru_maxrss + echo "Runtime RSS (KB): ", rss + echo "# of page faults: ", flt + + quit 0 + +main() diff --git a/benchmarks/matmul_cache_oblivious/README.md b/benchmarks/matmul_cache_oblivious/README.md new file mode 100644 index 0000000..dae7756 --- /dev/null +++ b/benchmarks/matmul_cache_oblivious/README.md @@ -0,0 +1,12 @@ +# Cache-Oblivious Matrix Multiplication + +From Staccato and Cilk + +https://bradley.csail.mit.edu/svn/repos/cilk/5.4.3/examples/matmul.cilk +See the paper ``Cache-Oblivious Algorithms'', by +Matteo Frigo, Charles E. Leiserson, Harald Prokop, and +Sridhar Ramachandran, FOCS 1999, for an explanation of +why this algorithm is good for caches. + +Note that the benchmarks output incorrect matrix traces +according to the check ... diff --git a/benchmarks/matmul_cache_oblivious/taskpool_matmul_co.nim b/benchmarks/matmul_cache_oblivious/taskpool_matmul_co.nim new file mode 100644 index 0000000..1eafd75 --- /dev/null +++ b/benchmarks/matmul_cache_oblivious/taskpool_matmul_co.nim @@ -0,0 +1,213 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# Rectangular matrix multiplication. +# +# Adapted from Cilk 5.4.3 example +# +# https://bradley.csail.mit.edu/svn/repos/cilk/5.4.3/examples/matmul.cilk; +# See the paper ``Cache-Oblivious Algorithms'', by +# Matteo Frigo, Charles E. Leiserson, Harald Prokop, and +# Sridhar Ramachandran, FOCS 1999, for an explanation of +# why this algorithm is good for caches. + +import + # Stdlib + strformat, os, strutils, math, system/ansi_c, + cpuinfo, + # Taskpool + ../../taskpools, + # bench + ../wtime, ../resources + +# Helpers +# ------------------------------------------------------- + +# We need a thin wrapper around raw pointers for matrices, +# we can't pass "var" to other threads +type + Matrix[T: SomeFloat] = object + buffer: ptr UncheckedArray[T] + ld: int + +var tp: Taskpool + +func newMatrixNxN[T](n: int): Matrix[T] {.inline.} = + result.buffer = cast[ptr UncheckedArray[T]](c_malloc(csize_t n*n*sizeof(T))) + result.ld = n + +template `[]`[T](mat: Matrix[T], row, col: Natural): T = + # row-major storage + assert row < mat.ld, $i & " < " & $mat.ld + assert col < mat.ld, $i & " < " & $mat.ld + mat.buffer[row * mat.ld + col] + +template `[]=`[T](mat: Matrix[T], row, col: Natural, value: T) = + assert row < mat.ld, $i & " < " & $mat.ld + assert col < mat.ld, $i & " < " & $mat.ld + mat.buffer[row * mat.ld + col] = value + +func stride*[T](mat: Matrix[T], row, col: Natural): Matrix[T]{.inline.}= + ## Returns a new view offset by the row and column stride + result.buffer = cast[ptr UncheckedArray[T]]( + addr mat.buffer[row*mat.ld + col] + ) + +func delete[T](mat: sink Matrix[T]) = + c_free(mat.buffer) + +# ------------------------------------------------------- + +proc xorshiftRand(): uint32 = + var x {.global.} = uint32(2463534242) + x = x xor (x shr 13) + x = x xor (x shl 17) + x = x xor (x shr 5) + return x + +func zero[T](A: Matrix[T]) = + # zeroing is not timed + zeroMem(A.buffer, A.ld * A.ld * sizeof(T)) + +proc fill[T](A: Matrix[T]) = + for i in 0 ..< A.ld: + for j in 0 ..< A.ld: + A[i, j] = T(xorshiftRand() mod A.ld.uint32) + +func maxError(A, B: Matrix): float64 = + assert A.ld == B.ld + for i in 0 ..< A.ld: + for j in 0 ..< A.ld: + var diff = (A[i, j] - B[i, j]) / A[i, j] + if diff < 0: + diff = -diff + if diff > result: + result = diff + +func check[T](A, B, C: Matrix[T], n: int): bool = + var + tr_C = 0.T + tr_AB = 0.T + for i in 0 ..< n: + for j in 0 ..< n: + tr_AB += A[i, j] * B[j, i] + tr_C += C[i, i] + + # Note, all benchmarks return false ‾\_(ツ)_/‾ + return abs(tr_AB - tr_C) < 1e-3 + +proc matmul[T](A, B, C: Matrix[T], m, n, p: int, add: bool): bool = + # The original bench passes around a ``ld`` parameter (leading dimension?), + # we store it in the matrices + # We return a dummy bool to allow waiting on the matmul + + # Threshold + if (m + n + p) <= 64: + if add: + for i in 0 ..< m: + for k in 0 ..< p: + var c = 0.T + for j in 0 ..< n: + c += A[i, j] * B[j, k] + C[i, k] += c + else: + for i in 0 ..< m: + for k in 0 ..< p: + var c = 0.T + for j in 0 ..< n: + c += A[i, j] * B[j, k] + C[i, k] = c + + return + + var h0, h1: FlowVar[bool] + ## Each half of the computation + + # matrix is larger than threshold + if m >= n and n >= p: + let m1 = m shr 1 # divide by 2 + h0 = tp.spawn matmul(A, B, C, m1, n, p, add) + h1 = tp.spawn matmul(A.stride(m1, 0), B, C.stride(m1, 0), m - m1, n, p, add) + elif n >= m and n >= p: + let n1 = n shr 1 # divide by 2 + h0 = tp.spawn matmul(A, B, C, m, n1, p, add) + h1 = tp.spawn matmul(A.stride(0, n1), B.stride(n1, 0), C, m, n - n1, p, add = true) + else: + let p1 = p shr 1 + h0 = tp.spawn matmul(A, B, C, m, n, p1, add) + h1 = tp.spawn matmul(A, B.stride(0, p1), C.stride(0, p1), m, n, p - p1, add) + + discard sync(h0) + discard sync(h1) + +proc main() = + echo "Warning the benchmark seems to not be correct." + var + n = 3000 + nthreads: int + + if existsEnv"TASKPOOL_NUM_THREADS": + nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt() + else: + nthreads = countProcessors() + + if paramCount() == 0: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " + echo &"Running with default config n = {n}" + elif paramCount() == 1: + n = paramStr(1).parseInt() + else: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " + echo &"Up to 1 parameter is valid. Received {paramCount()}" + quit 1 + + var A = newMatrixNxN[float32](n) + var B = newMatrixNxN[float32](n) + var C = newMatrixNxN[float32](n) + + fill(A) + fill(B) + zero(C) + + var ru: Rusage + getrusage(RusageSelf, ru) + var + rss = ru.ru_maxrss + flt = ru.ru_minflt + + # Staccato benches runtime init and exit as well + let start = wtime_msec() + + tp = Taskpool.new(numThreads = nthreads) + discard sync tp.spawn matmul(A, B, C, n, n, n, add = false) + tp.shutdown() + + let stop = wtime_msec() + + getrusage(RusageSelf, ru) + rss = ru.ru_maxrss - rss + flt = ru.ru_minflt - flt + + echo "Scheduler: Taskpool" + echo "Benchmark: Matrix Multiplication (cache oblivious)" + echo "Threads: ", nthreads + echo "Time(ms) ", stop - start + echo "Max RSS (KB): ", ru.ru_maxrss + echo "Runtime RSS (KB): ", rss + echo "# of page faults: ", flt + echo "Input: ", n + echo "Error: ", check(A, B, C, n) + + delete A + delete B + delete C + + quit 0 + +main() diff --git a/benchmarks/nqueens/stdnim_nqueens.nim b/benchmarks/nqueens/stdnim_nqueens.nim new file mode 100644 index 0000000..837169d --- /dev/null +++ b/benchmarks/nqueens/stdnim_nqueens.nim @@ -0,0 +1,187 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. +# +# Original code licenses +# ------------------------------------------------------------------------------------------------ +# +# /**********************************************************************************************/ +# /* This program is part of the Barcelona OpenMP Tasks Suite */ +# /* Copyright (C) 2009 Barcelona Supercomputing Center - Centro Nacional de Supercomputacion */ +# /* Copyright (C) 2009 Universitat Politecnica de Catalunya */ +# /* */ +# /* This program is free software; you can redistribute it and/or modify */ +# /* it under the terms of the GNU General Public License as published by */ +# /* the Free Software Foundation; either version 2 of the License, or */ +# /* (at your option) any later version. */ +# /* */ +# /* This program is distributed in the hope that it will be useful, */ +# /* but WITHOUT ANY WARRANTY; without even the implied warranty of */ +# /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ +# /* GNU General Public License for more details. */ +# /* */ +# /* You should have received a copy of the GNU General Public License */ +# /* along with this program; if not, write to the Free Software */ +# /* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +# /**********************************************************************************************/ +# +# /* +# * Original code from the Cilk project (by Keith Randall) +# * +# * Copyright (c) 2000 Massachusetts Institute of Technology +# * Copyright (c) 2000 Matteo Frigo +# */ + +import + # Stdlib + system/ansi_c, strformat, os, strutils, + threadpool, + # bench + ../wtime + +# This deadlocks :/ + +# Nim helpers +# ------------------------------------------------- + +when defined(windows): + proc alloca(size: csize): pointer {.header: "".} +else: + proc alloca(size: csize): pointer {.header: "".} + +template alloca*(T: typedesc): ptr T = + cast[ptr T](alloca(sizeof(T))) + +template alloca*(T: typedesc, len: Natural): ptr UncheckedArray[T] = + cast[ptr UncheckedArray[T]](alloca(sizeof(T) * len)) + +proc wv_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} = + when defined(WV_useNimAlloc): + cast[type result](createSharedU(T, len)) + else: + cast[type result](c_malloc(csize_t len*sizeof(T))) + +proc wv_free*[T: ptr](p: T) {.inline.} = + when defined(WV_useNimAlloc): + freeShared(p) + else: + c_free(p) + +# We assume that Nim zeroMem vs C memset +# and Nim copyMem vs C memcpy have no difference +# Nim does have extra checks to handle GC-ed types +# but they should be eliminated by the Nim compiler. + +# ------------------------------------------------- + +type CharArray = ptr UncheckedArray[char] + +var example_solution: ptr UncheckedArray[char] + +func isValid(n: int32, a: CharArray): bool = + ## `a` contains an array of `n` queen positions. + ## Returns true if none of the queens conflict and 0 otherwise. + + for i in 0'i32 ..< n: + let p = cast[int32](a[i]) + + for j in i+1 ..< n: + let q = cast[int32](a[j]) + if q == p or q == p - (j-i) or q == p + (j-i): + return false + return true + +proc nqueens_ser(n, j: int32, a: CharArray): int32 = + # Serial nqueens + if n == j: + # Good solution count it + if example_solution.isNil: + example_solution = wv_alloc(char, n) + copyMem(example_solution, a, n * sizeof(char)) + return 1 + + # Try each possible position for queen `j` + for i in 0 ..< n: + a[j] = cast[char](i) + if isValid(j+1, a): + result += nqueens_ser(n, j+1, a) + +proc nqueens_par(n, j: int32, a: CharArray): int32 = + + if n == j: + # Good solution, count it + return 1 + + var localCounts = alloca(Flowvar[int32], n) + zeroMem(localCounts, n * sizeof(Flowvar[int32])) + + # Try each position for queen `j` + for i in 0 ..< n: + var b = alloca(char, j+1) + copyMem(b, a, j * sizeof(char)) + b[j] = cast[char](i) + if isValid(j+1, b): + localCounts[i] = spawn nqueens_par(n, j+1, b) + + for i in 0 ..< n: + if not localCounts[i].isNil(): + result += ^localCounts[i] + +const solutions = [ + 1, + 0, + 0, + 2, + 10, # 5x5 + 4, + 10, + 92, # 8x8 + 352, + 724, # 10x10 + 2680, + 14200, + 73712, + 365596, + 2279184, # 15x15 + 14772512 +] + +proc verifyQueens(n, res: int32) = + if n > solutions.len: + echo &"Cannot verify result: {n} is out of range [1,{solutions.len}]" + return + + if res != solutions[n-1]: + echo &"N-Queens failure: {res} is different from expected {solutions[n-1]}" + +proc main() = + if paramCount() != 1: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " + quit 0 + + let n = paramStr(1).parseInt.int32 + + if n notin 1 .. solutions.len: + echo &"The number of queens N (on a NxN board) must be in the range [1, {solutions.len}]" + quit 1 + + + let start = wtime_msec() + let count = nqueens_par(n, 0, alloca(char, n)) + let stop = wtime_msec() + + verifyQueens(n, count) + + if not example_solution.isNil: + stdout.write("Example solution: ") + for i in 0 ..< n: + c_printf("%2d ", example_solution[i]) + stdout.write('\n') + + echo &"Elapsed wall time: {stop-start:2.4f} ms" + +main() diff --git a/benchmarks/nqueens/taskpool_nqueens.nim b/benchmarks/nqueens/taskpool_nqueens.nim new file mode 100644 index 0000000..1c55262 --- /dev/null +++ b/benchmarks/nqueens/taskpool_nqueens.nim @@ -0,0 +1,229 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. +# +# Original code licenses +# ------------------------------------------------------------------------------------------------ +# +# /**********************************************************************************************/ +# /* This program is part of the Barcelona OpenMP Tasks Suite */ +# /* Copyright (C) 2009 Barcelona Supercomputing Center - Centro Nacional de Supercomputacion */ +# /* Copyright (C) 2009 Universitat Politecnica de Catalunya */ +# /* */ +# /* This program is free software; you can redistribute it and/or modify */ +# /* it under the terms of the GNU General Public License as published by */ +# /* the Free Software Foundation; either version 2 of the License, or */ +# /* (at your option) any later version. */ +# /* */ +# /* This program is distributed in the hope that it will be useful, */ +# /* but WITHOUT ANY WARRANTY; without even the implied warranty of */ +# /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the */ +# /* GNU General Public License for more details. */ +# /* */ +# /* You should have received a copy of the GNU General Public License */ +# /* along with this program; if not, write to the Free Software */ +# /* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +# /**********************************************************************************************/ +# +# /* +# * Original code from the Cilk project (by Keith Randall) +# * +# * Copyright (c) 2000 Massachusetts Institute of Technology +# * Copyright (c) 2000 Matteo Frigo +# */ + +import + # Stdlib + system/ansi_c, strformat, os, strutils, cpuinfo, + # Taskpools + ../../taskpools + +when not defined(windows): + # bench + import ../wtime, ../resources + +# Nim helpers +# ------------------------------------------------- + +when defined(windows): + proc alloca(size: int): pointer {.header: "".} +else: + proc alloca(size: int): pointer {.header: "".} + +template alloca*(T: typedesc): ptr T = + cast[ptr T](alloca(sizeof(T))) + +template alloca*(T: typedesc, len: Natural): ptr UncheckedArray[T] = + cast[ptr UncheckedArray[T]](alloca(sizeof(T) * len)) + +proc tp_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} = + when defined(TP_useNimAlloc): + cast[type result](createSharedU(T, len)) + else: + cast[type result](c_malloc(csize_t len*sizeof(T))) + +proc tp_free*[T: ptr](p: T) {.inline.} = + when defined(TP_useNimAlloc): + freeShared(p) + else: + c_free(p) + +# We assume that Nim zeroMem vs C memset +# and Nim copyMem vs C memcpy have no difference +# Nim does have extra checks to handle GC-ed types +# but they should be eliminated by the Nim compiler. + +# ------------------------------------------------- + +type CharArray = ptr UncheckedArray[char] + +var tp: Taskpool +var example_solution: ptr UncheckedArray[char] + +func isValid(n: int32, a: CharArray): bool = + ## `a` contains an array of `n` queen positions. + ## Returns true if none of the queens conflict and 0 otherwise. + + for i in 0'i32 ..< n: + let p = cast[int32](a[i]) + + for j in i+1 ..< n: + let q = cast[int32](a[j]) + if q == p or q == p - (j-i) or q == p + (j-i): + return false + return true + +proc nqueens_ser(n, j: int32, a: CharArray): int32 = + # Serial nqueens + if n == j: + # Good solution count it + if example_solution.isNil: + example_solution = tp_alloc(char, n) + copyMem(example_solution, a, n * sizeof(char)) + return 1 + + # Try each possible position for queen `j` + for i in 0 ..< n: + a[j] = cast[char](i) + if isValid(j+1, a): + result += nqueens_ser(n, j+1, a) + +proc nqueens_par(n, j: int32, a: CharArray): int32 = + + if n == j: + # Good solution, count it + return 1 + + var localCounts = alloca(Flowvar[int32], n) + zeroMem(localCounts, n * sizeof(Flowvar[int32])) + + # Try each position for queen `j` + for i in 0 ..< n: + var b = alloca(char, j+1) + copyMem(b, a, j * sizeof(char)) + b[j] = cast[char](i) + if isValid(j+1, b): + localCounts[i] = tp.spawn nqueens_par(n, j+1, b) + + for i in 0 ..< n: + if localCounts[i].isSpawned(): + result += sync(localCounts[i]) + +const solutions = [ + 1, + 0, + 0, + 2, + 10, # 5x5 + 4, + 10, + 92, # 8x8 + 352, + 724, # 10x10 + 2680, + 14200, + 73712, + 365596, + 2279184, # 15x15 + 14772512 +] + +proc verifyQueens(n, res: int32) = + if n > solutions.len: + echo &"Cannot verify result: {n} is out of range [1,{solutions.len}]" + return + + if res != solutions[n-1]: + echo &"N-Queens failure: {res} is different from expected {solutions[n-1]}" + +proc main() = + var + n = 11'i32 + nthreads: int + + if existsEnv"TASKPOOL_NUM_THREADS": + nthreads = getEnv"TASKPOOL_NUM_THREADS".parseInt() + else: + nthreads = countProcessors() + + if paramCount() == 0: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} " + echo &"Running with default config N = {n}\n" + + if paramCount() >= 1: + n = paramStr(1).parseInt.int32 + + if n notin 1 .. solutions.len: + echo &"The number of queens N (on a NxN board) must be in the range [1, {solutions.len}]" + quit 1 + + when not defined(windows): + var ru: Rusage + getrusage(RusageSelf, ru) + var + rss = ru.ru_maxrss + flt = ru.ru_minflt + + tp = Taskpool.new(numThreads = nthreads) + + when not defined(windows): + let start = wtime_msec() + + let count = nqueens_par(n, 0, alloca(char, n)) + + when not defined(windows): + let stop = wtime_msec() + + when not defined(windows): + getrusage(RusageSelf, ru) + rss = ru.ru_maxrss - rss + flt = ru.ru_minflt - flt + + tp.shutdown() + + verifyQueens(n, count) + + if not example_solution.isNil: + stdout.write("Example solution: ") + for i in 0 ..< n: + c_printf("%2d ", example_solution[i]) + stdout.write('\n') + + echo "Scheduler: Taskpool" + echo "Benchmark: N-queens" + echo "Threads: ", nthreads + when not defined(windows): + echo "Time(us) ", stop - start + echo "Max RSS (KB): ", ru.ru_maxrss + echo "Runtime RSS (KB): ", rss + echo "# of page faults: ", flt + echo "Problem size: ", n,"x",n, " board with ",n, " queens" + echo "Solutions found: ", count + + quit 0 + +main() diff --git a/benchmarks/resources.nim b/benchmarks/resources.nim new file mode 100644 index 0000000..4441e47 --- /dev/null +++ b/benchmarks/resources.nim @@ -0,0 +1,24 @@ +type + Timeval {.importc: "timeval", header:"", bycopy.} = object + + Rusage* {.importc: "struct rusage", header:"", bycopy.} = object + ru_utime {.importc.}: Timeval + ru_stime {.importc.}: Timeval + ru_maxrss* {.importc.}: int32 # Maximum resident set size + # ... + ru_minflt* {.importc.}: int32 # page reclaims (soft page faults) + + RusageWho* {.size: sizeof(cint).} = enum + RusageChildren = -1 + RusageSelf = 0 + RusageThread = 1 + +when defined(debug): + var H_RUSAGE_SELF{.importc, header:"= stop: + break + + dummy_cpt() + + # if elapsed >= global_poll_elapsed: + # let pollStart = wtime_usec() + # loadBalance(Weave) + # pollElapsed += wtime_usec() - pollStart + # global_poll_elapsed += PollInterval + + # c_printf("Elapsed: %.2lfus\n", elapsed) + +proc spc_consume_nopoll(usec: int32) = + + let start = wtime_usec() + let stop = usec.float64 + + while true: + var elapsed = wtime_usec() - start + if elapsed >= stop: + break + + dummy_cpt() + + # c_printf("Elapsed: %.2lfus\n", elapsed) + +proc spc_produce(n: int32) = + for i in 0 ..< n: + tp.spawn spc_consume(TaskGranularity) + +proc spc_produce_seq(n: int32) = + for i in 0 ..< n: + spc_consume_no_poll(TaskGranularity) + +proc main() = + NumTasksTotal = 1000000 + TaskGranularity = 10 + PollInterval = 10 + + if paramCount() == 0: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} <# of tasks:{NumTasksTotal}> " & + &" " & + &"[polling interval (us): task granularity]" + echo &"Running with default config tasks = {NumTasksTotal}, granularity (us) = {TaskGranularity}, polling (us) = {PollInterval}" + if paramCount() >= 1: + NumTasksTotal = paramStr(1).parseInt.int32 + if paramCount() >= 2: + TaskGranularity = paramStr(2). parseInt.int32 + if paramCount() == 3: + PollInterval = paramStr(3).parseInt.float64 + else: + PollInterval = TaskGranularity.float64 + if paramCount() > 3: + let exeName = getAppFilename().extractFilename() + echo &"Usage: {exeName} <# of tasks:{NumTasksTotal}> " & + &" " & + &"[polling interval (us): task granularity]" + quit 1 + + var nthreads: int + if existsEnv"WEAVE_NUM_THREADS": + nthreads = getEnv"WEAVE_NUM_THREADS".parseInt() + else: + nthreads = countProcessors() + + tp = Taskpool.new(numThreads = nthreads) + + # measure overhead during tasking + var ru: Rusage + getrusage(RusageSelf, ru) + var + rss = ru.ru_maxrss + flt = ru.ru_minflt + + let start = wtime_msec() + + # spc_produce_seq(NumTasksTotal) + spc_produce(NumTasksTotal) + tp.syncAll() + + let stop = wtime_msec() + + getrusage(RusageSelf, ru) + rss = ru.ru_maxrss - rss + flt = ru.ru_minflt - flt + + tp.shutdown() + + echo "--------------------------------------------------------------------------" + echo "Scheduler: Taskpool" + echo "Benchmark: SPC (Single task Producer - multi Consumer)" + echo "Threads: ", nthreads + echo "Time(ms) ", round(stop - start, 3) + echo "Max RSS (KB): ", ru.ru_maxrss + echo "Runtime RSS (KB): ", rss + echo "# of page faults: ", flt + echo "--------------------------------------------------------------------------" + echo "# of tasks: ", NumTasksTotal + echo "Task granularity (us): ", TaskGranularity + echo "Polling / manual load balancing interval (us): ", PollInterval + + quit 0 + +main() diff --git a/benchmarks/wtime.h b/benchmarks/wtime.h new file mode 100644 index 0000000..b645135 --- /dev/null +++ b/benchmarks/wtime.h @@ -0,0 +1,53 @@ +#ifndef WTIME_H +#define WTIME_H + +#include +#include + +// Number of seconds since the Epoch +static inline double Wtime_sec(void) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec + tv.tv_usec / 1e6; +} + +// Number of milliseconds since the Epoch +static inline double Wtime_msec(void) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec * 1e3 + tv.tv_usec / 1e3; +} + +// Number of microseconds since the Epoch +static inline double Wtime_usec(void) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec * 1e6 + tv.tv_usec; +} + +// Read time stamp counter on x86 +static inline unsigned long long readtsc(void) +{ + unsigned int lo, hi; + // RDTSC copies contents of 64-bit TSC into EDX:EAX + asm volatile ("rdtsc" : "=a" (lo), "=d" (hi)); + return (unsigned long long)hi << 32 | lo; +} + +#define WTIME_unique_var_name_paste(id, n) id ## n +#define WTIME_unique_var_name(id, n) WTIME_unique_var_name_paste(id, n) +#define WTIME_unique_var(id) WTIME_unique_var_name(id, __LINE__) + +// Convenience macro for time measurement +#define WTIME(unit) \ + double WTIME_unique_var(_start_##unit##_) = Wtime_##unit##ec(); \ + int WTIME_unique_var(_i_) = 0; \ + for (; WTIME_unique_var(_i_) == 0 || \ + (printf("Elapsed wall time: %.2lf "#unit"\n", \ + Wtime_##unit##ec() - WTIME_unique_var(_start_##unit##_)), 0); \ + WTIME_unique_var(_i_)++) + +#endif // WTIME_H diff --git a/benchmarks/wtime.nim b/benchmarks/wtime.nim new file mode 100644 index 0000000..420a198 --- /dev/null +++ b/benchmarks/wtime.nim @@ -0,0 +1,10 @@ + +import strutils, os + +const cSourcesPath = currentSourcePath.rsplit(DirSep, 1)[0] +const cHeader = csourcesPath / "wtime.h" + +{.passC: "-I" & cSourcesPath .} + +proc wtime_usec*: float64 {.importc: "Wtime_usec", header: cHeader.} +proc wtime_msec*: float64 {.importc: "Wtime_msec", header: cHeader.} diff --git a/doc/README.md b/doc/README.md new file mode 100644 index 0000000..7fdcd58 --- /dev/null +++ b/doc/README.md @@ -0,0 +1,17 @@ +# Taskpools architecture + +Taskpools architecture is a simple threadpool with work-stealing to handle unbalanced workloads. + +## Architecture + +### Processing steps + +1. On a `spawn` expression, thread i packages the function call in a task. +2. It enqueues it in it's own dequeue. +3. It notify_one a condition variable that holds all sleeping threads. +4. The notified thread wakes up and +5. The notified thread randomly tries to steal a task in a worker. +6. If no tasks are found, it goes back to sleep. +7. Otherwise it runs the task. +8. On a `sync` statement, it runs task in its own task dequeue or steal a task from another worker. +9. Once the `sync` task is ready, it can run the following statements (continuation). diff --git a/examples/e01_simple_tasks.nim b/examples/e01_simple_tasks.nim new file mode 100644 index 0000000..ffacb22 --- /dev/null +++ b/examples/e01_simple_tasks.nim @@ -0,0 +1,43 @@ +import ../taskpools/taskpools +import std/macros + +block: # Async without result + + proc display_int(x: int) = + stdout.write(x) + stdout.write(" - SUCCESS\n") + + proc main() = + echo "\nSanity check 1: Printing 123456 654321 in parallel" + + var tp = Taskpool.new(numThreads = 4) + tp.spawn display_int(123456) + tp.spawn display_int(654321) + tp.shutdown() + + main() + +block: # Async/Await + + var tp: Taskpool + + + proc async_fib(n: int): int = + if n < 2: + return n + + let x = tp.spawn async_fib(n-1) + let y = async_fib(n-2) + + result = sync(x) + y + + proc main2() = + echo "\nSanity check 2: fib(20)" + + tp = Taskpool.new() + let f = async_fib(20) + tp.shutdown() + + doAssert f == 6765 + + main2() diff --git a/papers/Chase-Lev - Dynamic Circular Work-Stealing Deque.pdf b/papers/Chase-Lev - Dynamic Circular Work-Stealing Deque.pdf new file mode 100644 index 0000000..2945142 Binary files /dev/null and b/papers/Chase-Lev - Dynamic Circular Work-Stealing Deque.pdf differ diff --git a/papers/Nhat Minh Le et al - Correct and Efficient Work-Stealing for Weak Memory Models.pdf b/papers/Nhat Minh Le et al - Correct and Efficient Work-Stealing for Weak Memory Models.pdf new file mode 100644 index 0000000..a1cf767 Binary files /dev/null and b/papers/Nhat Minh Le et al - Correct and Efficient Work-Stealing for Weak Memory Models.pdf differ diff --git a/taskpools.nim b/taskpools.nim new file mode 100644 index 0000000..11976ff --- /dev/null +++ b/taskpools.nim @@ -0,0 +1,9 @@ +# Nim-Taskpools +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import taskpools/taskpools +export taskpools diff --git a/taskpools/ast_utils.nim b/taskpools/ast_utils.nim new file mode 100644 index 0000000..b177741 --- /dev/null +++ b/taskpools/ast_utils.nim @@ -0,0 +1,33 @@ +# Nim-Taskpools +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import macros + +template letsGoDeeper = + var rTree = node.kind.newTree() + for child in node: + rTree.add inspect(child) + return rTree + +proc replaceSymsByIdents*(ast: NimNode): NimNode = + proc inspect(node: NimNode): NimNode = + case node.kind: + of {nnkIdent, nnkSym}: + return ident($node) + of nnkEmpty: + return node + of nnkLiterals: + return node + of nnkHiddenStdConv: + if node[1].kind == nnkIntLit: + return node[1] + else: + expectKind(node[1], nnkSym) + return ident($node[1]) + else: + letsGoDeeper() + result = inspect(ast) diff --git a/taskpools/channels_spsc_single.nim b/taskpools/channels_spsc_single.nim new file mode 100644 index 0000000..2c98caa --- /dev/null +++ b/taskpools/channels_spsc_single.nim @@ -0,0 +1,178 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + std/atomics, + ./instrumentation/[contracts, loggers] + +type + ChannelSPSCSingle* = object + ## A type-erased SPSC channel. + ## + ## Wait-free bounded single-producer single-consumer channel + ## that can only buffer a single item + ## Properties: + ## - wait-free + ## - supports weak memory models + ## - buffers a single item + ## - Padded to avoid false sharing in collections + ## - No extra indirection to access the item, the buffer is inline the channel + ## - Linearizable + ## - Default usable size is 254 bytes (WV_MemBlockSize - 2). + ## If used in an intrusive manner, it's 126 bytes due to the default 128 bytes padding. + ## + ## The channel should be the last field of an object if used in an intrusive manner + ## + ## Motivations for type erasure + ## - when LazyFlowvar needs to be converted + ## from stack-allocated memory to heap to extended their lifetime + ## we have no type information at all as the whole runtime + ## and especially tasks does not retain it. + ## + ## - When a task depends on a future that was generated from lazy loop-splitting + ## we don't have type information either. + ## + ## - An extra benefit is probably easier embedding, or calling + ## from C or JIT code. + full{.align: 64.}: Atomic[bool] + itemSize*: uint8 + buffer*{.align: 8.}: UncheckedArray[byte] + +proc `=`( + dest: var ChannelSPSCSingle, + source: ChannelSPSCSingle + ) {.error: "A channel cannot be copied".} + +proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} = + ## If ChannelSPSCSingle is used intrusive another data structure + ## be aware that it should be the last part due to ending by UncheckedArray + preCondition: itemsize.int in 0 .. int high(uint8) + + chan.itemSize = uint8 itemsize + chan.full.store(false, moRelaxed) + +func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} = + not chan.full.load(moAcquire) + +func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} = + ## Try receiving the item buffered in the channel + ## Returns true if successful (channel was not empty) + ## + ## ⚠ Use only in the consumer thread that reads from the channel. + preCondition: (sizeof(T) == chan.itemsize.int) or + # Support dummy object + (sizeof(T) == 0 and chan.itemsize == 1) + + let full = chan.full.load(moAcquire) + if not full: + return false + dst = cast[ptr T](chan.buffer.addr)[] + chan.full.store(false, moRelease) + return true + +func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} = + ## Try sending an item into the channel + ## Reurns true if successful (channel was empty) + ## + ## ⚠ Use only in the producer thread that writes from the channel. + preCondition: (sizeof(T) == chan.itemsize.int) or + # Support dummy object + (sizeof(T) == 0 and chan.itemsize == 1) + + let full = chan.full.load(moAcquire) + if full: + return false + cast[ptr T](chan.buffer.addr)[] = src + chan.full.store(true, moRelease) + return true + +# Sanity checks +# ------------------------------------------------------------------------------ +when isMainModule: + import ../memory/memory_pools + + when not compileOption("threads"): + {.error: "This requires --threads:on compilation flag".} + + template sendLoop[T](chan: var ChannelSPSCSingle, + data: sink T, + body: untyped): untyped = + while not chan.trySend(data): + body + + template recvLoop[T](chan: var ChannelSPSCSingle, + data: var T, + body: untyped): untyped = + while not chan.tryRecv(data): + body + + type + ThreadArgs = object + ID: WorkerKind + chan: ptr ChannelSPSCSingle + + WorkerKind = enum + Sender + Receiver + + template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} = + if args.ID == id: + body + + proc thread_func(args: ThreadArgs) = + + # Worker RECEIVER: + # --------- + # <- chan + # <- chan + # <- chan + # + # Worker SENDER: + # --------- + # chan <- 42 + # chan <- 53 + # chan <- 64 + Worker(Receiver): + var val: int + for j in 0 ..< 10: + args.chan[].recvLoop(val): + # Busy loop, in prod we might want to yield the core/thread timeslice + discard + echo " Receiver got: ", val + doAssert val == 42 + j*11 + + Worker(Sender): + doAssert args.chan.full.load(moRelaxed) == false + for j in 0 ..< 10: + let val = 42 + j*11 + args.chan[].sendLoop(val): + # Busy loop, in prod we might want to yield the core/thread timeslice + discard + echo "Sender sent: ", val + + proc main() = + echo "Testing if 2 threads can send data" + echo "-----------------------------------" + var threads: array[2, Thread[ThreadArgs]] + var pool: TLPoolAllocator + pool.initialize() + + var chan = pool.borrow(ChannelSPSCSingle) + chan[].initialize(itemSize = sizeof(int)) + + createThread(threads[0], thread_func, ThreadArgs(ID: Receiver, chan: chan)) + createThread(threads[1], thread_func, ThreadArgs(ID: Sender, chan: chan)) + + joinThread(threads[0]) + joinThread(threads[1]) + + recycle(chan) + + echo "-----------------------------------" + echo "Success" + + main() diff --git a/taskpools/chase_lev_deques.nim b/taskpools/chase_lev_deques.nim new file mode 100644 index 0000000..e20d3f0 --- /dev/null +++ b/taskpools/chase_lev_deques.nim @@ -0,0 +1,181 @@ +# Nim-Taskpools +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# chase_lev_deques.nim +# -------------------- +# This file implements a Chase-Lev deque +# This is a single-consumer multi-consumer concurrent queue +# for work-stealing schedulers. +# +# Papers: +# - Dynamic Circular Work-Stealing Deque +# David Chase, Yossi Lev, 1993 +# https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf +# +# - Correct and Efficient Work-Stealing for Weak Memory Models +# Nhat Minh Lê, Antoniu Pop, Albert Cohen, Francesco Zappa Nardelli, 2013 +# https://fzn.fr/readings/ppopp13.pdf +# +# We straight translate the second paper which includes formal proofs of correctness, +# and uses modern C++11 code. +# +# A Chase-lev dequeue implements the following push, pop, steal. +# +# top bottom +# --------------------------------- +# | | | | <- push() +# steal() <- | Task 0 | Task 1 | Task 2 | -> pop() +# any thread | | | | owner-only +# --------------------------------- +# +# To reduce contention, stealing is done on the opposite end from push/pop +# so that there is a race only for the very last task. + +{.push raises: [].} + +import + system/ansi_c, + std/[locks, typetraits, atomics], + ./instrumentation/[contracts, loggers] + +type + Buf[T] = object + ## Backend buffer of a ChaseLevDeque + ## `capacity` MUST be a power of 2 + capacity: int + mask: int # == capacity-1 implies (i and mask) == (i mod capacity) + rawBuffer: UncheckedArray[Atomic[T]] + + ChaseLevDeque*[T] = object + ## This implements a lock-free, growable, work-stealing deque. + ## The owning thread enqueues and dequeues at the bottom + ## Foreign threads steal at the top. + ## + ## Default queue size is 8 + ## Queue can grow to handle up to 34 359 738 368 tasks in flights + ## TODO: + ## with --gc:arc / --gc:orc, use a seq instead of a fixed max size. + top {.align: 64.}: Atomic[int] + bottom: Atomic[int] + buf: Atomic[ptr Buf[T]] + garbage: array[32, ptr Buf[T]] # up to 34 359 738 368 sized buffer + garbageUsed: uint8 + +func isPowerOfTwo(n: int): bool {.inline.} = + (n and (n - 1)) == 0 and (n != 0) + +proc newBuf(T: typedesc, capacity: int): ptr Buf[T] = + # Tasks have a destructor + # static: + # doAssert supportsCopyMem(T), $T & " must be a (POD) plain-old-data type: no seq, string, ref." + + preCondition: capacity.isPowerOfTwo() + + result = cast[ptr Buf[T]]( + c_malloc(csize_t 2*sizeof(int) + sizeof(T)*capacity) + ) + + result.capacity = capacity + result.mask = capacity - 1 + result.rawBuffer.addr.zeroMem(sizeof(T)*capacity) + +proc `[]=`[T](buf: var Buf[T], index: int, item: T) {.inline.} = + buf.rawBuffer[index and buf.mask].store(item, moRelaxed) + +proc `[]`[T](buf: var Buf[T], index: int): T {.inline.} = + result = buf.rawBuffer[index and buf.mask].load(moRelaxed) + +proc grow[T](deque: var ChaseLevDeque[T], buf: var ptr Buf[T], top, bottom: int) {.inline.} = + ## Double the buffer size + ## bottom is the last item index + ## + ## To handle race-conditions the current "top", "bottom" and "buf" + ## have to be saved before calling this procedure. + ## It reads and writes the "deque.buf", "deque.garbage" and "deque.garbageUsed" + + # Read -> Copy -> Update + var tmp = newBuf(T, buf.capacity*2) + for i in top ..< bottom: + tmp[][i] = buf[][i] + + # This requires 68+ billions tasks in flight (per-thread) + ascertain: deque.garbageUsed.int < deque.garbage.len + + deque.garbage[deque.garbageUsed] = buf + swap(buf, tmp) + deque.buf.store(buf, moRelaxed) + +# Public API +# --------------------------------------------------- + +proc init*[T](deque: var ChaseLevDeque[T]) = + ## Initializes a new Chase-lev work-stealing deque. + deque.reset() + deque.buf.store(newBuf(T, 8), moRelaxed) + +proc teardown*[T](deque: var ChaseLevDeque[T]) = + ## Teardown a Chase-lev work-stealing deque. + for i in 0 ..< deque.garbageUsed.int: + c_free(deque.garbage[i]) + c_free(deque.buf.load(moRelaxed)) + +proc push*[T](deque: var ChaseLevDeque[T], item: T) = + ## Enqueue an item at the bottom + ## The item should not be used afterwards. + + let # Handle race conditions + b = deque.bottom.load(moRelaxed) + t = deque.top.load(moAcquire) + var a = deque.buf.load(moRelaxed) + + if b-t > a.capacity - 1: + # Full queue + deque.grow(a, t, b) + + a[][b] = item + fence(moRelease) + deque.bottom.store(b+1, moRelaxed) + +proc pop*[T](deque: var ChaseLevDeque[T]): T = + ## Deque an item at the bottom + + let # Handle race conditions + b = deque.bottom.load(moRelaxed) - 1 + a = deque.buf.load(moRelaxed) + + deque.bottom.store(b, moRelaxed) + fence(moSequentiallyConsistent) + var t = deque.top.load(moRelaxed) + + if t <= b: + # Non-empty queue. + result = a[][b] + if t == b: + # Single last element in queue. + if not compare_exchange(deque.top, t, t+1, moSequentiallyConsistent, moRelaxed): + # Failed race. + result = default(T) + deque.bottom.store(b+1, moRelaxed) + else: + # Empty queue. + result = default(T) + deque.bottom.store(b+1, moRelaxed) + +proc steal*[T](deque: var ChaseLevDeque[T]): T = + ## Deque an item at the top + var t = deque.top.load(moAcquire) + fence(moSequentiallyConsistent) + let b = deque.bottom.load(moAcquire) + result = default(T) + + if t < b: + # Non-empty queue. + let a = deque.buf.load(moConsume) + result = a[][t] + if not compare_exchange(deque.top, t, t+1, moSequentiallyConsistent, moRelaxed): + # Failed race. + return default(T) diff --git a/taskpools/event_notifiers.nim b/taskpools/event_notifiers.nim new file mode 100644 index 0000000..cff64b2 --- /dev/null +++ b/taskpools/event_notifiers.nim @@ -0,0 +1,82 @@ +# Nim-Taskpools +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# event_notifier.nim +# ------------------ +# This file implements an event notifier. +# It allows putting idle threads to sleep or waking them up. + +# Design +# Currently it is a shared lock + condition variable (a.k.a. a semaphore) +# +# In the future an eventcount might be considered, an event count significantly +# reduces scheduler overhead by removing lock acquisition from critical path. +# See overview and implementations at +# https://gist.github.com/mratsim/04a29bdd98d6295acda4d0677c4d0041 +# +# Weave "one event-notifier per thread" further reduces overhead +# but requires the threadpool to be message-passing based. +# https://github.com/mratsim/weave/blob/a230cce98a8524b2680011e496ec17de3c1039f2/weave/cross_thread_com/event_notifiers.nim + +import + std/locks, + ./instrumentation/contracts + +type + EventNotifier* = object + ## This data structure allows threads to be parked when no events are pending + ## and woken up when a new event is. + # Lock must be aligned to a cache-line to avoid false-sharing. + lock{.align: 64.}: Lock + cond: Cond + parked: int + signals: int + +func initialize*(en: var EventNotifier) {.inline.} = + ## Initialize the event notifier + en.lock.initLock() + en.cond.initCond() + en.parked = 0 + en.signals = 0 + +func `=destroy`*(en: var EventNotifier) {.inline.} = + en.cond.deinitCond() + en.lock.deinitLock() + +func `=`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be copied".} +func `=sink`*(dst: var EventNotifier, src: EventNotifier) {.error: "An event notifier cannot be moved".} + +proc park*(en: var EventNotifier) {.inline.} = + ## Wait until we are signaled of an event + ## Thread is parked and does not consume CPU resources + en.lock.acquire() + preCondition: en.signals == 0 + + en.parked += 1 + while en.signals == 0: # handle spurious wakeups + en.cond.wait(en.lock) + en.parked -= 1 + en.signals -= 1 + + postCondition: en.signals >= 0 + en.lock.release() + +proc notify*(en: var EventNotifier) {.inline.} = + ## Unpark a thread if any is available + en.lock.acquire() + + if en.parked > 0: + en.signals += 1 + en.cond.signal() + + en.lock.release() + +proc getParked*(en: var EventNotifier): int {.inline.} = + ## Get the number of parked thread + en.lock.acquire() + result = en.parked + en.lock.release() diff --git a/taskpools/flowvars.nim b/taskpools/flowvars.nim new file mode 100644 index 0000000..7194f28 --- /dev/null +++ b/taskpools/flowvars.nim @@ -0,0 +1,71 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + ./channels_spsc_single, + system/ansi_c, + ./instrumentation/contracts, + std/os + +{.push gcsafe.} + +type + Flowvar*[T] = object + ## A Flowvar is a placeholder for a future result that may be computed in parallel + # Flowvar are optimized when containing a ptr type. + # They take less size in memory by testing isNil + # instead of having an extra atomic bool + # They also use type-erasure to avoid having duplicate code + # due to generic monomorphization. + chan: ptr ChannelSPSCSingle + +# proc `=copy`*[T](dst: var Flowvar[T], src: Flowvar[T]) {.error: "Futures/Flowvars cannot be copied".} +# +# Unfortunately we cannot prevent this easily as internally +# we need a copy: +# - nim-taskpools level when doing toTask(fnCall(args, fut)) and then returning fut. (Can be worked around with copyMem) +# - in std/tasks (need upstream workaround) + +proc newFlowVar*(T: typedesc): Flowvar[T] {.inline.} = + let size = 2 + sizeof(T) # full flag + item size + buffer + result.chan = cast[ptr ChannelSPSCSingle](c_calloc(1, csize_t size)) + result.chan[].initialize(sizeof(T)) + +proc cleanup(fv: sink Flowvar) {.inline.} = + if not fv.chan.isNil: + c_free(fv.chan) + +func isSpawned*(fv: Flowvar): bool {.inline.} = + ## Returns true if a flowvar is spawned + ## This may be useful for recursive algorithms that + ## may or may not spawn a flowvar depending on a condition. + ## This is similar to Option or Maybe types + return not fv.chan.isNil + +proc readyWith*[T](fv: Flowvar[T], childResult: T) {.inline.} = + ## Send the Flowvar result from the child thread processing the task + ## to its parent thread. + let resultSent {.used.} = fv.chan[].trySend(childResult) + postCondition: resultSent + +template tryComplete*[T](fv: Flowvar, parentResult: var T): bool = + fv.chan[].tryRecv(parentResult) + +func isReady*[T](fv: Flowvar[T]): bool {.inline.} = + ## Returns true if the result of a Flowvar is ready. + ## In that case `sync` will not block. + ## Otherwise the current will block to help on all the pending tasks + ## until the Flowvar is ready. + not fv.chan[].isEmpty() + +proc sync*[T](fv: sink Flowvar[T]): T {.inline, gcsafe.} = + ## Blocks the current thread until the flowvar is available + ## and returned. + ## The thread is not idle and will complete pending tasks. + mixin forceFuture + forceFuture(fv, result) + cleanup(fv) diff --git a/taskpools/instrumentation/contracts.nim b/taskpools/instrumentation/contracts.nim new file mode 100644 index 0000000..c6fa84a --- /dev/null +++ b/taskpools/instrumentation/contracts.nim @@ -0,0 +1,113 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import macros, os, strutils + +{.used.} + +# A simple design-by-contract API +# ---------------------------------------------------------------------------------- + +# Everything should be a template that doesn't produce any code +# when WV_Asserts is not defined. +# Those checks are controlled by a custom flag instead of +# "--boundsChecks" or "--nilChecks" to decouple them from user code checks. +# Furthermore, we want them to be very lightweight on performance + +# TODO auto-add documentation + +proc inspectInfix(node: NimNode): NimNode = + ## Inspect an expression, + ## Returns the AST as string with runtime values inlined + ## from infix operators inlined. + # TODO: pointer and custom type need a default repr + # otherwise we can only resulve simple expressions + proc inspect(node: NimNode): NimNode = + case node.kind: + of nnkInfix: + return newCall( + bindSym"&", + newCall( + bindSym"&", + newCall(ident"$", inspect(node[1])), + newLit(" " & $node[0] & " ") + ), + newCall(ident"$", inspect(node[2])) + ) + of {nnkIdent, nnkSym}: + return node + of nnkDotExpr: + return quote do: + when `node` is pointer or + `node` is ptr or + `node` is (proc): + toHex(cast[ByteAddress](`node`) and 0xffff_ffff) + else: + $(`node`) + of nnkPar: + result = nnkPar.newTree() + for sub in node: + result.add inspect(sub) + else: + return node.toStrLit() + return inspect(node) + +macro assertContract( + checkName: static string, + predicate: untyped) = + let lineinfo = lineinfoObj(predicate) + let file = extractFilename(lineinfo.filename) + + var strippedPredicate: NimNode + if predicate.kind == nnkStmtList: + assert predicate.len == 1, "Only one-liner conditions are supported" + strippedPredicate = predicate[0] + else: + strippedPredicate = predicate + + let debug = "\n Contract violated for " & checkName & " at " & file & ":" & $lineinfo.line & + "\n " & $strippedPredicate.toStrLit & + "\n The following values are contrary to expectations:" & + "\n " + let values = inspectInfix(strippedPredicate) + let myID = quote do: + when declared(myID): + $myID() + else: + "N/A" + + result = quote do: + {.noSideEffect.}: + when compileOption("assertions"): + assert(`predicate`, `debug` & $`values` & " [Worker " & `myID` & "]\n") + elif defined(WV_Asserts): + if unlikely(not(`predicate`)): + raise newException(AssertionError, `debug` & $`values` & '\n') + +# A way way to get the caller function would be nice. + +template preCondition*(require: untyped) = + ## Optional runtime check before returning from a function + assertContract("pre-condition", require) + +template postCondition*(ensure: untyped) = + ## Optional runtime check at the start of a function + assertContract("post-condition", ensure) + +template ascertain*(check: untyped) = + ## Optional runtime check in the middle of processing + assertContract("transient condition", check) + +# Sanity checks +# ---------------------------------------------------------------------------------- + +when isMainModule: + proc assertGreater(x, y: int) = + postcondition(x > y) + + # We should get a nicely formatted exception + assertGreater(10, 12) diff --git a/taskpools/instrumentation/loggers.nim b/taskpools/instrumentation/loggers.nim new file mode 100644 index 0000000..6f24002 --- /dev/null +++ b/taskpools/instrumentation/loggers.nim @@ -0,0 +1,22 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import system/ansi_c + +{.used.} + +template log*(args: varargs[untyped]): untyped = + c_printf(args) + flushFile(stdout) + +template debugTermination*(body: untyped): untyped = + when defined(TP_DebugTermination) or defined(TP_Debug): + {.noSideEffect, gcsafe.}: body + +template debug*(body: untyped): untyped = + when defined(TP_Debug): + {.noSideEffect, gcsafe.}: body diff --git a/taskpools/primitives/affinity_posix.nim b/taskpools/primitives/affinity_posix.nim new file mode 100644 index 0000000..e0a5fd0 --- /dev/null +++ b/taskpools/primitives/affinity_posix.nim @@ -0,0 +1,52 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# Thread primitives +# ---------------------------------------------------------------------------------- + +type + Pthread {.importc: "pthread_t", header: "".} = distinct culong + CpuSet {.byref, importc: "cpu_set_t", header: "".} = object + +proc pthread_self(): Pthread {.header: "".} + +proc pthread_setaffinity_np( + thread: Pthread, + cpuset_size: int, + cpuset: CpuSet + ) {.header: "".} + ## Limit specified `thread` to run only on the processors + ## represented in `cpuset` + +# Note CpuSet is always passed by (hidden) pointer + +proc cpu_zero(cpuset: var CpuSet) {.importc: "CPU_ZERO", header: "".} + ## Clears the set so that it contains no CPU +proc cpu_set(cpu: cint, cpuset: var CpuSet) {.importc: "CPU_SET", header: "".} + ## Add CPU to set + +# Affinity +# ---------------------------------------------------------------------------------- + +# Nim doesn't allow the main thread to set its own affinity + +proc set_thread_affinity(t: Pthread, cpu: int32) {.inline.}= + when defined(osx) or defined(android): + {.warning: "To improve performance we should pin threads to cores.\n" & + "This is not possible with MacOS or Android.".} + # Note: on Android it's even more complex due to the Big.Little architecture + # with cores with different performance profiles to save on battery + else: + var cpuset {.noinit.}: CpuSet + + cpu_zero(cpuset) + cpu_set(cpu, cpuset) + pthread_setaffinity_np(t, sizeof(CpuSet), cpuset) + +proc pinToCpu*(cpu: int32) {.inline.} = + ## Set the affinity of the main thread (the calling thread) + set_thread_affinity(pthread_self(), cpu) diff --git a/taskpools/primitives/affinity_windows.nim b/taskpools/primitives/affinity_windows.nim new file mode 100644 index 0000000..f594311 --- /dev/null +++ b/taskpools/primitives/affinity_windows.nim @@ -0,0 +1,18 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import winlean + +when not compileOption("threads"): + {.error: "This requires --threads:on compilation flag".} + +proc setThreadAffinityMask(hThread: Handle, dwThreadAffinityMask: uint) {. + importc: "SetThreadAffinityMask", stdcall, header: "".} + +proc pinToCpu*(cpu: int32) {.inline.} = + ## Set the affinity of the main thread (the calling thread) + setThreadAffinityMask(getThreadID(), uint(1 shl cpu)) \ No newline at end of file diff --git a/taskpools/primitives/barriers.md b/taskpools/primitives/barriers.md new file mode 100644 index 0000000..1cf679f --- /dev/null +++ b/taskpools/primitives/barriers.md @@ -0,0 +1,53 @@ +# Synchronization Barriers + +OSX does not implement pthread_barrier as its an optional part +of the POSIX standard and they probably want to drive people to libdispatch/Grand Central Dispatch. + +So we need to roll our own with a POSIX compatible API. + +## Glibc barriers, design bug and implementation + +> Note: due to GPL licensing, do not lift the code. +> Not that we can as it is heavily dependent on futexes +> which are not available on OSX + +We need to make sure that we don't hit the same bug +as glibc: https://sourceware.org/bugzilla/show_bug.cgi?id=13065 +which seems to be an issue in some of the barrier implementations +in the wild. + +The design of Glibc barriers is here: +https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/DESIGN-barrier.txt;h=23463c6b7e77231697db3e13933b36ce295365b1;hb=HEAD + +And implementation: +- https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/pthread_barrier_destroy.c;h=76957adef3ee751e5b0cfa429fcf4dd3cfd80b2b;hb=HEAD +- https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/pthread_barrier_init.c;h=c8ebab3a3cb5cbbe469c0d05fb8d9ca0c365b2bb;hb=HEAD` +- https://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/pthread_barrier_wait.c;h=49fcfd370c1c4929fdabdf420f2f19720362e4a0;hb=HEAD + +## Synchronization barrier techniques + +This article goes over the techniques of +"pool barrier" and "ticket barrier" +https://locklessinc.com/articles/barriers/ +to reach 2x to 20x the speed of pthreads barrier + +This course https://cs.anu.edu.au/courses/comp8320/lectures/aux/comp422-Lecture21-Barriers.pdf +goes over +- centralized barrier with sense reversal +- combining tree barrier +- dissemination barrier +- tournament barrier +- scalable tree barrier +More courses: +- http://www.cs.rochester.edu/u/sandhya/csc458/seminars/jb_Barrier_Methods.pdf + +It however requires lightweight mutexes like Linux futexes +that OSX lacks. + +This post goes over lightweight mutexes like Benaphores (from BeOS) +https://preshing.com/20120226/roll-your-own-lightweight-mutex/ + +This gives a few barrier implementations +http://gallium.inria.fr/~maranget/MPRI/02.pdf +and refers to Cubible paper for formally verifying synchronization barriers +http://cubicle.lri.fr/papers/jfla2014.pdf (in French) diff --git a/taskpools/primitives/barriers.nim b/taskpools/primitives/barriers.nim new file mode 100644 index 0000000..4c48a55 --- /dev/null +++ b/taskpools/primitives/barriers.nim @@ -0,0 +1,69 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +when defined(windows): + import ./barriers_windows + when compileOption("assertions"): + import os + + type SyncBarrier* = SynchronizationBarrier + + proc init*(syncBarrier: var SyncBarrier, threadCount: range[0'i32..high(int32)]) {.inline.} = + ## Initialize a synchronization barrier that will block ``threadCount`` threads + ## before release. + let err {.used.} = InitializeSynchronizationBarrier(syncBarrier, threadCount, -1) + when compileOption("assertions"): + if err != 1: + assert err == 0 + raiseOSError(osLastError()) + + proc wait*(syncBarrier: var SyncBarrier): bool {.inline.} = + ## Blocks thread at a synchronization barrier. + ## Returns true for one of the threads (the last one on Windows, undefined on Posix) + ## and false for the others. + result = bool EnterSynchronizationBarrier(syncBarrier, SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE) + + proc delete*(syncBarrier: sink SyncBarrier) {.inline.} = + ## Deletes a synchronization barrier. + ## This assumes no race between waiting at a barrier and deleting it, + ## and reuse of the barrier requires initialization. + DeleteSynchronizationBarrier(syncBarrier.addr) + +else: + import ./barriers_posix + when compileOption("assertions"): + import os + + type SyncBarrier* = PthreadBarrier + + proc init*(syncBarrier: var SyncBarrier, threadCount: range[0'i32..high(int32)]) {.inline.} = + ## Initialize a synchronization barrier that will block ``threadCount`` threads + ## before release. + let err {.used.} = pthread_barrier_init(syncBarrier, nil, threadCount) + when compileOption("assertions"): + if err != 0: + raiseOSError(OSErrorCode(err)) + + proc wait*(syncBarrier: var SyncBarrier): bool {.inline.} = + ## Blocks thread at a synchronization barrier. + ## Returns true for one of the threads (the last one on Windows, undefined on Posix) + ## and false for the others. + let err {.used.} = pthread_barrier_wait(syncBarrier) + when compileOption("assertions"): + if err != PTHREAD_BARRIER_SERIAL_THREAD and err < 0: + raiseOSError(OSErrorCode(err)) + result = if err == PTHREAD_BARRIER_SERIAL_THREAD: true + else: false + + proc delete*(syncBarrier: sink SyncBarrier) {.inline.} = + ## Deletes a synchronization barrier. + ## This assumes no race between waiting at a barrier and deleting it, + ## and reuse of the barrier requires initialization. + let err {.used.} = pthread_barrier_destroy(syncBarrier) + when compileOption("assertions"): + if err < 0: + raiseOSError(OSErrorCode(err)) \ No newline at end of file diff --git a/taskpools/primitives/barriers_macos.nim b/taskpools/primitives/barriers_macos.nim new file mode 100644 index 0000000..dfaa38b --- /dev/null +++ b/taskpools/primitives/barriers_macos.nim @@ -0,0 +1,88 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# OSX doesn't implement pthread_barrier_t +# It's an optional part of the POSIX standard +# +# This is a manual implementation of a sense reversing barrier + +import locks + +type + Natural32 = range[0'i32..high(int32)] + + Errno* = cint + + PthreadAttr* = object + ## Dummy + PthreadBarrier* = object + ## Implementation of a sense reversing barrier + ## (The Art of Multiprocessor Programming by Maurice Herlihy & Nir Shavit) + + lock: Lock # Alternatively spinlock on Atomic + cond {.guard: lock.}: Cond + sense {.guard: lock.}: bool # Choose int32 to avoid zero-expansion cost in registers? + left {.guard: lock.}: Natural32 # Number of threads missing at the barrier before opening + count: Natural32 # Total number of threads that need to arrive before opening the barrier + +const + PTHREAD_BARRIER_SERIAL_THREAD* = Errno(1) + +proc pthread_cond_broadcast(cond: var Cond): Errno {.header:"".} + ## Nim only signal one thread in locks + ## We need to unblock all + +proc broadcast(cond: var Cond) {.inline.}= + discard pthread_cond_broadcast(cond) + +func pthread_barrier_init*( + barrier: var PthreadBarrier, + attr: ptr PthreadAttr, + count: range[0'i32..high(int32)] + ): Errno = + barrier.lock.initLock() + {.locks: [barrier.lock].}: + barrier.cond.initCond() + barrier.left = count + barrier.count = count + # barrier.sense = false + +proc pthread_barrier_wait*(barrier: var PthreadBarrier): Errno = + ## Wait on `barrier` + ## Returns PTHREAD_BARRIER_SERIAL_THREAD for a single arbitrary thread + ## Returns 0 for the other + ## Returns Errno if there is an error + barrier.lock.acquire() + {.locks: [barrier.lock].}: + var local_sense = barrier.sense # Thread local sense + dec barrier.left + + if barrier.left == 0: + # Last thread to arrive at the barrier + # Reverse phase and release it + barrier.left = barrier.count + barrier.sense = not barrier.sense + barrier.cond.broadcast() + barrier.lock.release() + return PTHREAD_BARRIER_SERIAL_THREAD + + while barrier.sense == local_sense: + # We are waiting for threads + # Wait for the sense to reverse + # while loop because we might have spurious wakeups + barrier.cond.wait(barrier.lock) + + # Reversed, we can leave the barrier + barrier.lock.release() + return Errno(0) + +proc pthread_barrier_destroy*(barrier: var PthreadBarrier): Errno = + {.locks: [barrier.lock].}: + barrier.cond.deinitCond() + barrier.lock.deinitLock() + +# TODO: tests diff --git a/taskpools/primitives/barriers_posix.nim b/taskpools/primitives/barriers_posix.nim new file mode 100644 index 0000000..8b45e8f --- /dev/null +++ b/taskpools/primitives/barriers_posix.nim @@ -0,0 +1,51 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# Abstractions over POSIX barriers (non-)implementations + +when not compileOption("threads"): + {.error: "This requires --threads:on compilation flag".} + +# Types +# ------------------------------------------------------- + +when defined(osx): + import ./barriers_macos + export PthreadAttr, PthreadBarrier, Errno, PTHREAD_BARRIER_SERIAL_THREAD +else: + type + PthreadAttr* {.byref, importc: "pthread_attr_t", header: "".} = object + PthreadBarrier* {.byref, importc: "pthread_barrier_t", header: "".} = object + + Errno* = cint + + var PTHREAD_BARRIER_SERIAL_THREAD* {.importc, header:"".}: Errno + +# Pthread +# ------------------------------------------------------- +when defined(osx): + export pthread_barrier_init, pthread_barrier_wait, pthread_barrier_destroy +else: + proc pthread_barrier_init*( + barrier: PthreadBarrier, + attr: PthreadAttr or ptr PthreadAttr, + count: range[0'i32..high(int32)] + ): Errno {.header: "".} + ## Initialize `barrier` with the attributes `attr`. + ## The barrier is opened when `count` waiters arrived. + + proc pthread_barrier_destroy*( + barrier: sink PthreadBarrier): Errno {.header: "".} + ## Destroy a previously dynamically initialized `barrier`. + + proc pthread_barrier_wait*( + barrier: var PthreadBarrier + ): Errno {.header: "".} + ## Wait on `barrier` + ## Returns PTHREAD_BARRIER_SERIAL_THREAD for a single arbitrary thread + ## Returns 0 for the other + ## Returns Errno if there is an error diff --git a/taskpools/primitives/barriers_windows.nim b/taskpools/primitives/barriers_windows.nim new file mode 100644 index 0000000..c330878 --- /dev/null +++ b/taskpools/primitives/barriers_windows.nim @@ -0,0 +1,31 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import winlean + +# Technically in but MSVC complains with +# @m..@s..@sweave@sscheduler.nim.cpp +# C:\Program Files (x86)\Windows Kits\10\include\10.0.17763.0\um\winnt.h(154): fatal error C1189: #error: "No Target Architecture + +type + SynchronizationBarrier*{.importc:"SYNCHRONIZATION_BARRIER", header:"".} = object + +var SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE* {.importc, header: "".}: DWORD + ## Skip expensive checks on barrier enter if a barrier is never deleted. + +proc EnterSynchronizationBarrier*(lpBarrier: var SynchronizationBarrier, dwFlags: DWORD): WINBOOL {.importc, stdcall, header: "".} +proc DeleteSynchronizationBarrier*(lpBarrier: ptr SynchronizationBarrier) {.importc, stdcall, header: "".} +proc InitializeSynchronizationBarrier*(lpBarrier: var SynchronizationBarrier, lTotalThreads: LONG, lSpinCount: LONG): WINBOOL {.importc, stdcall, header: "".} + +when isMainModule: + import os + + var x{.noinit.}: SynchronizationBarrier + let err = InitializeSynchronizationBarrier(x, 2, -1) + if err != 1: + assert err == 0 + raiseOSError(osLastError()) \ No newline at end of file diff --git a/taskpools/shims_pre_1_6/README.md b/taskpools/shims_pre_1_6/README.md new file mode 100644 index 0000000..d28857a --- /dev/null +++ b/taskpools/shims_pre_1_6/README.md @@ -0,0 +1,12 @@ +# Versions + +## std/tasks +- https://github.com/nim-lang/Nim/blob/3619a5a2aa1c7387ec7df01b195bc683943654ff/lib/std/tasks.nim + +We don't support aborting if there is a closure as this requires [#17501](https://github.com/nim-lang/Nim/pull/17501/files) + +## std/isolation +- https://github.com/nim-lang/Nim/blob/603af22b7ca46ac566f8c7c15402028f3f976a4e/lib/std/isolation.nim + +## std/effecttraits +- https://github.com/nim-lang/Nim/blob/603af22b7ca46ac566f8c7c15402028f3f976a4e/lib/std/effecttraits.nim diff --git a/taskpools/shims_pre_1_6/effecttraits.nim b/taskpools/shims_pre_1_6/effecttraits.nim new file mode 100644 index 0000000..358280d --- /dev/null +++ b/taskpools/shims_pre_1_6/effecttraits.nim @@ -0,0 +1,54 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2020 Nim contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module provides access to the inferred .raises effects +## for Nim's macro system. +## **Since**: Version 1.4. +## +## One can test for the existance of this standard module +## via `defined(nimHasEffectTraitsModule)`. + +import macros + +proc getRaisesListImpl(n: NimNode): NimNode = discard "see compiler/vmops.nim" +proc getTagsListImpl(n: NimNode): NimNode = discard "see compiler/vmops.nim" +proc isGcSafeImpl(n: NimNode): bool = discard "see compiler/vmops.nim" +proc hasNoSideEffectsImpl(n: NimNode): bool = discard "see compiler/vmops.nim" + +proc getRaisesList*(fn: NimNode): NimNode = + ## Extracts the `.raises` list of the func/proc/etc `fn`. + ## `fn` has to be a resolved symbol of kind `nnkSym`. This + ## implies that the macro that calls this proc should accept `typed` + ## arguments and not `untyped` arguments. + expectKind fn, nnkSym + result = getRaisesListImpl(fn) + +proc getTagsList*(fn: NimNode): NimNode = + ## Extracts the `.tags` list of the func/proc/etc `fn`. + ## `fn` has to be a resolved symbol of kind `nnkSym`. This + ## implies that the macro that calls this proc should accept `typed` + ## arguments and not `untyped` arguments. + expectKind fn, nnkSym + result = getTagsListImpl(fn) + +proc isGcSafe*(fn: NimNode): bool = + ## Return true if the func/proc/etc `fn` is `gcsafe`. + ## `fn` has to be a resolved symbol of kind `nnkSym`. This + ## implies that the macro that calls this proc should accept `typed` + ## arguments and not `untyped` arguments. + expectKind fn, nnkSym + result = isGcSafeImpl(fn) + +proc hasNoSideEffects*(fn: NimNode): bool = + ## Return true if the func/proc/etc `fn` has `noSideEffect`. + ## `fn` has to be a resolved symbol of kind `nnkSym`. This + ## implies that the macro that calls this proc should accept `typed` + ## arguments and not `untyped` arguments. + expectKind fn, nnkSym + result = hasNoSideEffectsImpl(fn) diff --git a/taskpools/shims_pre_1_6/isolation.nim b/taskpools/shims_pre_1_6/isolation.nim new file mode 100644 index 0000000..a55cb07 --- /dev/null +++ b/taskpools/shims_pre_1_6/isolation.nim @@ -0,0 +1,50 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2020 Nim contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements the `Isolated[T]` type for +## safe construction of isolated subgraphs that can be +## passed efficiently to different channels and threads. +## +## .. warning:: This module is experimental and its interface may change. +## + +type + Isolated*[T] = object ## Isolated data can only be moved, not copied. + value: T + +proc `=copy`*[T](dest: var Isolated[T]; src: Isolated[T]) {.error.} + +proc `=sink`*[T](dest: var Isolated[T]; src: Isolated[T]) {.inline.} = + # delegate to value's sink operation + `=sink`(dest.value, src.value) + +proc `=destroy`*[T](dest: var Isolated[T]) {.inline.} = + # delegate to value's destroy operation + `=destroy`(dest.value) + +# XXX: removed the {.magic: "Isolate".} +func isolate*[T](value: sink T): Isolated[T] = + ## Creates an isolated subgraph from the expression `value`. + ## Isolation is checked at compile time. + ## + ## Please read https://github.com/nim-lang/RFCs/issues/244 + ## for more details. + Isolated[T](value: value) + +func unsafeIsolate*[T](value: sink T): Isolated[T] = + ## Creates an isolated subgraph from the expression `value`. + ## + ## .. warning:: The proc doesn't check whether `value` is isolated. + ## + Isolated[T](value: value) + +func extract*[T](src: var Isolated[T]): T = + ## Returns the internal value of `src`. + ## The value is moved from `src`. + result = move(src.value) diff --git a/taskpools/shims_pre_1_6/tasks.nim b/taskpools/shims_pre_1_6/tasks.nim new file mode 100644 index 0000000..2d1cb9b --- /dev/null +++ b/taskpools/shims_pre_1_6/tasks.nim @@ -0,0 +1,284 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2021 Nim contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module provides basic primitives for creating parallel programs. +## A `Task` should be only owned by a single Thread, it cannot be shared by threads. + +import std/[macros, typetraits] +import system/ansi_c + +import ./isolation +export isolation + +when compileOption("threads"): + from ./effecttraits import isGcSafe + + +# +# proc hello(a: int, b: string) = +# echo $a & b +# +# let literal = "Nim" +# let t = toTask(hello(521, literal)) +# +# +# is roughly converted to +# +# type +# ScratchObj_369098780 = object +# a: int +# b: string +# +# let scratch_369098762 = cast[ptr ScratchObj_369098780](c_calloc(csize_t 1, +# csize_t sizeof(ScratchObj_369098780))) +# if scratch_369098762.isNil: +# raise newException(OutOfMemDefect, "Could not allocate memory") +# block: +# var isolate_369098776 = isolate(521) +# scratch_369098762.a = extract(isolate_369098776) +# var isolate_369098778 = isolate(literal) +# scratch_369098762.b = extract(isolate_369098778) +# proc hello_369098781(args`gensym3: pointer) {.nimcall.} = +# let objTemp_369098775 = cast[ptr ScratchObj_369098780](args`gensym3) +# let :tmp_369098777 = objTemp_369098775.a +# let :tmp_369098779 = objTemp_369098775.b +# hello(a = :tmp_369098777, b = :tmp_369098779) +# +# proc destroyScratch_369098782(args`gensym3: pointer) {.nimcall.} = +# let obj_369098783 = cast[ptr ScratchObj_369098780](args`gensym3) +# =destroy(obj_369098783[]) +# let t = Task(callback: hello_369098781, args: scratch_369098762, destroy: destroyScratch_369098782) +# + + +type + Task* = object ## `Task` contains the callback and its arguments. + callback: proc (args: pointer) {.nimcall, gcsafe.} + args: pointer + destroy: proc (args: pointer) {.nimcall.} + + +proc `=copy`*(x: var Task, y: Task) {.error.} + +proc `=destroy`*(t: var Task) {.inline.} = + ## Frees the resources allocated for a `Task`. + if t.args != nil: + if t.destroy != nil: + t.destroy(t.args) + c_free(t.args) + +proc invoke*(task: Task) {.inline, gcsafe.} = + ## Invokes the `task`. + assert task.callback != nil + task.callback(task.args) + +template checkIsolate(scratchAssignList: seq[NimNode], procParam, scratchDotExpr: NimNode) = + # block: + # var isoTempA = isolate(521) + # scratch.a = extract(isolateA) + # var isoTempB = isolate(literal) + # scratch.b = extract(isolateB) + let isolatedTemp = genSym(nskTemp, "isoTemp") + + # XXX: Fix sym bindings + # scratchAssignList.add newVarStmt(isolatedTemp, newCall(newidentNode("isolate"), procParam)) + # scratchAssignList.add newAssignment(scratchDotExpr, + # newcall(newIdentNode("extract"), isolatedTemp)) + scratchAssignList.add newVarStmt(isolatedTemp, newCall(bindSym("isolate"), procParam)) + scratchAssignList.add newAssignment(scratchDotExpr, + newcall(bindSym("extract"), isolatedTemp)) + +template addAllNode(assignParam: NimNode, procParam: NimNode) = + let scratchDotExpr = newDotExpr(scratchIdent, formalParams[i][0]) + + checkIsolate(scratchAssignList, procParam, scratchDotExpr) + + let tempNode = genSym(kind = nskTemp, ident = formalParams[i][0].strVal) + callNode.add nnkExprEqExpr.newTree(formalParams[i][0], tempNode) + tempAssignList.add newLetStmt(tempNode, newDotExpr(objTemp, formalParams[i][0])) + scratchRecList.add newIdentDefs(newIdentNode(formalParams[i][0].strVal), assignParam) + +macro toTask*(e: typed{nkCall | nkInfix | nkPrefix | nkPostfix | nkCommand | nkCallStrLit}): Task = + ## Converts the call and its arguments to `Task`. + runnableExamples("--gc:orc"): + proc hello(a: int) = echo a + + let b = toTask hello(13) + assert b is Task + + doAssert getTypeInst(e).typeKind == ntyVoid + + # requires 1.6 + # when compileOption("threads"): + # if not isGcSafe(e[0]): + # error("'toTask' takes a GC safe call expression") + + # TODO + # https://github.com/nim-lang/Nim/pull/17501/files + # + # if hasClosure(e[0]): + # error("closure call is not allowed") + + if e.len > 1: + let scratchIdent = genSym(kind = nskTemp, ident = "scratch") + let impl = e[0].getTypeInst + + when defined(nimTasksDebug): + echo impl.treeRepr + echo e.treeRepr + let formalParams = impl[0] + + var + scratchRecList = newNimNode(nnkRecList) + scratchAssignList: seq[NimNode] + tempAssignList: seq[NimNode] + callNode: seq[NimNode] + + let + objTemp = genSym(nskTemp, ident = "objTemp") + + for i in 1 ..< formalParams.len: + var param = formalParams[i][1] + + if param.kind == nnkBracketExpr and param[0].eqIdent("sink"): + param = param[0] + + if param.typeKind in {ntyExpr, ntyStmt}: + error("'toTask'ed function cannot have a 'typed' or 'untyped' parameter") + + case param.kind + of nnkVarTy: + error("'toTask'ed function cannot have a 'var' parameter") + of nnkBracketExpr: + if param[0].typeKind == ntyTypeDesc: + callNode.add nnkExprEqExpr.newTree(formalParams[i][0], e[i]) + elif param[0].typeKind in {ntyVarargs, ntyOpenArray}: + if param[1].typeKind in {ntyExpr, ntyStmt}: + error("'toTask'ed function cannot have a 'typed' or 'untyped' parameter") + let + seqType = nnkBracketExpr.newTree(newIdentNode("seq"), param[1]) + seqCallNode = newcall("@", e[i]) + addAllNode(seqType, seqCallNode) + else: + addAllNode(param, e[i]) + of nnkBracket, nnkObjConstr: + # passing by static parameters + # so we pass them directly instead of passing by scratchObj + callNode.add nnkExprEqExpr.newTree(formalParams[i][0], e[i]) + of nnkSym, nnkPtrTy: + addAllNode(param, e[i]) + of nnkCharLit..nnkNilLit: + callNode.add nnkExprEqExpr.newTree(formalParams[i][0], e[i]) + else: + error("not supported type kinds") + + let scratchObjType = genSym(kind = nskType, ident = "ScratchObj") + let scratchObj = nnkTypeSection.newTree( + nnkTypeDef.newTree( + scratchObjType, + newEmptyNode(), + nnkObjectTy.newTree( + newEmptyNode(), + newEmptyNode(), + scratchRecList + ) + ) + ) + + + let scratchObjPtrType = quote do: + cast[ptr `scratchObjType`](c_calloc(csize_t 1, csize_t sizeof(`scratchObjType`))) + + let scratchLetSection = newLetStmt( + scratchIdent, + scratchObjPtrType + ) + + let scratchCheck = quote do: + if `scratchIdent`.isNil: + # Renamed in 1.4 + # raise newException(OutOfMemDefect, "Could not allocate memory") + raise newException(OutOfMemError, "Could not allocate memory") + + var stmtList = newStmtList() + stmtList.add(scratchObj) + stmtList.add(scratchLetSection) + stmtList.add(scratchCheck) + stmtList.add(nnkBlockStmt.newTree(newEmptyNode(), newStmtList(scratchAssignList))) + + var functionStmtList = newStmtList() + let funcCall = newCall(e[0], callNode) + functionStmtList.add tempAssignList + functionStmtList.add funcCall + + let funcName = genSym(nskProc, e[0].strVal) + let destroyName = genSym(nskProc, "destroyScratch") + let objTemp2 = genSym(ident = "obj") + let tempNode = quote("@") do: + `=destroy`(@objTemp2[]) + + result = quote do: + `stmtList` + + proc `funcName`(args: pointer) {.gcsafe, nimcall.} = + let `objTemp` = cast[ptr `scratchObjType`](args) + `functionStmtList` + + proc `destroyName`(args: pointer) {.nimcall.} = + let `objTemp2` = cast[ptr `scratchObjType`](args) + `tempNode` + + Task(callback: `funcName`, args: `scratchIdent`, destroy: `destroyName`) + else: + let funcCall = newCall(e[0]) + let funcName = genSym(nskProc, e[0].strVal) + + result = quote do: + proc `funcName`(args: pointer) {.gcsafe, nimcall.} = + `funcCall` + + Task(callback: `funcName`, args: nil) + + when defined(nimTasksDebug): + echo result.repr + +runnableExamples("--gc:orc"): + block: + var num = 0 + proc hello(a: int) = inc num, a + + let b = toTask hello(13) + b.invoke() + assert num == 13 + # A task can be invoked multiple times + b.invoke() + assert num == 26 + + block: + type + Runnable = ref object + data: int + + var data: int + proc hello(a: Runnable) {.nimcall.} = + a.data += 2 + data = a.data + + + when false: + # the parameters of call must be isolated. + let x = Runnable(data: 12) + let b = toTask hello(x) # error ----> expression cannot be isolated: x + b.invoke() + + let b = toTask(hello(Runnable(data: 12))) + b.invoke() + assert data == 14 + b.invoke() + assert data == 16 diff --git a/taskpools/sparsesets.nim b/taskpools/sparsesets.nim new file mode 100644 index 0000000..638d483 --- /dev/null +++ b/taskpools/sparsesets.nim @@ -0,0 +1,151 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + std/random, + system/ansi_c, + ./instrumentation/contracts + +const TP_MaxWorkers = 255 +type Setuint = uint8 # We support at most 255 threads (0xFF is kept as special value to signify absence in the set) + +const Empty = high(Setuint) + +type + SparseSet* = object + ## Stores efficiently a set of integers in the range [0 .. Capacity) + ## Supports: + ## - O(1) inclusion, exclusion and contains + ## - O(1) random pick + ## - O(1) length + ## - O(length) iteration + ## + ## Space: Capacity * sizeof(words) + ## + ## This is contrary to bitsets which requires: + ## - random picking: multiple random "contains" + a fallback to uncompressing the set + ## - O(Capacity/sizeof(words)) length (via popcounts) + ## - O(capacity) iteration + indices: ptr UncheckedArray[Setuint] + values: ptr UncheckedArray[Setuint] + rawBuffer: ptr UncheckedArray[Setuint] + len*: Setuint + capacity*: Setuint + +func allocate*(s: var SparseSet, capacity: SomeInteger) {.inline.} = + preCondition: capacity <= TP_MaxWorkers + + s.capacity = Setuint capacity + s.rawBuffer = cast[ptr UncheckedArray[Setuint]](c_calloc(csize_t 2*capacity, csize_t sizeof(Setuint))) + s.indices = s.rawBuffer + s.values = cast[ptr UncheckedArray[Setuint]](s.rawBuffer[capacity].addr) + +func delete*(s: var SparseSet) {.inline.} = + s.indices = nil + s.values = nil + c_free(s.rawBuffer) + +func refill*(s: var SparseSet) {.inline.} = + ## Reset the sparseset by including all integers + ## in the range [0 .. Capacity) + preCondition: not s.indices.isNil + preCondition: not s.values.isNil + preCondition: not s.rawBuffer.isNil + preCondition: s.capacity != 0 + + s.len = s.capacity + + for i in Setuint(0) ..< s.len: + s.indices[i] = i + s.values[i] = i + +func isEmpty*(s: SparseSet): bool {.inline.} = + s.len == 0 + +func contains*(s: SparseSet, n: SomeInteger): bool {.inline.} = + assert n.int != Empty.int + s.indices[n] != Empty + +func incl*(s: var SparseSet, n: SomeInteger) {.inline.} = + preCondition: n < Empty + + if n in s: return + + preCondition: s.len < s.capacity + + s.indices[n] = s.len + s.values[s.len] = n + s.len += 1 + +func peek*(s: SparseSet): int32 {.inline.} = + ## Returns the last point in the set + ## Note: if an item is deleted this is not the last inserted point + preCondition: s.len.int > 0 + int32 s.values[s.len - 1] + +func excl*(s: var SparseSet, n: SomeInteger) {.inline.} = + if n notin s: return + + # We do constant time deletion by replacing the deleted + # integer by the last value in the array of values + + let delIdx = s.indices[n] + + s.len -= 1 + let lastVal = s.values[s.len] + + s.indices[lastVal] = del_idx # Last value now points to deleted index + s.values[delIdx] = s.values[lastVal] # Deleted item is now last value + + # Erase the item + s.indices[n] = Empty + +func randomPick*(s: SparseSet, rng: var Rand): int {.inline.} = + ## Randomly pick from the set. + # The value is NOT removed from it. + let pickIdx = rng.rand(s.len-1) + result = s.values[pickIdx].int + +func `$`*(s: SparseSet): string = + $toOpenArray(s.values, 0, s.len.int - 1) + +# Sanity checks +# ------------------------------------------------------------------------------ + +when isMainModule: + + const Size = 10 + const Picked = 5 + + var S: SparseSet + S.allocate(Size) + S.refill() + echo S + + var rngState = initRand(123) + var picked: seq[int] + + for _ in 0 ..< Picked: + let p = S.randomPick(rngState) + picked.add p + S.excl p + echo "---" + echo "picked: ", p + echo "S indices: ", toOpenArray(S.indices, 0, S.capacity.int - 1) + + echo "---" + echo "picked: ", picked + echo "S: ", S + echo "S indices: ", toOpenArray(S.indices, 0, S.capacity.int - 1) + + for x in 0 ..< Size: + if x notin picked: + echo x, " notin picked -> in S" + doAssert x in S + else: + echo x, " in picked -> notin S" + doAssert x notin S diff --git a/taskpools/taskpools.nim b/taskpools/taskpools.nim new file mode 100644 index 0000000..7a93fef --- /dev/null +++ b/taskpools/taskpools.nim @@ -0,0 +1,530 @@ +# Nim-Taskpools +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# Taskpools +# +# This file implements a taskpool +# +# Implementation: +# +# It is a simple shared memory based work-stealing threadpool. +# The primary focus is: +# - Delegate compute intensive tasks to the threadpool. +# - Simple to audit by staying close to foundational papers +# and using simple datastructures otherwise. +# - Low energy consumption: +# threads should be put to sleep ASAP +# instead of polling/spinning (energy vs latency tradeoff) +# - Decent performance: +# Work-stealing has optimal asymptotic parallel speedup. +# Work-stealing has significantly reduced contention +# when many tasks are created, +# for example by divide-and-conquer algorithms, compared to a global task queue +# +# Not a priority: +# - Handling trillions of very short tasks (less than 100µs). +# - Advanced task dependencies or events API. +# - Unbalanced parallel-for loops. +# - Handling services that should run for the lifetime of the program. +# +# Doing IO on a compute threadpool should be avoided +# In case a thread is blocked for IO, other threads can steal pending tasks in that thread. +# If all threads are pending for IO, the threadpool will not make any progress and be soft-locked. + +{.push raises: [].} + +import + system/ansi_c, + std/[random, cpuinfo, atomics, macros], + ./channels_spsc_single, + ./chase_lev_deques, + ./event_notifiers, + ./primitives/barriers, + ./instrumentation/[contracts, loggers], + ./sparsesets, + ./flowvars, + ./ast_utils + +export + # flowvars + Flowvar, isSpawned, isReady, sync + +when defined(windows): + import ./primitives/affinity_windows +else: + import ./primitives/affinity_posix + +when (NimMajor,NimMinor,NimPatch) >= (1,6,0): + import std/tasks +else: + import ./shims_pre_1_6/tasks + +type + WorkerID = int32 + + TaskNode = ptr object + # Linked list of tasks + parent: TaskNode + task: Task + + Signal = object + terminate {.align: 64.}: Atomic[bool] + + WorkerContext = object + ## Thread-local worker context + + # Params + id: WorkerID + taskpool: Taskpool + + # Tasks + taskDeque: ptr ChaseLevDeque[TaskNode] # owned task deque + currentTask: TaskNode + + # Synchronization + eventNotifier: ptr EventNotifier # shared event notifier + signal: ptr Signal # owned signal + + # Thefts + rng: Rand # RNG state to select victims + numThreads: int + otherDeques: ptr UncheckedArray[ChaseLevDeque[TaskNode]] + victims: SparseSet + + Taskpool* = ptr object + barrier: SyncBarrier + ## Barrier for initialization and teardown + eventNotifier: EventNotifier + ## Puts thread to sleep + + numThreads{.align: 64.}: int + workerDeques: ptr UncheckedArray[ChaseLevDeque[TaskNode]] + ## Direct access for task stealing + workers: ptr UncheckedArray[Thread[(Taskpool, WorkerID)]] + workerSignals: ptr UncheckedArray[Signal] + ## Access signaledTerminate + +# Thread-local config +# --------------------------------------------- + +var workerContext {.threadvar.}: WorkerContext + ## Thread-local Worker context + +proc setupWorker() = + ## Initialize the thread-local context of a worker + ## Requires the ID and taskpool fields to be initialized + template ctx: untyped = workerContext + + preCondition: not ctx.taskpool.isNil() + preCondition: 0 <= ctx.id and ctx.id < ctx.taskpool.numThreads + preCondition: not ctx.taskpool.workerDeques.isNil() + preCondition: not ctx.taskpool.workerSignals.isNil() + + # Thefts + ctx.rng = initRand(0xEFFACED + ctx.id) + ctx.numThreads = ctx.taskpool.numThreads + ctx.otherDeques = ctx.taskpool.workerDeques + ctx.victims.allocate(ctx.taskpool.numThreads) + + # Synchronization + ctx.eventNotifier = addr ctx.taskpool.eventNotifier + ctx.signal = addr ctx.taskpool.workerSignals[ctx.id] + ctx.signal.terminate.store(false, moRelaxed) + + # Tasks + ctx.taskDeque = addr ctx.taskpool.workerDeques[ctx.id] + ctx.currentTask = nil + + # Init + ctx.taskDeque[].init() + +proc teardownWorker() = + ## Cleanup the thread-local context of a worker + template ctx: untyped = workerContext + ctx.taskDeque[].teardown() + ctx.victims.delete() + +proc eventLoop(ctx: var WorkerContext) {.raises:[Exception].} + +proc workerEntryFn(params: tuple[taskpool: Taskpool, id: WorkerID]) + {.raises: [Exception].} = + ## On the start of the threadpool workers will execute this + ## until they receive a termination signal + # We assume that thread_local variables start all at their binary zero value + preCondition: workerContext == default(WorkerContext) + + template ctx: untyped = workerContext + + # If the following crashes, you need --tlsEmulation:off + ctx.id = params.id + ctx.taskpool = params.taskpool + + setupWorker() + + # 1 matching barrier in Taskpool.new() for root thread + discard params.taskpool.barrier.wait() + + {.gcsafe.}: # Not GC-safe when multi-threaded due to thread-local variables + ctx.eventLoop() + + debugTermination: + log(">>> Worker %2d shutting down <<<\n", ctx.id) + + # 1 matching barrier in taskpool.shutdown() for root thread + discard params.taskpool.barrier.wait() + + teardownWorker() + +# Tasks +# --------------------------------------------- + +proc new(T: type TaskNode, parent: TaskNode, task: sink Task): T = + type TaskNodeObj = typeof(default(T)[]) + var tn = cast[TaskNode](c_calloc(1, csize_t sizeof(TaskNodeObj))) + tn.parent = parent + tn.task = task + return tn + +proc runTask(tn: var TaskNode) {.raises:[Exception], inline.} = + ## Run a task and consumes the taskNode + tn.task.invoke() + tn.c_free() + +proc schedule(ctx: WorkerContext, tn: sink TaskNode) {.inline.} = + ## Schedule a task in the taskpool + debug: log("Worker %2d: schedule task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, tn, tn.parent, ctx.currentTask) + ctx.taskDeque[].push(tn) + ctx.taskpool.eventNotifier.notify() + +# Scheduler +# --------------------------------------------- + +proc trySteal(ctx: var WorkerContext): TaskNode = + ## Try to steal a task. + + ctx.victims.refill() + ctx.victims.excl(ctx.id) + + while not ctx.victims.isEmpty(): + let target = ctx.victims.randomPick(ctx.rng) + + let stolenTask = ctx.otherDeques[target].steal() + if not stolenTask.isNil: + return stolenTask + + ctx.victims.excl(target) + + return nil + +proc eventLoop(ctx: var WorkerContext) {.raises:[Exception].} = + ## Each worker thread executes this loop over and over. + while not ctx.signal.terminate.load(moRelaxed): + # 1. Pick from local deque + debug: log("Worker %2d: eventLoop 1 - searching task from local deque\n", ctx.id) + while (var taskNode = ctx.taskDeque[].pop(); not taskNode.isNil): + debug: log("Worker %2d: eventLoop 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask) + taskNode.runTask() + + # 2. Run out of tasks, become a thief + debug: log("Worker %2d: eventLoop 2 - becoming a thief\n", ctx.id) + var stolenTask = ctx.trySteal() + if not stolenTask.isNil: + # 2.a Run task + debug: log("Worker %2d: eventLoop 2.a - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, stolenTask, stolenTask.parent, ctx.currentTask) + stolenTask.runTask() + else: + # 2.b Park the thread until a new task enters the taskpool + debug: log("Worker %2d: eventLoop 2.b - sleeping\n", ctx.id) + ctx.eventNotifier[].park() + debug: log("Worker %2d: eventLoop 2.b - waking\n", ctx.id) + +# Tasking +# --------------------------------------------- + +const RootTask = default(Task) # TODO: sentinel value different from null task + +template isRootTask(task: Task): bool = + task == RootTask + +proc forceFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[Exception].} = + ## Eagerly complete an awaited FlowVar + + template ctx: untyped = workerContext + + template isFutReady(): untyped = + fv.chan[].tryRecv(parentResult) + + if isFutReady(): + return + + ## 1. Process all the children of the current tasks. + ## This ensures that we can give control back ASAP. + debug: log("Worker %2d: sync 1 - searching task from local deque\n", ctx.id) + while (var taskNode = ctx.taskDeque[].pop(); not taskNode.isNil): + if taskNode.parent != ctx.currentTask: + debug: log("Worker %2d: sync 1 - skipping non-direct descendant task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask) + ctx.schedule(taskNode) + break + debug: log("Worker %2d: sync 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask) + taskNode.runTask() + if isFutReady(): + debug: log("Worker %2d: sync 1 - future ready, exiting\n", ctx.id) + return + + ## 2. We run out-of-tasks or out-of-direct-child of our current awaited task + ## So the task is bottlenecked by dependencies in other threads, + ## hence we abandon our enqueued work and steal in the others' queues + ## in hope it advances our awaited task. This prioritizes latency over throughput. + debug: log("Worker %2d: sync 2 - future not ready, becoming a thief (currentTask 0x%.08x)\n", ctx.id, ctx.currentTask) + while not isFutReady(): + var taskNode = ctx.trySteal() + + if not taskNode.isNil: + # We stole some task, we hope we advance our awaited task + debug: log("Worker %2d: sync 2.1 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask) + taskNode.runTask() + # elif (taskNode = ctx.taskDeque[].pop(); not taskNode.isNil): + # # We advance our own queue, this increases throughput but may impact latency on the awaited task + # debug: log("Worker %2d: sync 2.2 - couldn't steal, running own task\n", ctx.id) + # taskNode.runTask() + else: + # We don't park as there is no notif for task completion + cpuRelax() + +proc syncAll*(pool: Taskpool) {.raises: [Exception].} = + ## Blocks until all pending tasks are completed + ## This MUST only be called from + ## the root scope that created the taskpool + template ctx: untyped = workerContext + + debugTermination: + log(">>> Worker %2d enters barrier <<<\n", ctx.id) + + preCondition: ctx.id == 0 + preCondition: ctx.currentTask.task.isRootTask() + + # Empty all tasks + var foreignThreadsParked = false + while not foreignThreadsParked: + # 1. Empty local tasks + debug: log("Worker %2d: syncAll 1 - searching task from local deque\n", ctx.id) + while (var taskNode = ctx.taskDeque[].pop(); not taskNode.isNil): + debug: log("Worker %2d: syncAll 1 - running task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask) + taskNode.runTask() + + if ctx.numThreads == 1 or foreignThreadsParked: + break + + # 2. Help other threads + debug: log("Worker %2d: syncAll 2 - becoming a thief\n", ctx.id) + var taskNode = ctx.trySteal() + + if not taskNode.isNil: + # 2.1 We stole some task + debug: log("Worker %2d: syncAll 2.1 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask) + taskNode.runTask() + else: + # 2.2 No task to steal + if pool.eventNotifier.getParked() == pool.numThreads - 1: + # 2.2.1 all threads besides the current are parked + debugTermination: + log("Worker %2d: syncAll 2.2.1 - termination, all other threads sleeping\n", ctx.id) + foreignThreadsParked = true + else: + # 2.2.2 We don't park as there is no notif for task completion + cpuRelax() + + debugTermination: + log(">>> Worker %2d leaves barrier <<<\n", ctx.id) + +# Runtime +# --------------------------------------------- + +proc new*(T: type Taskpool, numThreads = countProcessors()): T {.raises: [Exception].} = + ## Initialize a threadpool that manages `numThreads` threads. + ## Default to the number of logical processors available. + + var tp = cast[T](c_calloc(1, csize_t sizeof(default(Taskpool)[]))) + + tp.barrier.init(numThreads.int32) + tp.eventNotifier.initialize() + tp.numThreads = numThreads + tp.workerDeques = cast[ptr UncheckedArray[ChaseLevDeque[TaskNode]]](c_calloc(csize_t numThreads, csize_t sizeof ChaseLevDeque[TaskNode])) + tp.workers = cast[ptr UncheckedArray[Thread[(Taskpool, WorkerID)]]](c_calloc(csize_t numThreads, csize_t sizeof Thread[(Taskpool, WorkerID)])) + tp.workerSignals = cast[ptr UncheckedArray[Signal]](c_calloc(csize_t numThreads, csize_t sizeof Signal)) + + # Setup master thread + workerContext.id = 0 + workerContext.taskpool = tp + when not(defined(cpp) and defined(vcc)): + # TODO: Nim casts between Windows Handles but that requires reinterpret cast for C++ + pinToCpu(0) + + # Start worker threads + for i in 1 ..< numThreads: + createThread(tp.workers[i], worker_entry_fn, (tp, WorkerID(i))) + # TODO: we might want to take into account Hyper-Threading (HT) + # and allow spawning tasks and pinning to cores that are not HT-siblings. + # This is important for memory-bound workloads (like copy, addition, ...) + # where both sibling cores will compete for L1 and L2 cache, effectively + # halving the memory bandwidth or worse, flushing what the other put in cache. + # Note that while 2x siblings is common, Xeon Phi has 4x Hyper-Threading. + when not(defined(cpp) and defined(vcc)): + # TODO: Nim casts between Windows Handles but that requires reinterpret cast for C++ + pinToCpu(tp.workers[i], i) + + # Root worker + setupWorker() + + # Root task, this is a sentinel task that is never called. + workerContext.currentTask = TaskNode.new( + parent = nil, + task = default(Task) # TODO RootTask, somehow this uses `=copy` + ) + + # Wait for the child threads + discard tp.barrier.wait() + return tp + +proc cleanup(tp: var TaskPool) {.raises: [OSError].} = + ## Cleanup all resources allocated by the taskpool + preCondition: workerContext.currentTask.task.isRootTask() + + for i in 1 ..< tp.numThreads: + joinThread(tp.workers[i]) + + tp.workerSignals.c_free() + tp.workers.c_free() + tp.workerDeques.c_free() + `=destroy`(tp.eventNotifier) + tp.barrier.delete() + + tp.c_free() + +proc shutdown*(tp: var TaskPool) {.raises:[Exception].} = + ## Wait until all tasks are processed and then shutdown the taskpool + preCondition: workerContext.currentTask.task.isRootTask() + tp.syncAll() + + # Signal termination to all threads + for i in 0 ..< tp.numThreads: + tp.workerSignals[i].terminate.store(true, moRelaxed) + + let parked = tp.eventNotifier.getParked() + for i in 0 ..< parked: + tp.eventNotifier.notify() + + # 1 matching barrier in worker_entry_fn + discard tp.barrier.wait() + + teardownWorker() + tp.cleanup() + + # Dealloc dummy task + workerContext.currentTask.c_free() + +# Task parallelism +# --------------------------------------------- +{.pop.} # raises:[] + +macro spawn*(tp: TaskPool, fnCall: typed): untyped = + ## Spawns the input function call asynchronously, potentially on another thread of execution. + ## + ## If the function calls returns a result, spawn will wrap it in a Flowvar. + ## You can use `sync` to block the current thread and extract the asynchronous result from the flowvar. + ## You can use `isReady` to check if result is available and if subsequent + ## `spawn` returns immediately. + ## + ## Tasks are processed approximately in Last-In-First-Out (LIFO) order + result = newStmtList() + + let fn = fnCall[0] + let fnName = $fn + + # Get the return type if any + let retType = fnCall[0].getImpl[3][0] + let needFuture = retType.kind != nnkEmpty + + # Package in a task + let taskNode = ident("taskNode") + let task = ident("task") + if not needFuture: + result.add quote do: + let `task` = toTask(`fnCall`) + let `taskNode` = TaskNode.new(workerContext.currentTask, `task`) + schedule(workerContext, `taskNode`) + + else: + # tasks have no return value. + # 1. We create a channel/flowvar to transmit the return value to awaiter/sync + # 2. We create a wrapper async_fn without return value that send the return value in the channel + # 3. We package that wrapper function in a task + + # 1. Create the channel + let fut = ident("fut") + let futTy = nnkBracketExpr.newTree( + bindSym"FlowVar", + retType + ) + result.add quote do: + let `fut` = newFlowVar(type `retType`) + + # 2. Create a wrapper function that sends result to the channel + # TODO, upstream "getImpl" doesn't return the generic params + let genericParams = fn.getImpl()[2].replaceSymsByIdents() + let formalParams = fn.getImpl()[3].replaceSymsByIdents() + + var asyncParams = nnkFormalParams.newTree( + newEmptyNode() + ) + var fnCallIdents = nnkCall.newTree( + fnCall[0] + ) + for i in 1 ..< formalParams.len: + let ident = formalParams[i].replaceSymsByIdents() + asyncParams.add ident + for j in 0 ..< ident.len - 2: + # Handle "a, b: int" + fnCallIdents.add ident[j] + + let futFnParam = ident("fut") + asyncParams.add newIdentDefs(futFnParam, futTy) + + let asyncBody = quote do: + # XXX: can't test that when the RootTask is default(Task) instead of a sentinel value + # preCondition: not isRootTask(workerContext.currentTask.task) + + let res = `fnCallIdents` + readyWith(`futFnParam`, res) + + let asyncFn = ident("taskpool_" & fnName) + result.add nnkProcDef.newTree( + asyncFn, + newEmptyNode(), + genericParams, + asyncParams, + nnkPragma.newTree(ident("nimcall")), + newEmptyNode(), + asyncBody + ) + + var asyncCall = newCall(asyncFn) + for i in 1 ..< fnCall.len: + asyncCall.add fnCall[i].replaceSymsByIdents() + asyncCall.add fut + + result.add quote do: + let `task` = toTask(`asyncCall`) + let `taskNode` = TaskNode.new(workerContext.currentTask, `task`) + schedule(workerContext, `taskNode`) + + # Return the future / flowvar + `fut` + + # Wrap in a block for namespacing + result = nnkBlockStmt.newTree(newEmptyNode(), result) + echo result.toStrLit()