mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
chore: bump taskpools
.. to support generic workers which allows getting rid of MerkleTask
This commit is contained in:
parent
db8f81cd63
commit
ac5e3c733b
@ -69,13 +69,6 @@ type
|
|||||||
compress*: CompressFn[H, K] # compress function
|
compress*: CompressFn[H, K] # compress function
|
||||||
zero*: H # zero value
|
zero*: H # zero value
|
||||||
|
|
||||||
MerkleTask*[H, K] = object
|
|
||||||
store*: SharedBuf[byte]
|
|
||||||
layerOffsets: SharedBuf[int]
|
|
||||||
compress*: ptr CompressData[H, K]
|
|
||||||
signal*: ThreadSignalPtr
|
|
||||||
success*: Atomic[bool]
|
|
||||||
|
|
||||||
func levels*[H, K](self: MerkleTree[H, K]): int =
|
func levels*[H, K](self: MerkleTree[H, K]): int =
|
||||||
return self.layerOffsets.len
|
return self.layerOffsets.len
|
||||||
|
|
||||||
@ -324,19 +317,20 @@ func merkleTreeWorker[H, K](
|
|||||||
|
|
||||||
merkleTreeWorker(store, offsets, compress, layer + 1, false)
|
merkleTreeWorker(store, offsets, compress, layer + 1, false)
|
||||||
|
|
||||||
proc merkleTreeWorker[H, K](task: ptr MerkleTask[H, K]) {.gcsafe.} =
|
proc merkleTreeWorker[H, K](
|
||||||
|
store: SharedBuf[byte],
|
||||||
|
offsets: SharedBuf[int],
|
||||||
|
compress: ptr CompressData[H, K],
|
||||||
|
signal: ThreadSignalPtr,
|
||||||
|
): bool =
|
||||||
defer:
|
defer:
|
||||||
discard task[].signal.fireSync()
|
discard signal.fireSync()
|
||||||
|
|
||||||
let res = merkleTreeWorker(
|
let res = merkleTreeWorker(
|
||||||
task[].store.toOpenArray(),
|
store.toOpenArray(), offsets.toOpenArray(), compress[], 0, isBottomLayer = true
|
||||||
task[].layerOffsets.toOpenArray(),
|
|
||||||
task[].compress[],
|
|
||||||
0,
|
|
||||||
isBottomLayer = true,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
task[].success.store(res.isOk())
|
return res.isOk()
|
||||||
|
|
||||||
func prepare*[H, K](
|
func prepare*[H, K](
|
||||||
self: MerkleTree[H, K], compressor: CompressFn, zero: H, leaves: openArray[H]
|
self: MerkleTree[H, K], compressor: CompressFn, zero: H, leaves: openArray[H]
|
||||||
@ -379,15 +373,13 @@ proc compute*[H, K](
|
|||||||
defer:
|
defer:
|
||||||
signal.close().expect("closing once works")
|
signal.close().expect("closing once works")
|
||||||
|
|
||||||
var task = MerkleTask[H, K](
|
let res = tp.spawn merkleTreeWorker(
|
||||||
store: SharedBuf.view(self.store),
|
SharedBuf.view(self.store),
|
||||||
layerOffsets: SharedBuf.view(self.layerOffsets),
|
SharedBuf.view(self.layerOffsets),
|
||||||
compress: addr self.compress,
|
addr self.compress,
|
||||||
signal: signal,
|
signal,
|
||||||
)
|
)
|
||||||
|
|
||||||
tp.spawn merkleTreeWorker(addr task)
|
|
||||||
|
|
||||||
# To support cancellation, we'd have to ensure the task we posted to taskpools
|
# To support cancellation, we'd have to ensure the task we posted to taskpools
|
||||||
# exits early - since we're not doing that, block cancellation attempts
|
# exits early - since we're not doing that, block cancellation attempts
|
||||||
try:
|
try:
|
||||||
@ -398,7 +390,7 @@ proc compute*[H, K](
|
|||||||
# a memory violation if we let it run - panic instead
|
# a memory violation if we let it run - panic instead
|
||||||
raiseAssert "Could not wait for signal, was it initialized? " & exc.msg
|
raiseAssert "Could not wait for signal, was it initialized? " & exc.msg
|
||||||
|
|
||||||
if not task.success.load():
|
if not res.sync():
|
||||||
return failure("merkle tree task failed")
|
return failure("merkle tree task failed")
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|||||||
2
vendor/nim-taskpools
vendored
2
vendor/nim-taskpools
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 4acdc6ef005a93dba09f902ed75197548cf7b451
|
Subproject commit 97f76faef6ba64bc77d9808c27ec5e9917e7cfde
|
||||||
Loading…
x
Reference in New Issue
Block a user