diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..74f8634 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,181 @@ +name: Taskpools CI +on: [push, pull_request] + +jobs: + build: + strategy: + fail-fast: false + max-parallel: 20 + matrix: + branch: [version-1-2, version-1-4] + target: + - os: linux + cpu: amd64 + TEST_LANG: c + - os: linux + cpu: amd64 + TEST_LANG: cpp + - os: linux + cpu: i386 + TEST_LANG: c + - os: macos + cpu: amd64 + TEST_LANG: c + - os: windows + cpu: amd64 + TEST_LANG: c + - os: windows + cpu: amd64 + TEST_LANG: cpp + - os: windows + cpu: i386 + TEST_LANG: c + include: + - target: + os: linux + builder: ubuntu-18.04 + - target: + os: macos + builder: macos-10.15 + - target: + os: windows + builder: windows-2019 + name: '${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ matrix.target.TEST_LANG }}-${{ matrix.target.BACKEND }} (${{ matrix.branch }})' + runs-on: ${{ matrix.builder }} + steps: + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@0.5.0 + with: + access_token: ${{ github.token }} + + - name: Checkout taskpools + uses: actions/checkout@v2 + with: + path: nim-taskpools + + - name: Install dependencies (Linux i386) + if: runner.os == 'Linux' && matrix.target.cpu == 'i386' + run: | + sudo dpkg --add-architecture i386 + sudo apt-fast update -qq + sudo DEBIAN_FRONTEND='noninteractive' apt-fast install \ + --no-install-recommends -yq gcc-multilib g++-multilib \ + libssl-dev:i386 + mkdir -p external/bin + cat << EOF > external/bin/gcc + #!/bin/bash + exec $(which gcc) -m32 "\$@" + EOF + cat << EOF > external/bin/g++ + #!/bin/bash + exec $(which g++) -m32 "\$@" + EOF + chmod 755 external/bin/gcc external/bin/g++ + echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH + + - name: Install dependencies (Windows) + if: runner.os == 'Windows' + shell: bash + run: | + mkdir external + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + arch=64 + else + arch=32 + fi + curl -L "https://nim-lang.org/download/mingw$arch.7z" -o "external/mingw$arch.7z" + curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip + 7z x "external/mingw$arch.7z" -oexternal/ + 7z x external/windeps.zip -oexternal/dlls + echo '${{ github.workspace }}'"/external/mingw$arch/bin" >> $GITHUB_PATH + echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH + + - name: Setup environment + shell: bash + run: echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH + + - name: Get latest Nim commit hash + id: versions + shell: bash + run: | + getHash() { + git ls-remote "https://github.com/$1" "${2:-HEAD}" | cut -f 1 + } + nimHash=$(getHash nim-lang/Nim '${{ matrix.branch }}') + csourcesHash=$(getHash nim-lang/csources) + echo "::set-output name=nim::$nimHash" + echo "::set-output name=csources::$csourcesHash" + - name: Restore prebuilt Nim from cache + id: nim-cache + uses: actions/cache@v1 + with: + path: nim + key: 'nim-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.nim }}' + + - name: Restore prebuilt csources from cache + if: steps.nim-cache.outputs.cache-hit != 'true' + id: csources-cache + uses: actions/cache@v1 + with: + path: csources/bin + key: 'csources-${{ matrix.target.os }}-${{ matrix.target.cpu }}-${{ steps.versions.outputs.csources }}' + + - name: Checkout Nim csources + if: > + steps.csources-cache.outputs.cache-hit != 'true' && + steps.nim-cache.outputs.cache-hit != 'true' + uses: actions/checkout@v2 + with: + repository: nim-lang/csources + path: csources + ref: ${{ steps.versions.outputs.csources }} + + - name: Checkout Nim + if: steps.nim-cache.outputs.cache-hit != 'true' + uses: actions/checkout@v2 + with: + repository: nim-lang/Nim + path: nim + ref: ${{ steps.versions.outputs.nim }} + + - name: Build Nim and associated tools + if: steps.nim-cache.outputs.cache-hit != 'true' + shell: bash + run: | + ncpu= + ext= + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + ext=.exe + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + if [[ ! -e csources/bin/nim$ext ]]; then + make -C csources -j $ncpu CC=gcc ucpu='${{ matrix.target.cpu }}' + else + echo 'Using prebuilt csources' + fi + cp -v csources/bin/nim$ext nim/bin + cd nim + nim c koch + ./koch boot -d:release + ./koch tools -d:release + # clean up to save cache space + rm koch + rm -rf nimcache + rm -rf dist + rm -rf .git + + - name: Run taskpools tests + shell: bash + run: | + export UCPU="$cpu" + cd nim-taskpools + nimble test diff --git a/taskpools.nimble b/taskpools.nimble index a5cb310..7ec8f11 100644 --- a/taskpools.nimble +++ b/taskpools.nimble @@ -34,11 +34,13 @@ task test, "Run Taskpools tests": test "", "examples/e01_simple_tasks.nim" # Benchmarks - test "", "benchmarks/bouncing_producer_consumer/taskpool_bpc.nim" test "", "benchmarks/dfs/taskpool_dfs.nim" test "", "benchmarks/heat/taskpool_heat.nim" test "", "benchmarks/nqueens/taskpool_nqueens.nim" - test "", "benchmarks/single_task_producer/taskpool_spc.nim" + + when not defined(windows): + test "", "benchmarks/single_task_producer/taskpool_spc.nim" + test "", "benchmarks/bouncing_producer_consumer/taskpool_bpc.nim" # TODO - generics in macro issue # test "", "benchmarks/matmul_cache_oblivious/taskpool_matmul_co.nim" diff --git a/taskpools/flowvars.nim b/taskpools/flowvars.nim index 6e6da8e..aa26e36 100644 --- a/taskpools/flowvars.nim +++ b/taskpools/flowvars.nim @@ -6,10 +6,11 @@ # at your option. This file may not be copied, modified, or distributed except according to those terms. import - ./channels_spsc_single, system/ansi_c, + std/os, ./instrumentation/contracts, - std/os + ./channels_spsc_single, + ./primitives/allocs {.push gcsafe.} @@ -32,13 +33,13 @@ type proc newFlowVar*(T: typedesc): Flowvar[T] {.inline.} = let size = 2 + sizeof(T) # full flag + item size + buffer - result.chan = cast[ptr ChannelSPSCSingle](c_calloc(1, csize_t size)) + result.chan = wv_allocAligned(ChannelSPSCSingle, size, alignment = 64) result.chan[].initialize(sizeof(T)) proc cleanup(fv: Flowvar) {.inline.} = # TODO: Nim v1.4+ can use "sink Flowvar" if not fv.chan.isNil: - c_free(fv.chan) + wv_freeAligned(fv.chan) func isSpawned*(fv: Flowvar): bool {.inline.} = ## Returns true if a flowvar is spawned diff --git a/taskpools/primitives/allocs.nim b/taskpools/primitives/allocs.nim new file mode 100644 index 0000000..046464d --- /dev/null +++ b/taskpools/primitives/allocs.nim @@ -0,0 +1,152 @@ +# Weave +# Copyright (c) 2019 Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + system/ansi_c + +# Helpers +# ---------------------------------------------------------------------------------- + +proc isPowerOfTwo*(n: int): bool {.inline.} = + (n and (n - 1)) == 0 + +# TODO: cannot dispatch at compile-time due to https://github.com/nim-lang/Nim/issues/12726 +# but all our use-case are for power of 2 + +func roundNextMultipleOf*(x: Natural, n: Natural): int {.inline.} = + assert n.isPowerOfTwo() + result = (x + n - 1) and not(n - 1) + +# func roundNextMultipleOf*(x: Natural, n: static Natural): int {.inline.} = +# ## Round the input to the next multiple of "n" +# when n.isPowerOfTwo(): +# # n is a power of 2. (If compiler cannot prove that x>0 it does not make the optim) +# result = (x + n - 1) and not(n - 1) +# else: +# result = ((x + n - 1) div n) * n + +# Memory +# ---------------------------------------------------------------------------------- + +# Nim allocShared, createShared, deallocShared +# take a global lock that is absolutely killing performance +# and shows up either: +# - native_queued_spin_lock_slowpath +# - __pthread_mutex_lock and __pthread_mutex_unlock_usercnt +# +# We use system malloc by default, the flag -d:useMalloc is not enough + +template deref*(T: typedesc): typedesc = + ## Return the base object type behind a ptr type + typeof(default(T)[]) + +proc wv_alloc*(T: typedesc): ptr T {.inline.}= + ## Default allocator for the Picasso library + ## This allocates memory to hold the type T + ## and returns a pointer to it + ## + ## Can use Nim allocator to measure the overhead of its lock + ## Memory is not zeroed + when defined(WV_useNimAlloc): + createSharedU(T) + else: + cast[ptr T](c_malloc(csize_t sizeof(T))) + +proc wv_allocPtr*(T: typedesc[ptr], zero: static bool = false): T {.inline.}= + ## Default allocator for the Picasso library + ## This allocates memory to hold the + ## underlying type of the pointer type T. + ## i.e. if T is ptr int, this allocates an int + ## + ## Can use Nim allocator to measure the overhead of its lock + ## Memory is not zeroed + result = wv_alloc(deref(T)) + when zero: + zeroMem(result, sizeof(deref(T))) + +proc wv_alloc*(T: typedesc, len: SomeInteger): ptr UncheckedArray[T] {.inline.} = + ## Default allocator for the Picasso library. + ## This allocates a contiguous chunk of memory + ## to hold ``len`` elements of type T + ## and returns a pointer to it. + ## + ## Can use Nim allocator to measure the overhead of its lock + ## Memory is not zeroed + when defined(WV_useNimAlloc): + cast[type result](createSharedU(T, len)) + else: + cast[type result](c_malloc(csize_t len*sizeof(T))) + +proc wv_free*[T: ptr](p: T) {.inline.} = + when defined(WV_useNimAlloc): + freeShared(p) + else: + c_free(p) + +when defined(windows): + proc alloca(size: int): pointer {.header: "".} +else: + proc alloca(size: int): pointer {.header: "".} + +template alloca*(T: typedesc): ptr T = + cast[ptr T](alloca(sizeof(T))) + +template alloca*(T: typedesc, len: Natural): ptr UncheckedArray[T] = + cast[ptr UncheckedArray[T]](alloca(sizeof(T) * len)) + +when defined(windows): + proc aligned_alloc_windows(size, alignment: csize_t): pointer {.sideeffect,importc:"_aligned_malloc", header:"".} + # Beware of the arg order! + proc wv_freeAligned*[T](p: ptr T){.sideeffect,importc:"_aligned_free", header:"".} +elif defined(osx): + proc posix_memalign(mem: var pointer, alignment, size: csize_t){.sideeffect,importc, header:"".} + proc aligned_alloc(alignment, size: csize_t): pointer {.inline.} = + posix_memalign(result, alignment, size) + proc wv_freeAligned*[T](p: ptr T){.inline.} = + c_free(p) +else: + proc aligned_alloc(alignment, size: csize_t): pointer {.sideeffect,importc, header:"".} + proc wv_freeAligned*[T](p: ptr T){.inline.} = + c_free(p) + +proc wv_allocAligned*(T: typedesc, alignment: static Natural): ptr T {.inline.} = + ## aligned_alloc requires allocating in multiple of the alignment. + static: + assert alignment.isPowerOfTwo() + let # TODO - cannot use a const due to https://github.com/nim-lang/Nim/issues/12726 + size = sizeof(T) + requiredMem = size.roundNextMultipleOf(alignment) + + when defined(windows): + cast[ptr T](aligned_alloc_windows(csize_t requiredMem, csize_t alignment)) + else: + cast[ptr T](aligned_alloc(csize_t alignment, csize_t requiredMem)) + +proc wv_allocAligned*(T: typedesc, size: int, alignment: static Natural): ptr T {.inline.} = + ## aligned_alloc requires allocating in multiple of the alignment. + static: + assert alignment.isPowerOfTwo() + let + requiredMem = size.roundNextMultipleOf(alignment) + + when defined(windows): + cast[ptr T](aligned_alloc_windows(csize_t requiredMem, csize_t alignment)) + else: + cast[ptr T](aligned_alloc(csize_t alignment, csize_t requiredMem)) + +proc wv_allocArrayAligned*(T: typedesc, len: int, alignment: static Natural): ptr UncheckedArray[T] {.inline.} = + ## aligned_alloc requires allocating in multiple of the alignment. + static: + assert alignment.isPowerOfTwo() + let + size = sizeof(T) * len + requiredMem = size.roundNextMultipleOf(alignment) + + when defined(windows): + cast[ptr UncheckedArray[T]](aligned_alloc_windows(csize_t requiredMem, csize_t alignment)) + else: + cast[ptr UncheckedArray[T]](aligned_alloc(csize_t alignment, csize_t requiredMem)) \ No newline at end of file diff --git a/taskpools/taskpools.nim b/taskpools/taskpools.nim index a3ef05c..097f58c 100644 --- a/taskpools/taskpools.nim +++ b/taskpools/taskpools.nim @@ -43,7 +43,7 @@ import ./channels_spsc_single, ./chase_lev_deques, ./event_notifiers, - ./primitives/barriers, + ./primitives/[barriers, allocs], ./instrumentation/[contracts, loggers], ./sparsesets, ./flowvars, @@ -98,6 +98,7 @@ type Taskpool* = ptr object barrier: SyncBarrier ## Barrier for initialization and teardown + # --- Align: 64 eventNotifier: EventNotifier ## Puts thread to sleep @@ -348,14 +349,16 @@ proc new*(T: type Taskpool, numThreads = countProcessors()): T {.raises: [Except ## Initialize a threadpool that manages `numThreads` threads. ## Default to the number of logical processors available. - var tp = cast[T](c_calloc(1, csize_t sizeof(default(Taskpool)[]))) + type TpObj = typeof(default(Taskpool)[]) + # Event notifier requires an extra 64 bytes for alignment + var tp = wv_allocAligned(TpObj, sizeof(TpObj) + 64, 64) tp.barrier.init(numThreads.int32) tp.eventNotifier.initialize() tp.numThreads = numThreads - tp.workerDeques = cast[ptr UncheckedArray[ChaseLevDeque[TaskNode]]](c_calloc(csize_t numThreads, csize_t sizeof ChaseLevDeque[TaskNode])) - tp.workers = cast[ptr UncheckedArray[Thread[(Taskpool, WorkerID)]]](c_calloc(csize_t numThreads, csize_t sizeof Thread[(Taskpool, WorkerID)])) - tp.workerSignals = cast[ptr UncheckedArray[Signal]](c_calloc(csize_t numThreads, csize_t sizeof Signal)) + tp.workerDeques = wv_allocArrayAligned(ChaseLevDeque[TaskNode], numThreads, alignment = 64) + tp.workers = wv_allocArrayAligned(Thread[(Taskpool, WorkerID)], numThreads, alignment = 64) + tp.workerSignals = wv_allocArrayAligned(Signal, numThreads, alignment = 64) # Setup master thread workerContext.id = 0 @@ -397,13 +400,13 @@ proc cleanup(tp: var TaskPool) {.raises: [OSError].} = for i in 1 ..< tp.numThreads: joinThread(tp.workers[i]) - tp.workerSignals.c_free() - tp.workers.c_free() - tp.workerDeques.c_free() + tp.workerSignals.wv_freeAligned() + tp.workers.wv_freeAligned() + tp.workerDeques.wv_freeAligned() `=destroy`(tp.eventNotifier) tp.barrier.delete() - tp.c_free() + tp.wv_freeAligned() proc shutdown*(tp: var TaskPool) {.raises:[Exception].} = ## Wait until all tasks are processed and then shutdown the taskpool