diff --git a/taskpools.nim b/taskpools.nim index 11976ff..7bf18ab 100644 --- a/taskpools.nim +++ b/taskpools.nim @@ -1,5 +1,5 @@ -# Nim-Taskpools -# Copyright (c) 2021 Status Research & Development GmbH +# taskpools +# Copyright (c) 2021- Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). diff --git a/taskpools/ast_utils.nim b/taskpools/ast_utils.nim index b177741..d0a6df8 100644 --- a/taskpools/ast_utils.nim +++ b/taskpools/ast_utils.nim @@ -1,4 +1,4 @@ -# Nim-Taskpools +# taskpools # Copyright (c) 2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). diff --git a/taskpools/channels_spsc_single.nim b/taskpools/channels_spsc_single.nim index 492ce05..12ab53d 100644 --- a/taskpools/channels_spsc_single.nim +++ b/taskpools/channels_spsc_single.nim @@ -1,5 +1,6 @@ -# Weave +# taskpools # Copyright (c) 2019 Mamy André-Ratsimbazafy +# Copyright (c) 2021- Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). @@ -8,12 +9,11 @@ {.push raises: [].} import - std/atomics, - ./instrumentation/[contracts, loggers] + std/[atomics, typetraits] type - ChannelSPSCSingle* = object - ## A type-erased SPSC channel. + ChannelSPSCSingle*[T] = object + ## A single-value SPSC channel ## ## Wait-free bounded single-producer single-consumer channel ## that can only buffer a single item @@ -27,22 +27,13 @@ type ## ## The channel should be the last field of an object if used in an intrusive manner full{.align: 64.}: Atomic[bool] - itemSize*: uint8 - buffer*{.align: 8.}: UncheckedArray[byte] + value*: T -proc `=`( - dest: var ChannelSPSCSingle, - source: ChannelSPSCSingle +proc `=copy`[T]( + dest: var ChannelSPSCSingle[T], + source: ChannelSPSCSingle[T] ) {.error: "A channel cannot be copied".} -proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} = - ## If ChannelSPSCSingle is used intrusive another data structure - ## be aware that it should be the last part due to ending by UncheckedArray - preCondition: itemsize.int in 0 .. int high(uint8) - - chan.itemSize = uint8 itemsize - chan.full.store(false, moRelaxed) - func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} = not chan.full.load(moAcquire) @@ -51,50 +42,46 @@ func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} = ## Returns true if successful (channel was not empty) ## ## ⚠ Use only in the consumer thread that reads from the channel. - preCondition: (sizeof(T) == chan.itemSize.int) or - # Support dummy object - (sizeof(T) == 0 and chan.itemSize == 1) + static: doAssert supportsCopyMem(T), "Channel is not garbage-collection-safe" - let full = chan.full.load(moAcquire) - if not full: - return false - dst = cast[ptr T](chan.buffer.addr)[] - chan.full.store(false, moRelease) - return true + case chan.full.load(moAcquire) + of true: + dst = move(chan.value) + chan.full.store(false, moRelease) + true + of false: + false func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} = ## Try sending an item into the channel ## Reurns true if successful (channel was empty) ## ## ⚠ Use only in the producer thread that writes from the channel. - preCondition: (sizeof(T) == chan.itemSize.int) or - # Support dummy object - (sizeof(T) == 0 and chan.itemSize == 1) + static: doAssert supportsCopyMem(T), "Channel is not garbage-collection-safe" - let full = chan.full.load(moAcquire) - if full: - return false - cast[ptr T](chan.buffer.addr)[] = src - chan.full.store(true, moRelease) - return true + case chan.full.load(moAcquire) + of true: + false + of false: + chan.value = move(src) + chan.full.store(true, moRelease) + true {.pop.} # raises: [] # Sanity checks # ------------------------------------------------------------------------------ when isMainModule: - import system/ansi_c - when not compileOption("threads"): {.error: "This requires --threads:on compilation flag".} - template sendLoop[T](chan: var ChannelSPSCSingle, + template sendLoop[T](chan: var ChannelSPSCSingle[T], data: sink T, body: untyped): untyped = while not chan.trySend(data): body - template recvLoop[T](chan: var ChannelSPSCSingle, + template recvLoop[T](chan: var ChannelSPSCSingle[T], data: var T, body: untyped): untyped = while not chan.tryRecv(data): @@ -103,7 +90,7 @@ when isMainModule: type ThreadArgs = object ID: WorkerKind - chan: ptr ChannelSPSCSingle + chan: ptr ChannelSPSCSingle[int] WorkerKind = enum Sender @@ -144,13 +131,15 @@ when isMainModule: discard echo "Sender sent: ", val + import primitives/allocs proc main() = echo "Testing if 2 threads can send data" echo "-----------------------------------" var threads: array[2, Thread[ThreadArgs]] - var chan = cast[ptr ChannelSPSCSingle](c_calloc(1, csize_t sizeof(ChannelSPSCSingle))) - chan[].initialize(itemSize = sizeof(int)) + var chan = tp_allocAligned( + ChannelSPSCSingle[int], sizeof(ChannelSPSCSingle[int]), 64) + zeroMem(chan, sizeof(ChannelSPSCSingle[int])) createThread(threads[0], thread_func, ThreadArgs(ID: Receiver, chan: chan)) createThread(threads[1], thread_func, ThreadArgs(ID: Sender, chan: chan)) @@ -158,7 +147,7 @@ when isMainModule: joinThread(threads[0]) joinThread(threads[1]) - c_free(chan) + tp_freeAligned(chan) echo "-----------------------------------" echo "Success" diff --git a/taskpools/chase_lev_deques.nim b/taskpools/chase_lev_deques.nim index e51d7c3..977df20 100644 --- a/taskpools/chase_lev_deques.nim +++ b/taskpools/chase_lev_deques.nim @@ -1,4 +1,4 @@ -# Nim-Taskpools +# taskpools # Copyright (c) 2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). diff --git a/taskpools/event_notifiers.nim b/taskpools/event_notifiers.nim index a2cda67..d26c6bb 100644 --- a/taskpools/event_notifiers.nim +++ b/taskpools/event_notifiers.nim @@ -1,4 +1,4 @@ -# Nim-Taskpools +# taskpools # Copyright (c) 2021-2023 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). diff --git a/taskpools/flowvars.nim b/taskpools/flowvars.nim index 9df9ea0..6658765 100644 --- a/taskpools/flowvars.nim +++ b/taskpools/flowvars.nim @@ -1,5 +1,6 @@ -# Weave +# taskpools # Copyright (c) 2019 Mamy André-Ratsimbazafy +# Copyright (c) 2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). @@ -12,8 +13,6 @@ import ./channels_spsc_single, ./primitives/allocs -{.push gcsafe.} - type Flowvar*[T] = object ## A Flowvar is a placeholder for a future result that may be computed in parallel @@ -22,19 +21,19 @@ type # instead of having an extra atomic bool # They also use type-erasure to avoid having duplicate code # due to generic monomorphization. - chan: ptr ChannelSPSCSingle + chan: ptr ChannelSPSCSingle[T] # proc `=copy`*[T](dst: var Flowvar[T], src: Flowvar[T]) {.error: "Futures/Flowvars cannot be copied".} # # Unfortunately we cannot prevent this easily as internally # we need a copy: -# - nim-taskpools level when doing toTask(fnCall(args, fut)) and then returning fut. (Can be worked around with copyMem) +# - taskpools level when doing toTask(fnCall(args, fut)) and then returning fut. (Can be worked around with copyMem) # - in std/tasks (need upstream workaround) proc newFlowVar*(T: typedesc): Flowvar[T] {.inline.} = - let size = 2 + sizeof(T) # full flag + item size + buffer - result.chan = tp_allocAligned(ChannelSPSCSingle, size, alignment = 64) - result.chan[].initialize(sizeof(T)) + result.chan = tp_allocAligned( + ChannelSPSCSingle[T], sizeof(ChannelSPSCSingle[T]), alignment = 64) + zeroMem(result.chan, sizeof(ChannelSPSCSingle[T])) proc cleanup(fv: Flowvar) {.inline.} = # TODO: Nim v1.4+ can use "sink Flowvar" diff --git a/taskpools/taskpools.nim b/taskpools/taskpools.nim index e8d5a81..f45f350 100644 --- a/taskpools/taskpools.nim +++ b/taskpools/taskpools.nim @@ -1,4 +1,4 @@ -# Nim-Taskpools +# taskpools # Copyright (c) 2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).