diff --git a/codex/codex.nim b/codex/codex.nim index 81357464..928305c1 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -56,7 +56,7 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer - taskpool: Taskpool + taskPool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -194,8 +194,8 @@ proc stop*(s: CodexServer) {.async.} = error "Failed to stop codex node", failures = res.failure.len raiseAssert "Failed to stop codex node" - if not s.taskpool.isNil: - s.taskpool.shutdown() + if not s.taskPool.isNil: + s.taskPool.shutdown() proc new*( T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey @@ -216,16 +216,16 @@ proc new*( var cache: CacheStore = nil - taskpool: Taskpool + taskPool: Taskpool try: if config.numThreads == ThreadCount(0): - taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) + taskPool = Taskpool.new(numThreads = min(countProcessors(), 16)) else: - taskpool = Taskpool.new(numThreads = int(config.numThreads)) - info "Threadpool started", numThreads = taskpool.numThreads + taskPool = Taskpool.new(numThreads = int(config.numThreads)) + info "Threadpool started", numThreads = taskPool.numThreads except CatchableError as exc: - raiseAssert("Failure in taskpool initialization:" & exc.msg) + raiseAssert("Failure in taskPool initialization:" & exc.msg) if config.cacheSize > 0'nb: cache = CacheStore.new(cacheSize = config.cacheSize) @@ -307,7 +307,7 @@ proc new*( if config.prover: let backend = config.initializeBackend().expect("Unable to create prover backend.") - some Prover.new(store, backend, config.numProofSamples) + some Prover.new(store, backend, config.numProofSamples, taskPool) else: none Prover @@ -317,7 +317,7 @@ proc new*( engine = engine, discovery = discovery, prover = prover, - taskPool = taskpool, + taskPool = taskPool, ) restServer = RestServerRef @@ -337,5 +337,5 @@ proc new*( restServer: restServer, repoStore: repoStore, maintenance: maintenance, - taskpool: taskpool, + taskPool: taskPool, ) diff --git a/codex/merkletree/poseidon2.nim b/codex/merkletree/poseidon2.nim index 0b6fdd77..64f9bc01 100644 --- a/codex/merkletree/poseidon2.nim +++ b/codex/merkletree/poseidon2.nim @@ -100,7 +100,7 @@ proc init*( var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) var task = Poseidon2TreeTask( - tree: cast[ptr Poseidon2Tree](addr tree), leaves: @leaves, signal: signal + tree: cast[ptr Poseidon2Tree](addr tree), leaves: leaves, signal: signal ) doAssert tp.numThreads > 1, diff --git a/codex/node.nim b/codex/node.nim index e7a3b555..724f8bf3 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -72,7 +72,7 @@ type contracts*: Contracts clock*: Clock storage*: Contracts - taskpool: Taskpool + taskPool: Taskpool trackedFutures: TrackedFutures CodexNodeRef* = ref CodexNode @@ -294,7 +294,7 @@ proc streamEntireDataset( try: # Spawn an erasure decoding job let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without _ =? (await erasure.decode(manifest)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg @@ -439,7 +439,7 @@ proc store*( finally: await stream.close() - without tree =? (await CodexTree.init(self.taskpool, cids)), err: + without tree =? (await CodexTree.init(self.taskPool, cids)), err: return failure(err) without treeCid =? tree.rootCid(CIDv1, dataCodec), err: @@ -533,14 +533,15 @@ proc setupRequest( # Erasure code the dataset according to provided parameters let erasure = Erasure.new( - self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: trace "Unable to erasure code dataset" return failure(error) - without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err: + without builder =? + Poseidon2Builder.new(self.networkStore.localStore, encoded, self.taskPool), err: trace "Unable to create slot builder" return failure(err) @@ -644,7 +645,9 @@ proc onStore( return failure(err) without builder =? - Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy), err: + Poseidon2Builder.new( + self.networkStore, manifest, self.taskPool, manifest.verifiableStrategy + ), err: trace "Unable to create slots builder", err = err.msg return failure(err) @@ -679,7 +682,7 @@ proc onStore( trace "start repairing slot", slotIdx try: let erasure = Erasure.new( - self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool ) if err =? (await erasure.repair(manifest)).errorOption: error "Unable to erasure decode repairing manifest", @@ -880,7 +883,7 @@ proc new*( networkStore: NetworkStore, engine: BlockExcEngine, discovery: Discovery, - taskpool: Taskpool, + taskPool: Taskpool, prover = Prover.none, contracts = Contracts.default, ): CodexNodeRef = @@ -893,7 +896,7 @@ proc new*( engine: engine, prover: prover, discovery: discovery, - taskPool: taskpool, + taskPool: taskPool, contracts: contracts, trackedFutures: TrackedFutures(), ) diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 5fbb0fe1..34c3ed9a 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -18,18 +18,20 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results import pkg/constantine/math/io/io_fields +import pkg/taskpools import ../../logutils import ../../utils import ../../stores import ../../manifest import ../../merkletree +import ../../utils/poseidon2digest import ../../utils/asynciter import ../../indexingstrategy import ../converters -export converters, asynciter +export converters, asynciter, poseidon2digest logScope: topics = "codex slotsbuilder" @@ -45,6 +47,7 @@ type SlotsBuilder*[T, H] = ref object of RootObj emptyBlock: seq[byte] # empty block verifiableTree: ?T # verification tree (dataset tree) emptyDigestTree: T # empty digest tree for empty blocks + taskPool: Taskpool func verifiable*[T, H](self: SlotsBuilder[T, H]): bool {.inline.} = ## Returns true if the slots are verifiable. @@ -165,6 +168,35 @@ proc buildBlockTree*[T, H]( success (blk.data, tree) +proc getBlockDigest*[T, H]( + self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural +): Future[?!H] {.async: (raises: [CancelledError]).} = + logScope: + blkIdx = blkIdx + slotPos = slotPos + numSlotBlocks = self.manifest.numSlotBlocks + cellSize = self.cellSize + + trace "Building block tree" + + if slotPos > (self.manifest.numSlotBlocks - 1): + # pad blocks are 0 byte blocks + trace "Returning empty digest tree for pad block" + return self.emptyDigestTree.root + + without blk =? await self.store.getBlock(self.manifest.treeCid, blkIdx), err: + error "Failed to get block CID for tree at index", err = err.msg + return failure(err) + + if blk.isEmpty: + return self.emptyDigestTree.root + + without digest =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err: + error "Failed to create digest for block", err = err.msg + return failure(err) + + return success digest + proc getCellHashes*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural ): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} = @@ -190,8 +222,7 @@ proc getCellHashes*[T, H]( pos = i trace "Getting block CID for tree at index" - without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, - err: + without digest =? (await self.getBlockDigest(blkIdx, i)), err: error "Failed to get block CID for tree at index", err = err.msg return failure(err) @@ -310,6 +341,7 @@ proc new*[T, H]( _: type SlotsBuilder[T, H], store: BlockStore, manifest: Manifest, + taskPool: Taskpool, strategy = LinearStrategy, cellSize = DefaultCellSize, ): ?!SlotsBuilder[T, H] = @@ -383,6 +415,7 @@ proc new*[T, H]( emptyBlock: emptyBlock, numSlotBlocks: numSlotBlocksTotal, emptyDigestTree: emptyDigestTree, + taskPool: taskPool, ) if manifest.verifiable: diff --git a/codex/slots/proofs/prover.nim b/codex/slots/proofs/prover.nim index 1afcd068..bba39e8c 100644 --- a/codex/slots/proofs/prover.nim +++ b/codex/slots/proofs/prover.nim @@ -13,6 +13,7 @@ import pkg/chronicles import pkg/circomcompat import pkg/poseidon2 import pkg/questionable/results +import pkg/taskpools import pkg/libp2p/cid @@ -47,6 +48,7 @@ type backend: AnyBackend store: BlockStore nSamples: int + taskPool: Taskpool proc prove*( self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge @@ -61,7 +63,7 @@ proc prove*( trace "Received proof challenge" - without builder =? AnyBuilder.new(self.store, manifest), err: + without builder =? AnyBuilder.new(self.store, manifest, self.taskPool), err: error "Unable to create slots builder", err = err.msg return failure(err) @@ -88,6 +90,6 @@ proc verify*( self.backend.verify(proof, inputs) proc new*( - _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int + _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int, tp: Taskpool ): Prover = - Prover(store: store, backend: backend, nSamples: nSamples) + Prover(store: store, backend: backend, nSamples: nSamples, taskPool: tp) diff --git a/codex/utils/poseidon2digest.nim b/codex/utils/poseidon2digest.nim index 6eaf21e9..7607aee2 100644 --- a/codex/utils/poseidon2digest.nim +++ b/codex/utils/poseidon2digest.nim @@ -7,13 +7,26 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/[atomics] import pkg/poseidon2 import pkg/questionable/results import pkg/libp2p/multihash import pkg/stew/byteutils +import pkg/taskpools +import pkg/chronos +import pkg/chronos/threadsync import ../merkletree +type DigestTask* = object + signal: ThreadSignalPtr + bytes: seq[byte] + chunkSize: int + success: Atomic[bool] + digest: Isolated[Poseidon2Hash] + +export DigestTask + func spongeDigest*( _: type Poseidon2Hash, bytes: openArray[byte], rate: static int = 2 ): ?!Poseidon2Hash = @@ -30,7 +43,7 @@ func spongeDigest*( success Sponge.digest(bytes, rate) -func digestTree*( +proc digestTree*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!Poseidon2Tree = ## Hashes chunks of data with a sponge of rate 2, and combines the @@ -44,6 +57,7 @@ func digestTree*( var index = 0 var leaves: seq[Poseidon2Hash] + while index < bytes.len: let start = index let finish = min(index + chunkSize, bytes.len) @@ -61,6 +75,52 @@ func digest*( (?Poseidon2Tree.digestTree(bytes, chunkSize)).root +proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} = + defer: + discard task[].signal.fireSync() + + var res = Poseidon2Tree.digest(task[].bytes, task[].chunkSize) + + if res.isErr: + task[].success.store(false) + return + + task[].digest = isolate(res.get()) + task[].success.store(true) + +proc digest*( + _: type Poseidon2Tree, tp: Taskpool, bytes: seq[byte], chunkSize: int +): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} = + without signal =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + signal.close().expect("closing once works") + + doAssert tp.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + + var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize) + + tp.spawn digestWorker(tp, addr task) + + let signalFut = signal.wait() + + if err =? catch(await signalFut.join()).errorOption: + ?catch(await noCancel signalFut) + if err of CancelledError: + raise (ref CancelledError) err + + if not task.success.load(): + return failure("digest task failed") + + defer: + task.digest = default(Isolated[Poseidon2Hash]) + + var digest = task.digest.extract + + success digest + func digestMhash*( _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int ): ?!MultiHash = diff --git a/tests/codex/merkletree/testcodextree.nim b/tests/codex/merkletree/testcodextree.nim index e8cc537a..9ab680ce 100644 --- a/tests/codex/merkletree/testcodextree.nim +++ b/tests/codex/merkletree/testcodextree.nim @@ -1,4 +1,5 @@ import std/sequtils +import std/times import pkg/questionable/results import pkg/stew/byteutils @@ -48,6 +49,7 @@ suite "Test CodexTree": var expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) tree = CodexTree.init(leaves = expectedLeaves) + check: tree.isOk tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) diff --git a/tests/codex/merkletree/testposeidon2tree.nim b/tests/codex/merkletree/testposeidon2tree.nim index 0a3e48ac..f574d637 100644 --- a/tests/codex/merkletree/testposeidon2tree.nim +++ b/tests/codex/merkletree/testposeidon2tree.nim @@ -63,17 +63,19 @@ suite "Test Poseidon2Tree": tree == fromNodes test "Build poseidon2 tree from poseidon2 leaves asynchronously": - var tp = Taskpool.new(numThreads = 2) + echo "Build poseidon2 tree from poseidon2 leaves asynchronously" + var tp = Taskpool.new() defer: tp.shutdown() + echo "@@@@@" let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() - check: tree.leaves == expectedLeaves test "Build poseidon2 tree from byte leaves asynchronously": - var tp = Taskpool.new(numThreads = 2) + echo "Build poseidon2 tree from byte leaves asynchronously" + var tp = Taskpool.new() defer: tp.shutdown() diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index e8d9c743..fcb91e8f 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -56,12 +56,13 @@ asyncchecksuite "Test Node - Host contracts": verifiable: Manifest verifiableBlock: bt.Block protected: Manifest + taskPool: Taskpool setup: # Setup Host Contracts and dependencies market = MockMarket.new() sales = Sales.new(market, clock, localStore) - + taskPool = Taskpool.new() node.contracts = ( none ClientInteractions, some HostInteractions.new(clock, sales), @@ -75,20 +76,23 @@ asyncchecksuite "Test Node - Host contracts": let manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifestCid = manifestBlock.cid (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() (await localStore.putBlock(verifiableBlock)).tryGet() + teardown: + taskPool.shutdown() + test "onExpiryUpdate callback is set": check sales.onExpiryUpdate.isSome diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 78298ad7..46e6df3a 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -47,10 +47,15 @@ privateAccess(CodexNodeRef) # enable access to private fields asyncchecksuite "Test Node - Basic": setupAndTearDown() + var taskPool: Taskpool setup: + taskPool = Taskpool.new() await node.start() + teardown: + taskPool.shutdown() + test "Fetch Manifest": let manifest = await storeDataGetManifest(localStore, chunker) @@ -174,13 +179,12 @@ asyncchecksuite "Test Node - Basic": test "Setup purchase request": let - erasure = - Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new()) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() diff --git a/tests/codex/node/testslotrepair.nim b/tests/codex/node/testslotrepair.nim index d96078d2..3d588a6d 100644 --- a/tests/codex/node/testslotrepair.nim +++ b/tests/codex/node/testslotrepair.nim @@ -13,6 +13,7 @@ import pkg/codex/contracts import pkg/codex/slots import pkg/codex/manifest import pkg/codex/erasure +import pkg/taskpools import pkg/codex/blocktype as bt import pkg/chronos/transports/stream @@ -51,6 +52,7 @@ asyncchecksuite "Test Node - Slot Repair": ) var manifest: Manifest + taskPool: Taskpool builder: Poseidon2Builder verifiable: Manifest verifiableBlock: bt.Block @@ -100,7 +102,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -118,6 +120,7 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[1].switch.stop() # slot 0 missing now # repair missing slot + (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now @@ -131,16 +134,19 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[4].switch.stop() # slot 0 missing now # repair missing slot from repaired slots + (await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 1 missing now # repair missing slot from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() await nodes[6].switch.stop() # slot 2 missing now # repair missing slot from repaired slots + (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() let @@ -179,7 +185,7 @@ asyncchecksuite "Test Node - Slot Repair": (await localStore.putBlock(manifestBlock)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() - builder = Poseidon2Builder.new(localStore, protected).tryGet() + builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet() verifiable = (await builder.buildManifest()).tryGet() verifiableBlock = bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -198,19 +204,24 @@ asyncchecksuite "Test Node - Slot Repair": await nodes[3].switch.stop() # slot 2 missing now # repair missing slots + (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() + (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() await nodes[2].switch.stop() # slot 1 missing now await nodes[4].switch.stop() # slot 3 missing now # repair missing slots from repaired slots + (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() + (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet() await nodes[5].switch.stop() # slot 4 missing now # repair missing slot from repaired slots + (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet() let diff --git a/tests/codex/slots/backends/testcircomcompat.nim b/tests/codex/slots/backends/testcircomcompat.nim index b61d4f18..637ee36b 100644 --- a/tests/codex/slots/backends/testcircomcompat.nim +++ b/tests/codex/slots/backends/testcircomcompat.nim @@ -3,6 +3,7 @@ import std/options import ../../../asynctest import pkg/chronos +import pkg/taskpools import pkg/poseidon2 import pkg/serde/json @@ -77,6 +78,7 @@ suite "Test Circom Compat Backend": challenge: array[32, byte] builder: Poseidon2Builder sampler: Poseidon2Sampler + taskPool: Taskpool setup: let @@ -85,11 +87,13 @@ suite "Test Circom Compat Backend": store = RepoStore.new(repoDs, metaDs) + taskPool = Taskpool.new() + (manifest, protected, verifiable) = await createVerifiableManifest( - store, numDatasetBlocks, ecK, ecM, blockSize, cellSize + store, numDatasetBlocks, ecK, ecM, blockSize, cellSize, taskPool ) - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskPool).tryGet sampler = Poseidon2Sampler.new(slotId, store, builder).tryGet circom = CircomCompat.init(r1cs, wasm, zkey) @@ -101,6 +105,7 @@ suite "Test Circom Compat Backend": circom.release() # this comes from the rust FFI await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should verify with correct input": var proof = circom.prove(proofInputs).tryGet diff --git a/tests/codex/slots/helpers.nim b/tests/codex/slots/helpers.nim index fced1f1c..01159c21 100644 --- a/tests/codex/slots/helpers.nim +++ b/tests/codex/slots/helpers.nim @@ -12,6 +12,7 @@ import pkg/codex/chunker import pkg/codex/indexingstrategy import pkg/codex/slots import pkg/codex/rng +import pkg/taskpools import ../helpers @@ -145,6 +146,7 @@ proc createVerifiableManifest*( ecM: int, blockSize: NBytes, cellSize: NBytes, + taskPool: Taskpool, ): Future[tuple[manifest: Manifest, protected: Manifest, verifiable: Manifest]] {. async .} = @@ -165,7 +167,9 @@ proc createVerifiableManifest*( totalDatasetSize, ) - builder = Poseidon2Builder.new(store, protectedManifest, cellSize = cellSize).tryGet + builder = Poseidon2Builder.new( + store, protectedManifest, cellSize = cellSize, taskPool = taskPool + ).tryGet verifiableManifest = (await builder.buildManifest()).tryGet # build the slots and manifest diff --git a/tests/codex/slots/sampler/testsampler.nim b/tests/codex/slots/sampler/testsampler.nim index 78b245a3..bf7277a3 100644 --- a/tests/codex/slots/sampler/testsampler.nim +++ b/tests/codex/slots/sampler/testsampler.nim @@ -5,6 +5,7 @@ import ../../../asynctest import pkg/questionable/results +import pkg/taskpools import pkg/codex/stores import pkg/codex/merkletree import pkg/codex/utils/json @@ -26,11 +27,16 @@ suite "Test Sampler - control samples": inputData: string inputJson: JsonNode proofInput: ProofInputs[Poseidon2Hash] + taskpool: Taskpool setup: inputData = readFile("tests/circuits/fixtures/input.json") inputJson = !JsonNode.parse(inputData) proofInput = Poseidon2Hash.jsonToProofInput(inputJson) + taskpool = Taskpool.new() + + teardown: + taskpool.shutdown() test "Should verify control samples": let @@ -87,25 +93,29 @@ suite "Test Sampler": manifest: Manifest protected: Manifest verifiable: Manifest + taskpool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() + taskpool = Taskpool.new() + store = RepoStore.new(repoDs, metaDs) (manifest, protected, verifiable) = await createVerifiableManifest( - store, datasetBlocks, ecK, ecM, blockSize, cellSize + store, datasetBlocks, ecK, ecM, blockSize, cellSize, taskpool ) # create sampler - builder = Poseidon2Builder.new(store, verifiable).tryGet + builder = Poseidon2Builder.new(store, verifiable, taskpool).tryGet teardown: await store.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskpool.shutdown() test "Should fail instantiating for invalid slot index": let sampler = Poseidon2Sampler.new(builder.slotRoots.len, store, builder) @@ -114,7 +124,7 @@ suite "Test Sampler": test "Should fail instantiating for non verifiable builder": let - nonVerifiableBuilder = Poseidon2Builder.new(store, protected).tryGet + nonVerifiableBuilder = Poseidon2Builder.new(store, protected, taskpool).tryGet sampler = Poseidon2Sampler.new(slotIndex, store, nonVerifiableBuilder) check sampler.isErr diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index c567db55..34ff96ba 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -4,6 +4,7 @@ import pkg/chronos import pkg/libp2p/cid import pkg/codex/merkletree +import pkg/taskpools import pkg/codex/chunker import pkg/codex/blocktype as bt import pkg/codex/slots @@ -29,6 +30,7 @@ suite "Test Prover": var store: BlockStore prover: Prover + taskPool: Taskpool setup: let @@ -45,13 +47,14 @@ suite "Test Prover": numProofSamples: samples, ) backend = config.initializeBackend().tryGet() - + taskPool = Taskpool.new() store = RepoStore.new(repoDs, metaDs) - prover = Prover.new(store, backend, config.numProofSamples) + prover = Prover.new(store, backend, config.numProofSamples, taskPool) teardown: await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() test "Should sample and prove a slot": let (_, _, verifiable) = await createVerifiableManifest( @@ -61,6 +64,7 @@ suite "Test Prover": 3, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet @@ -80,6 +84,7 @@ suite "Test Prover": 1, # ecM blockSize, cellSize, + taskPool, ) let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet diff --git a/tests/codex/slots/testslotbuilder.nim b/tests/codex/slots/testslotbuilder.nim index fc3c7bd5..55f917ef 100644 --- a/tests/codex/slots/testslotbuilder.nim +++ b/tests/codex/slots/testslotbuilder.nim @@ -15,6 +15,7 @@ import pkg/codex/utils import pkg/codex/utils/digest import pkg/poseidon2 import pkg/poseidon2/io +import pkg/taskpools import ./helpers import ../helpers @@ -72,12 +73,13 @@ suite "Slot builder": protectedManifest: Manifest builder: Poseidon2Builder chunker: Chunker + taskPool: Taskpool setup: let repoDs = repoTmp.newDb() metaDs = metaTmp.newDb() - + taskPool = Taskpool.new() localStore = RepoStore.new(repoDs, metaDs) chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize) @@ -92,6 +94,7 @@ suite "Slot builder": await localStore.close() await repoTmp.destroyDb() await metaTmp.destroyDb() + taskPool.shutdown() # TODO: THIS IS A BUG IN asynctest, because it doesn't release the # objects after the test is done, so we need to do it manually @@ -113,8 +116,9 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, unprotectedManifest, cellSize = cellSize).error.msg == - "Manifest is not protected." + Poseidon2Builder.new( + localStore, unprotectedManifest, taskPool, cellSize = cellSize + ).error.msg == "Manifest is not protected." test "Number of blocks must be devisable by number of slots": let mismatchManifest = Manifest.new( @@ -131,7 +135,7 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Number of blocks must be divisible by number of slots." test "Block size must be divisable by cell size": @@ -149,12 +153,13 @@ suite "Slot builder": ) check: - Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == + Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg == "Block size must be divisible by cell size." test "Should build correct slot builder": - builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.cellSize == cellSize @@ -171,7 +176,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -196,7 +201,7 @@ suite "Slot builder": ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() for i in 0 ..< numSlots: @@ -215,8 +220,9 @@ suite "Slot builder": slotTree.root().tryGet() == expectedRoot test "Should persist trees for all slots": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() for i in 0 ..< numSlots: let @@ -242,7 +248,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() (await builder.buildSlots()).tryGet @@ -270,7 +276,7 @@ suite "Slot builder": 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks ) builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() slotsHashes = collect(newSeq): @@ -296,45 +302,53 @@ suite "Slot builder": test "Should not build from verifiable manifest with 0 slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots = @[] - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with incorrect number of slots": var builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should not build from verifiable manifest with invalid verify root": - let builder = - Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() + let builder = Poseidon2Builder + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) + .tryGet() var verifyManifest = (await builder.buildManifest()).tryGet() rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer) - check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr + check Poseidon2Builder.new( + localStore, verifyManifest, taskPool, cellSize = cellSize + ).isErr test "Should build from verifiable manifest": let builder = Poseidon2Builder - .new(localStore, protectedManifest, cellSize = cellSize) + .new(localStore, protectedManifest, taskPool, cellSize = cellSize) .tryGet() verifyManifest = (await builder.buildManifest()).tryGet() - verificationBuilder = - Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).tryGet() + verificationBuilder = Poseidon2Builder + .new(localStore, verifyManifest, taskPool, cellSize = cellSize) + .tryGet() check: builder.slotRoots == verificationBuilder.slotRoots diff --git a/tests/codex/utils/testPoseidon.nim b/tests/codex/utils/testPoseidon.nim new file mode 100644 index 00000000..aedf5fcf --- /dev/null +++ b/tests/codex/utils/testPoseidon.nim @@ -0,0 +1,40 @@ +{.push raises: [].} + +import std/[times, strformat, random] +import pkg/questionable/results + +import pkg/codex/merkletree/poseidon2 + +import pkg/codex/utils/poseidon2digest +import ../../asynctest + +test "Test poseidon2 digestTree": + randomize(42) + const + dataSize = 64 * 1024 # 64KB + chunkSize = 2 * 1024 # 2KB + iterations = 10 # Number of iterations + + echo &"Benchmarking digestTree with data size: {dataSize} bytes, chunk size: {chunkSize} bytes" + + # Generate random data + var data = newSeq[byte](dataSize) + for i in 0 ..< dataSize: + data[i] = byte(rand(255)) + + # Actual benchmark + let startTime = cpuTime() + + for i in 1 .. iterations: + let treeResult = Poseidon2Tree.digestTree(data, chunkSize).tryGet() + + # Optionally print info about each iteration + + let endTime = cpuTime() + let totalTime = endTime - startTime + let avgTime = totalTime / iterations.float + + echo &"Results:" + echo &" Total time for {iterations} iterations: {totalTime:.6f} seconds" + echo &" Average time per iteration: {avgTime:.6f} seconds" + echo &" Iterations per second: {iterations.float / totalTime:.2f}"