This commit is contained in:
munna0908 2025-07-07 23:44:52 +05:30
parent 26d17de462
commit 0c1b6822cc
No known key found for this signature in database
GPG Key ID: 2FFCD637E937D3E6
17 changed files with 269 additions and 70 deletions

View File

@ -56,7 +56,7 @@ type
codexNode: CodexNodeRef codexNode: CodexNodeRef
repoStore: RepoStore repoStore: RepoStore
maintenance: BlockMaintainer maintenance: BlockMaintainer
taskpool: Taskpool taskPool: Taskpool
CodexPrivateKey* = libp2p.PrivateKey # alias CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet EthWallet = ethers.Wallet
@ -194,8 +194,8 @@ proc stop*(s: CodexServer) {.async.} =
error "Failed to stop codex node", failures = res.failure.len error "Failed to stop codex node", failures = res.failure.len
raiseAssert "Failed to stop codex node" raiseAssert "Failed to stop codex node"
if not s.taskpool.isNil: if not s.taskPool.isNil:
s.taskpool.shutdown() s.taskPool.shutdown()
proc new*( proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
@ -216,16 +216,16 @@ proc new*(
var var
cache: CacheStore = nil cache: CacheStore = nil
taskpool: Taskpool taskPool: Taskpool
try: try:
if config.numThreads == ThreadCount(0): if config.numThreads == ThreadCount(0):
taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) taskPool = Taskpool.new(numThreads = min(countProcessors(), 16))
else: else:
taskpool = Taskpool.new(numThreads = int(config.numThreads)) taskPool = Taskpool.new(numThreads = int(config.numThreads))
info "Threadpool started", numThreads = taskpool.numThreads info "Threadpool started", numThreads = taskPool.numThreads
except CatchableError as exc: except CatchableError as exc:
raiseAssert("Failure in taskpool initialization:" & exc.msg) raiseAssert("Failure in taskPool initialization:" & exc.msg)
if config.cacheSize > 0'nb: if config.cacheSize > 0'nb:
cache = CacheStore.new(cacheSize = config.cacheSize) cache = CacheStore.new(cacheSize = config.cacheSize)
@ -307,7 +307,7 @@ proc new*(
if config.prover: if config.prover:
let backend = let backend =
config.initializeBackend().expect("Unable to create prover backend.") config.initializeBackend().expect("Unable to create prover backend.")
some Prover.new(store, backend, config.numProofSamples) some Prover.new(store, backend, config.numProofSamples, taskPool)
else: else:
none Prover none Prover
@ -317,7 +317,7 @@ proc new*(
engine = engine, engine = engine,
discovery = discovery, discovery = discovery,
prover = prover, prover = prover,
taskPool = taskpool, taskPool = taskPool,
) )
restServer = RestServerRef restServer = RestServerRef
@ -337,5 +337,5 @@ proc new*(
restServer: restServer, restServer: restServer,
repoStore: repoStore, repoStore: repoStore,
maintenance: maintenance, maintenance: maintenance,
taskpool: taskpool, taskPool: taskPool,
) )

View File

@ -100,7 +100,7 @@ proc init*(
var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero) var tree = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero)
var task = Poseidon2TreeTask( var task = Poseidon2TreeTask(
tree: cast[ptr Poseidon2Tree](addr tree), leaves: @leaves, signal: signal tree: cast[ptr Poseidon2Tree](addr tree), leaves: leaves, signal: signal
) )
doAssert tp.numThreads > 1, doAssert tp.numThreads > 1,

View File

@ -72,7 +72,7 @@ type
contracts*: Contracts contracts*: Contracts
clock*: Clock clock*: Clock
storage*: Contracts storage*: Contracts
taskpool: Taskpool taskPool: Taskpool
trackedFutures: TrackedFutures trackedFutures: TrackedFutures
CodexNodeRef* = ref CodexNode CodexNodeRef* = ref CodexNode
@ -294,7 +294,7 @@ proc streamEntireDataset(
try: try:
# Spawn an erasure decoding job # Spawn an erasure decoding job
let erasure = Erasure.new( let erasure = Erasure.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool
) )
without _ =? (await erasure.decode(manifest)), error: without _ =? (await erasure.decode(manifest)), error:
error "Unable to erasure decode manifest", manifestCid, exc = error.msg error "Unable to erasure decode manifest", manifestCid, exc = error.msg
@ -439,7 +439,7 @@ proc store*(
finally: finally:
await stream.close() await stream.close()
without tree =? (await CodexTree.init(self.taskpool, cids)), err: without tree =? (await CodexTree.init(self.taskPool, cids)), err:
return failure(err) return failure(err)
without treeCid =? tree.rootCid(CIDv1, dataCodec), err: without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
@ -533,14 +533,15 @@ proc setupRequest(
# Erasure code the dataset according to provided parameters # Erasure code the dataset according to provided parameters
let erasure = Erasure.new( let erasure = Erasure.new(
self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskPool
) )
without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
trace "Unable to erasure code dataset" trace "Unable to erasure code dataset"
return failure(error) return failure(error)
without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err: without builder =?
Poseidon2Builder.new(self.networkStore.localStore, encoded, self.taskPool), err:
trace "Unable to create slot builder" trace "Unable to create slot builder"
return failure(err) return failure(err)
@ -644,7 +645,9 @@ proc onStore(
return failure(err) return failure(err)
without builder =? without builder =?
Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy), err: Poseidon2Builder.new(
self.networkStore, manifest, self.taskPool, manifest.verifiableStrategy
), err:
trace "Unable to create slots builder", err = err.msg trace "Unable to create slots builder", err = err.msg
return failure(err) return failure(err)
@ -679,7 +682,7 @@ proc onStore(
trace "start repairing slot", slotIdx trace "start repairing slot", slotIdx
try: try:
let erasure = Erasure.new( let erasure = Erasure.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskPool
) )
if err =? (await erasure.repair(manifest)).errorOption: if err =? (await erasure.repair(manifest)).errorOption:
error "Unable to erasure decode repairing manifest", error "Unable to erasure decode repairing manifest",
@ -880,7 +883,7 @@ proc new*(
networkStore: NetworkStore, networkStore: NetworkStore,
engine: BlockExcEngine, engine: BlockExcEngine,
discovery: Discovery, discovery: Discovery,
taskpool: Taskpool, taskPool: Taskpool,
prover = Prover.none, prover = Prover.none,
contracts = Contracts.default, contracts = Contracts.default,
): CodexNodeRef = ): CodexNodeRef =
@ -893,7 +896,7 @@ proc new*(
engine: engine, engine: engine,
prover: prover, prover: prover,
discovery: discovery, discovery: discovery,
taskPool: taskpool, taskPool: taskPool,
contracts: contracts, contracts: contracts,
trackedFutures: TrackedFutures(), trackedFutures: TrackedFutures(),
) )

View File

@ -18,18 +18,20 @@ import pkg/chronos
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/constantine/math/io/io_fields import pkg/constantine/math/io/io_fields
import pkg/taskpools
import ../../logutils import ../../logutils
import ../../utils import ../../utils
import ../../stores import ../../stores
import ../../manifest import ../../manifest
import ../../merkletree import ../../merkletree
import ../../utils/poseidon2digest
import ../../utils/asynciter import ../../utils/asynciter
import ../../indexingstrategy import ../../indexingstrategy
import ../converters import ../converters
export converters, asynciter export converters, asynciter, poseidon2digest
logScope: logScope:
topics = "codex slotsbuilder" topics = "codex slotsbuilder"
@ -45,6 +47,7 @@ type SlotsBuilder*[T, H] = ref object of RootObj
emptyBlock: seq[byte] # empty block emptyBlock: seq[byte] # empty block
verifiableTree: ?T # verification tree (dataset tree) verifiableTree: ?T # verification tree (dataset tree)
emptyDigestTree: T # empty digest tree for empty blocks emptyDigestTree: T # empty digest tree for empty blocks
taskPool: Taskpool
func verifiable*[T, H](self: SlotsBuilder[T, H]): bool {.inline.} = func verifiable*[T, H](self: SlotsBuilder[T, H]): bool {.inline.} =
## Returns true if the slots are verifiable. ## Returns true if the slots are verifiable.
@ -165,6 +168,35 @@ proc buildBlockTree*[T, H](
success (blk.data, tree) success (blk.data, tree)
proc getBlockDigest*[T, H](
self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural
): Future[?!H] {.async: (raises: [CancelledError]).} =
logScope:
blkIdx = blkIdx
slotPos = slotPos
numSlotBlocks = self.manifest.numSlotBlocks
cellSize = self.cellSize
trace "Building block tree"
if slotPos > (self.manifest.numSlotBlocks - 1):
# pad blocks are 0 byte blocks
trace "Returning empty digest tree for pad block"
return self.emptyDigestTree.root
without blk =? await self.store.getBlock(self.manifest.treeCid, blkIdx), err:
error "Failed to get block CID for tree at index", err = err.msg
return failure(err)
if blk.isEmpty:
return self.emptyDigestTree.root
without digest =? (await T.digest(self.taskPool, blk.data, self.cellSize.int)), err:
error "Failed to create digest for block", err = err.msg
return failure(err)
return success digest
proc getCellHashes*[T, H]( proc getCellHashes*[T, H](
self: SlotsBuilder[T, H], slotIndex: Natural self: SlotsBuilder[T, H], slotIndex: Natural
): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} = ): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} =
@ -190,8 +222,7 @@ proc getCellHashes*[T, H](
pos = i pos = i
trace "Getting block CID for tree at index" trace "Getting block CID for tree at index"
without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, without digest =? (await self.getBlockDigest(blkIdx, i)), err:
err:
error "Failed to get block CID for tree at index", err = err.msg error "Failed to get block CID for tree at index", err = err.msg
return failure(err) return failure(err)
@ -310,6 +341,7 @@ proc new*[T, H](
_: type SlotsBuilder[T, H], _: type SlotsBuilder[T, H],
store: BlockStore, store: BlockStore,
manifest: Manifest, manifest: Manifest,
taskPool: Taskpool,
strategy = LinearStrategy, strategy = LinearStrategy,
cellSize = DefaultCellSize, cellSize = DefaultCellSize,
): ?!SlotsBuilder[T, H] = ): ?!SlotsBuilder[T, H] =
@ -383,6 +415,7 @@ proc new*[T, H](
emptyBlock: emptyBlock, emptyBlock: emptyBlock,
numSlotBlocks: numSlotBlocksTotal, numSlotBlocks: numSlotBlocksTotal,
emptyDigestTree: emptyDigestTree, emptyDigestTree: emptyDigestTree,
taskPool: taskPool,
) )
if manifest.verifiable: if manifest.verifiable:

View File

@ -13,6 +13,7 @@ import pkg/chronicles
import pkg/circomcompat import pkg/circomcompat
import pkg/poseidon2 import pkg/poseidon2
import pkg/questionable/results import pkg/questionable/results
import pkg/taskpools
import pkg/libp2p/cid import pkg/libp2p/cid
@ -47,6 +48,7 @@ type
backend: AnyBackend backend: AnyBackend
store: BlockStore store: BlockStore
nSamples: int nSamples: int
taskPool: Taskpool
proc prove*( proc prove*(
self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge
@ -61,7 +63,7 @@ proc prove*(
trace "Received proof challenge" trace "Received proof challenge"
without builder =? AnyBuilder.new(self.store, manifest), err: without builder =? AnyBuilder.new(self.store, manifest, self.taskPool), err:
error "Unable to create slots builder", err = err.msg error "Unable to create slots builder", err = err.msg
return failure(err) return failure(err)
@ -88,6 +90,6 @@ proc verify*(
self.backend.verify(proof, inputs) self.backend.verify(proof, inputs)
proc new*( proc new*(
_: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int, tp: Taskpool
): Prover = ): Prover =
Prover(store: store, backend: backend, nSamples: nSamples) Prover(store: store, backend: backend, nSamples: nSamples, taskPool: tp)

View File

@ -7,13 +7,26 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/[atomics]
import pkg/poseidon2 import pkg/poseidon2
import pkg/questionable/results import pkg/questionable/results
import pkg/libp2p/multihash import pkg/libp2p/multihash
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/taskpools
import pkg/chronos
import pkg/chronos/threadsync
import ../merkletree import ../merkletree
type DigestTask* = object
signal: ThreadSignalPtr
bytes: seq[byte]
chunkSize: int
success: Atomic[bool]
digest: Isolated[Poseidon2Hash]
export DigestTask
func spongeDigest*( func spongeDigest*(
_: type Poseidon2Hash, bytes: openArray[byte], rate: static int = 2 _: type Poseidon2Hash, bytes: openArray[byte], rate: static int = 2
): ?!Poseidon2Hash = ): ?!Poseidon2Hash =
@ -30,7 +43,7 @@ func spongeDigest*(
success Sponge.digest(bytes, rate) success Sponge.digest(bytes, rate)
func digestTree*( proc digestTree*(
_: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int
): ?!Poseidon2Tree = ): ?!Poseidon2Tree =
## Hashes chunks of data with a sponge of rate 2, and combines the ## Hashes chunks of data with a sponge of rate 2, and combines the
@ -44,6 +57,7 @@ func digestTree*(
var index = 0 var index = 0
var leaves: seq[Poseidon2Hash] var leaves: seq[Poseidon2Hash]
while index < bytes.len: while index < bytes.len:
let start = index let start = index
let finish = min(index + chunkSize, bytes.len) let finish = min(index + chunkSize, bytes.len)
@ -61,6 +75,52 @@ func digest*(
(?Poseidon2Tree.digestTree(bytes, chunkSize)).root (?Poseidon2Tree.digestTree(bytes, chunkSize)).root
proc digestWorker(tp: Taskpool, task: ptr DigestTask) {.gcsafe.} =
defer:
discard task[].signal.fireSync()
var res = Poseidon2Tree.digest(task[].bytes, task[].chunkSize)
if res.isErr:
task[].success.store(false)
return
task[].digest = isolate(res.get())
task[].success.store(true)
proc digest*(
_: type Poseidon2Tree, tp: Taskpool, bytes: seq[byte], chunkSize: int
): Future[?!Poseidon2Hash] {.async: (raises: [CancelledError]).} =
without signal =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")
defer:
signal.close().expect("closing once works")
doAssert tp.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"
var task = DigestTask(signal: signal, bytes: bytes, chunkSize: chunkSize)
tp.spawn digestWorker(tp, addr task)
let signalFut = signal.wait()
if err =? catch(await signalFut.join()).errorOption:
?catch(await noCancel signalFut)
if err of CancelledError:
raise (ref CancelledError) err
if not task.success.load():
return failure("digest task failed")
defer:
task.digest = default(Isolated[Poseidon2Hash])
var digest = task.digest.extract
success digest
func digestMhash*( func digestMhash*(
_: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int _: type Poseidon2Tree, bytes: openArray[byte], chunkSize: int
): ?!MultiHash = ): ?!MultiHash =

View File

@ -1,4 +1,5 @@
import std/sequtils import std/sequtils
import std/times
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/byteutils import pkg/stew/byteutils
@ -48,6 +49,7 @@ suite "Test CodexTree":
var var
expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet()) expectedLeaves = data.mapIt(MultiHash.digest($sha256, it).tryGet())
tree = CodexTree.init(leaves = expectedLeaves) tree = CodexTree.init(leaves = expectedLeaves)
check: check:
tree.isOk tree.isOk
tree.get().leaves == expectedLeaves.mapIt(it.digestBytes) tree.get().leaves == expectedLeaves.mapIt(it.digestBytes)

View File

@ -63,17 +63,19 @@ suite "Test Poseidon2Tree":
tree == fromNodes tree == fromNodes
test "Build poseidon2 tree from poseidon2 leaves asynchronously": test "Build poseidon2 tree from poseidon2 leaves asynchronously":
var tp = Taskpool.new(numThreads = 2) echo "Build poseidon2 tree from poseidon2 leaves asynchronously"
var tp = Taskpool.new()
defer: defer:
tp.shutdown() tp.shutdown()
echo "@@@@@"
let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet() let tree = (await Poseidon2Tree.init(tp, leaves = expectedLeaves)).tryGet()
check: check:
tree.leaves == expectedLeaves tree.leaves == expectedLeaves
test "Build poseidon2 tree from byte leaves asynchronously": test "Build poseidon2 tree from byte leaves asynchronously":
var tp = Taskpool.new(numThreads = 2) echo "Build poseidon2 tree from byte leaves asynchronously"
var tp = Taskpool.new()
defer: defer:
tp.shutdown() tp.shutdown()

View File

@ -56,12 +56,13 @@ asyncchecksuite "Test Node - Host contracts":
verifiable: Manifest verifiable: Manifest
verifiableBlock: bt.Block verifiableBlock: bt.Block
protected: Manifest protected: Manifest
taskPool: Taskpool
setup: setup:
# Setup Host Contracts and dependencies # Setup Host Contracts and dependencies
market = MockMarket.new() market = MockMarket.new()
sales = Sales.new(market, clock, localStore) sales = Sales.new(market, clock, localStore)
taskPool = Taskpool.new()
node.contracts = ( node.contracts = (
none ClientInteractions, none ClientInteractions,
some HostInteractions.new(clock, sales), some HostInteractions.new(clock, sales),
@ -75,20 +76,23 @@ asyncchecksuite "Test Node - Host contracts":
let let
manifestBlock = manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool)
manifestCid = manifestBlock.cid manifestCid = manifestBlock.cid
(await localStore.putBlock(manifestBlock)).tryGet() (await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, 3, 2)).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet() builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet()
verifiable = (await builder.buildManifest()).tryGet() verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock = verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
(await localStore.putBlock(verifiableBlock)).tryGet() (await localStore.putBlock(verifiableBlock)).tryGet()
teardown:
taskPool.shutdown()
test "onExpiryUpdate callback is set": test "onExpiryUpdate callback is set":
check sales.onExpiryUpdate.isSome check sales.onExpiryUpdate.isSome

View File

@ -47,10 +47,15 @@ privateAccess(CodexNodeRef) # enable access to private fields
asyncchecksuite "Test Node - Basic": asyncchecksuite "Test Node - Basic":
setupAndTearDown() setupAndTearDown()
var taskPool: Taskpool
setup: setup:
taskPool = Taskpool.new()
await node.start() await node.start()
teardown:
taskPool.shutdown()
test "Fetch Manifest": test "Fetch Manifest":
let let
manifest = await storeDataGetManifest(localStore, chunker) manifest = await storeDataGetManifest(localStore, chunker)
@ -174,13 +179,12 @@ asyncchecksuite "Test Node - Basic":
test "Setup purchase request": test "Setup purchase request":
let let
erasure = erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskPool)
Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
manifest = await storeDataGetManifest(localStore, chunker) manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
protected = (await erasure.encode(manifest, 3, 2)).tryGet() protected = (await erasure.encode(manifest, 3, 2)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet() builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet()
verifiable = (await builder.buildManifest()).tryGet() verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock = verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()

View File

@ -13,6 +13,7 @@ import pkg/codex/contracts
import pkg/codex/slots import pkg/codex/slots
import pkg/codex/manifest import pkg/codex/manifest
import pkg/codex/erasure import pkg/codex/erasure
import pkg/taskpools
import pkg/codex/blocktype as bt import pkg/codex/blocktype as bt
import pkg/chronos/transports/stream import pkg/chronos/transports/stream
@ -51,6 +52,7 @@ asyncchecksuite "Test Node - Slot Repair":
) )
var var
manifest: Manifest manifest: Manifest
taskPool: Taskpool
builder: Poseidon2Builder builder: Poseidon2Builder
verifiable: Manifest verifiable: Manifest
verifiableBlock: bt.Block verifiableBlock: bt.Block
@ -100,7 +102,7 @@ asyncchecksuite "Test Node - Slot Repair":
(await localStore.putBlock(manifestBlock)).tryGet() (await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet() builder = Poseidon2Builder.new(localStore, protected, taskPool).tryGet()
verifiable = (await builder.buildManifest()).tryGet() verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock = verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
@ -118,6 +120,7 @@ asyncchecksuite "Test Node - Slot Repair":
await nodes[1].switch.stop() # slot 0 missing now await nodes[1].switch.stop() # slot 0 missing now
# repair missing slot # repair missing slot
(await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() (await nodes[4].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
await nodes[2].switch.stop() # slot 1 missing now await nodes[2].switch.stop() # slot 1 missing now
@ -131,16 +134,19 @@ asyncchecksuite "Test Node - Slot Repair":
await nodes[4].switch.stop() # slot 0 missing now await nodes[4].switch.stop() # slot 0 missing now
# repair missing slot from repaired slots # repair missing slot from repaired slots
(await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() (await nodes[7].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
await nodes[5].switch.stop() # slot 1 missing now await nodes[5].switch.stop() # slot 1 missing now
# repair missing slot from repaired slots # repair missing slot from repaired slots
(await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()
await nodes[6].switch.stop() # slot 2 missing now await nodes[6].switch.stop() # slot 2 missing now
# repair missing slot from repaired slots # repair missing slot from repaired slots
(await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() (await nodes[9].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()
let let
@ -179,7 +185,7 @@ asyncchecksuite "Test Node - Slot Repair":
(await localStore.putBlock(manifestBlock)).tryGet() (await localStore.putBlock(manifestBlock)).tryGet()
protected = (await erasure.encode(manifest, ecK, ecM)).tryGet() protected = (await erasure.encode(manifest, ecK, ecM)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet() builder = Poseidon2Builder.new(localStore, protected, cluster.taskpool).tryGet()
verifiable = (await builder.buildManifest()).tryGet() verifiable = (await builder.buildManifest()).tryGet()
verifiableBlock = verifiableBlock =
bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet() bt.Block.new(verifiable.encode().tryGet(), codec = ManifestCodec).tryGet()
@ -198,19 +204,24 @@ asyncchecksuite "Test Node - Slot Repair":
await nodes[3].switch.stop() # slot 2 missing now await nodes[3].switch.stop() # slot 2 missing now
# repair missing slots # repair missing slots
(await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet() (await nodes[6].onStore(request, expiry, 0.uint64, nil, isRepairing = true)).tryGet()
(await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet() (await nodes[7].onStore(request, expiry, 2.uint64, nil, isRepairing = true)).tryGet()
await nodes[2].switch.stop() # slot 1 missing now await nodes[2].switch.stop() # slot 1 missing now
await nodes[4].switch.stop() # slot 3 missing now await nodes[4].switch.stop() # slot 3 missing now
# repair missing slots from repaired slots # repair missing slots from repaired slots
(await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet() (await nodes[8].onStore(request, expiry, 1.uint64, nil, isRepairing = true)).tryGet()
(await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet() (await nodes[9].onStore(request, expiry, 3.uint64, nil, isRepairing = true)).tryGet()
await nodes[5].switch.stop() # slot 4 missing now await nodes[5].switch.stop() # slot 4 missing now
# repair missing slot from repaired slots # repair missing slot from repaired slots
(await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet() (await nodes[10].onStore(request, expiry, 4.uint64, nil, isRepairing = true)).tryGet()
let let

View File

@ -3,6 +3,7 @@ import std/options
import ../../../asynctest import ../../../asynctest
import pkg/chronos import pkg/chronos
import pkg/taskpools
import pkg/poseidon2 import pkg/poseidon2
import pkg/serde/json import pkg/serde/json
@ -77,6 +78,7 @@ suite "Test Circom Compat Backend":
challenge: array[32, byte] challenge: array[32, byte]
builder: Poseidon2Builder builder: Poseidon2Builder
sampler: Poseidon2Sampler sampler: Poseidon2Sampler
taskPool: Taskpool
setup: setup:
let let
@ -85,11 +87,13 @@ suite "Test Circom Compat Backend":
store = RepoStore.new(repoDs, metaDs) store = RepoStore.new(repoDs, metaDs)
taskPool = Taskpool.new()
(manifest, protected, verifiable) = await createVerifiableManifest( (manifest, protected, verifiable) = await createVerifiableManifest(
store, numDatasetBlocks, ecK, ecM, blockSize, cellSize store, numDatasetBlocks, ecK, ecM, blockSize, cellSize, taskPool
) )
builder = Poseidon2Builder.new(store, verifiable).tryGet builder = Poseidon2Builder.new(store, verifiable, taskPool).tryGet
sampler = Poseidon2Sampler.new(slotId, store, builder).tryGet sampler = Poseidon2Sampler.new(slotId, store, builder).tryGet
circom = CircomCompat.init(r1cs, wasm, zkey) circom = CircomCompat.init(r1cs, wasm, zkey)
@ -101,6 +105,7 @@ suite "Test Circom Compat Backend":
circom.release() # this comes from the rust FFI circom.release() # this comes from the rust FFI
await repoTmp.destroyDb() await repoTmp.destroyDb()
await metaTmp.destroyDb() await metaTmp.destroyDb()
taskPool.shutdown()
test "Should verify with correct input": test "Should verify with correct input":
var proof = circom.prove(proofInputs).tryGet var proof = circom.prove(proofInputs).tryGet

View File

@ -12,6 +12,7 @@ import pkg/codex/chunker
import pkg/codex/indexingstrategy import pkg/codex/indexingstrategy
import pkg/codex/slots import pkg/codex/slots
import pkg/codex/rng import pkg/codex/rng
import pkg/taskpools
import ../helpers import ../helpers
@ -145,6 +146,7 @@ proc createVerifiableManifest*(
ecM: int, ecM: int,
blockSize: NBytes, blockSize: NBytes,
cellSize: NBytes, cellSize: NBytes,
taskPool: Taskpool,
): Future[tuple[manifest: Manifest, protected: Manifest, verifiable: Manifest]] {. ): Future[tuple[manifest: Manifest, protected: Manifest, verifiable: Manifest]] {.
async async
.} = .} =
@ -165,7 +167,9 @@ proc createVerifiableManifest*(
totalDatasetSize, totalDatasetSize,
) )
builder = Poseidon2Builder.new(store, protectedManifest, cellSize = cellSize).tryGet builder = Poseidon2Builder.new(
store, protectedManifest, cellSize = cellSize, taskPool = taskPool
).tryGet
verifiableManifest = (await builder.buildManifest()).tryGet verifiableManifest = (await builder.buildManifest()).tryGet
# build the slots and manifest # build the slots and manifest

View File

@ -5,6 +5,7 @@ import ../../../asynctest
import pkg/questionable/results import pkg/questionable/results
import pkg/taskpools
import pkg/codex/stores import pkg/codex/stores
import pkg/codex/merkletree import pkg/codex/merkletree
import pkg/codex/utils/json import pkg/codex/utils/json
@ -26,11 +27,16 @@ suite "Test Sampler - control samples":
inputData: string inputData: string
inputJson: JsonNode inputJson: JsonNode
proofInput: ProofInputs[Poseidon2Hash] proofInput: ProofInputs[Poseidon2Hash]
taskpool: Taskpool
setup: setup:
inputData = readFile("tests/circuits/fixtures/input.json") inputData = readFile("tests/circuits/fixtures/input.json")
inputJson = !JsonNode.parse(inputData) inputJson = !JsonNode.parse(inputData)
proofInput = Poseidon2Hash.jsonToProofInput(inputJson) proofInput = Poseidon2Hash.jsonToProofInput(inputJson)
taskpool = Taskpool.new()
teardown:
taskpool.shutdown()
test "Should verify control samples": test "Should verify control samples":
let let
@ -87,25 +93,29 @@ suite "Test Sampler":
manifest: Manifest manifest: Manifest
protected: Manifest protected: Manifest
verifiable: Manifest verifiable: Manifest
taskpool: Taskpool
setup: setup:
let let
repoDs = repoTmp.newDb() repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb() metaDs = metaTmp.newDb()
taskpool = Taskpool.new()
store = RepoStore.new(repoDs, metaDs) store = RepoStore.new(repoDs, metaDs)
(manifest, protected, verifiable) = await createVerifiableManifest( (manifest, protected, verifiable) = await createVerifiableManifest(
store, datasetBlocks, ecK, ecM, blockSize, cellSize store, datasetBlocks, ecK, ecM, blockSize, cellSize, taskpool
) )
# create sampler # create sampler
builder = Poseidon2Builder.new(store, verifiable).tryGet builder = Poseidon2Builder.new(store, verifiable, taskpool).tryGet
teardown: teardown:
await store.close() await store.close()
await repoTmp.destroyDb() await repoTmp.destroyDb()
await metaTmp.destroyDb() await metaTmp.destroyDb()
taskpool.shutdown()
test "Should fail instantiating for invalid slot index": test "Should fail instantiating for invalid slot index":
let sampler = Poseidon2Sampler.new(builder.slotRoots.len, store, builder) let sampler = Poseidon2Sampler.new(builder.slotRoots.len, store, builder)
@ -114,7 +124,7 @@ suite "Test Sampler":
test "Should fail instantiating for non verifiable builder": test "Should fail instantiating for non verifiable builder":
let let
nonVerifiableBuilder = Poseidon2Builder.new(store, protected).tryGet nonVerifiableBuilder = Poseidon2Builder.new(store, protected, taskpool).tryGet
sampler = Poseidon2Sampler.new(slotIndex, store, nonVerifiableBuilder) sampler = Poseidon2Sampler.new(slotIndex, store, nonVerifiableBuilder)
check sampler.isErr check sampler.isErr

View File

@ -4,6 +4,7 @@ import pkg/chronos
import pkg/libp2p/cid import pkg/libp2p/cid
import pkg/codex/merkletree import pkg/codex/merkletree
import pkg/taskpools
import pkg/codex/chunker import pkg/codex/chunker
import pkg/codex/blocktype as bt import pkg/codex/blocktype as bt
import pkg/codex/slots import pkg/codex/slots
@ -29,6 +30,7 @@ suite "Test Prover":
var var
store: BlockStore store: BlockStore
prover: Prover prover: Prover
taskPool: Taskpool
setup: setup:
let let
@ -45,13 +47,14 @@ suite "Test Prover":
numProofSamples: samples, numProofSamples: samples,
) )
backend = config.initializeBackend().tryGet() backend = config.initializeBackend().tryGet()
taskPool = Taskpool.new()
store = RepoStore.new(repoDs, metaDs) store = RepoStore.new(repoDs, metaDs)
prover = Prover.new(store, backend, config.numProofSamples) prover = Prover.new(store, backend, config.numProofSamples, taskPool)
teardown: teardown:
await repoTmp.destroyDb() await repoTmp.destroyDb()
await metaTmp.destroyDb() await metaTmp.destroyDb()
taskPool.shutdown()
test "Should sample and prove a slot": test "Should sample and prove a slot":
let (_, _, verifiable) = await createVerifiableManifest( let (_, _, verifiable) = await createVerifiableManifest(
@ -61,6 +64,7 @@ suite "Test Prover":
3, # ecM 3, # ecM
blockSize, blockSize,
cellSize, cellSize,
taskPool,
) )
let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet
@ -80,6 +84,7 @@ suite "Test Prover":
1, # ecM 1, # ecM
blockSize, blockSize,
cellSize, cellSize,
taskPool,
) )
let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet let (inputs, proof) = (await prover.prove(1, verifiable, challenge)).tryGet

View File

@ -15,6 +15,7 @@ import pkg/codex/utils
import pkg/codex/utils/digest import pkg/codex/utils/digest
import pkg/poseidon2 import pkg/poseidon2
import pkg/poseidon2/io import pkg/poseidon2/io
import pkg/taskpools
import ./helpers import ./helpers
import ../helpers import ../helpers
@ -72,12 +73,13 @@ suite "Slot builder":
protectedManifest: Manifest protectedManifest: Manifest
builder: Poseidon2Builder builder: Poseidon2Builder
chunker: Chunker chunker: Chunker
taskPool: Taskpool
setup: setup:
let let
repoDs = repoTmp.newDb() repoDs = repoTmp.newDb()
metaDs = metaTmp.newDb() metaDs = metaTmp.newDb()
taskPool = Taskpool.new()
localStore = RepoStore.new(repoDs, metaDs) localStore = RepoStore.new(repoDs, metaDs)
chunker = chunker =
RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize) RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize)
@ -92,6 +94,7 @@ suite "Slot builder":
await localStore.close() await localStore.close()
await repoTmp.destroyDb() await repoTmp.destroyDb()
await metaTmp.destroyDb() await metaTmp.destroyDb()
taskPool.shutdown()
# TODO: THIS IS A BUG IN asynctest, because it doesn't release the # TODO: THIS IS A BUG IN asynctest, because it doesn't release the
# objects after the test is done, so we need to do it manually # objects after the test is done, so we need to do it manually
@ -113,8 +116,9 @@ suite "Slot builder":
) )
check: check:
Poseidon2Builder.new(localStore, unprotectedManifest, cellSize = cellSize).error.msg == Poseidon2Builder.new(
"Manifest is not protected." localStore, unprotectedManifest, taskPool, cellSize = cellSize
).error.msg == "Manifest is not protected."
test "Number of blocks must be devisable by number of slots": test "Number of blocks must be devisable by number of slots":
let mismatchManifest = Manifest.new( let mismatchManifest = Manifest.new(
@ -131,7 +135,7 @@ suite "Slot builder":
) )
check: check:
Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg ==
"Number of blocks must be divisible by number of slots." "Number of blocks must be divisible by number of slots."
test "Block size must be divisable by cell size": test "Block size must be divisable by cell size":
@ -149,12 +153,13 @@ suite "Slot builder":
) )
check: check:
Poseidon2Builder.new(localStore, mismatchManifest, cellSize = cellSize).error.msg == Poseidon2Builder.new(localStore, mismatchManifest, taskPool, cellSize = cellSize).error.msg ==
"Block size must be divisible by cell size." "Block size must be divisible by cell size."
test "Should build correct slot builder": test "Should build correct slot builder":
builder = builder = Poseidon2Builder
Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet()
check: check:
builder.cellSize == cellSize builder.cellSize == cellSize
@ -171,7 +176,7 @@ suite "Slot builder":
) )
builder = Poseidon2Builder builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize) .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet() .tryGet()
for i in 0 ..< numSlots: for i in 0 ..< numSlots:
@ -196,7 +201,7 @@ suite "Slot builder":
) )
builder = Poseidon2Builder builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize) .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet() .tryGet()
for i in 0 ..< numSlots: for i in 0 ..< numSlots:
@ -215,8 +220,9 @@ suite "Slot builder":
slotTree.root().tryGet() == expectedRoot slotTree.root().tryGet() == expectedRoot
test "Should persist trees for all slots": test "Should persist trees for all slots":
let builder = let builder = Poseidon2Builder
Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet()
for i in 0 ..< numSlots: for i in 0 ..< numSlots:
let let
@ -242,7 +248,7 @@ suite "Slot builder":
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
) )
builder = Poseidon2Builder builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize) .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet() .tryGet()
(await builder.buildSlots()).tryGet (await builder.buildSlots()).tryGet
@ -270,7 +276,7 @@ suite "Slot builder":
0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks 0, protectedManifest.blocksCount - 1, numSlots, numSlots, numPadSlotBlocks
) )
builder = Poseidon2Builder builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize) .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet() .tryGet()
slotsHashes = collect(newSeq): slotsHashes = collect(newSeq):
@ -296,45 +302,53 @@ suite "Slot builder":
test "Should not build from verifiable manifest with 0 slots": test "Should not build from verifiable manifest with 0 slots":
var var
builder = Poseidon2Builder builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize) .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet() .tryGet()
verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest = (await builder.buildManifest()).tryGet()
verifyManifest.slotRoots = @[] verifyManifest.slotRoots = @[]
check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr check Poseidon2Builder.new(
localStore, verifyManifest, taskPool, cellSize = cellSize
).isErr
test "Should not build from verifiable manifest with incorrect number of slots": test "Should not build from verifiable manifest with incorrect number of slots":
var var
builder = Poseidon2Builder builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize) .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet() .tryGet()
verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest = (await builder.buildManifest()).tryGet()
verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1) verifyManifest.slotRoots.del(verifyManifest.slotRoots.len - 1)
check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr check Poseidon2Builder.new(
localStore, verifyManifest, taskPool, cellSize = cellSize
).isErr
test "Should not build from verifiable manifest with invalid verify root": test "Should not build from verifiable manifest with invalid verify root":
let builder = let builder = Poseidon2Builder
Poseidon2Builder.new(localStore, protectedManifest, cellSize = cellSize).tryGet() .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet()
var verifyManifest = (await builder.buildManifest()).tryGet() var verifyManifest = (await builder.buildManifest()).tryGet()
rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer) rng.shuffle(Rng.instance, verifyManifest.verifyRoot.data.buffer)
check Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).isErr check Poseidon2Builder.new(
localStore, verifyManifest, taskPool, cellSize = cellSize
).isErr
test "Should build from verifiable manifest": test "Should build from verifiable manifest":
let let
builder = Poseidon2Builder builder = Poseidon2Builder
.new(localStore, protectedManifest, cellSize = cellSize) .new(localStore, protectedManifest, taskPool, cellSize = cellSize)
.tryGet() .tryGet()
verifyManifest = (await builder.buildManifest()).tryGet() verifyManifest = (await builder.buildManifest()).tryGet()
verificationBuilder = verificationBuilder = Poseidon2Builder
Poseidon2Builder.new(localStore, verifyManifest, cellSize = cellSize).tryGet() .new(localStore, verifyManifest, taskPool, cellSize = cellSize)
.tryGet()
check: check:
builder.slotRoots == verificationBuilder.slotRoots builder.slotRoots == verificationBuilder.slotRoots

View File

@ -0,0 +1,40 @@
{.push raises: [].}
import std/[times, strformat, random]
import pkg/questionable/results
import pkg/codex/merkletree/poseidon2
import pkg/codex/utils/poseidon2digest
import ../../asynctest
test "Test poseidon2 digestTree":
randomize(42)
const
dataSize = 64 * 1024 # 64KB
chunkSize = 2 * 1024 # 2KB
iterations = 10 # Number of iterations
echo &"Benchmarking digestTree with data size: {dataSize} bytes, chunk size: {chunkSize} bytes"
# Generate random data
var data = newSeq[byte](dataSize)
for i in 0 ..< dataSize:
data[i] = byte(rand(255))
# Actual benchmark
let startTime = cpuTime()
for i in 1 .. iterations:
let treeResult = Poseidon2Tree.digestTree(data, chunkSize).tryGet()
# Optionally print info about each iteration
let endTime = cpuTime()
let totalTime = endTime - startTime
let avgTime = totalTime / iterations.float
echo &"Results:"
echo &" Total time for {iterations} iterations: {totalTime:.6f} seconds"
echo &" Average time per iteration: {avgTime:.6f} seconds"
echo &" Iterations per second: {iterations.float / totalTime:.2f}"