From 087cc4ce7b50a7e286b29adcba6a6fbfcbefb1c3 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 28 May 2025 19:05:25 -0600 Subject: [PATCH] Refactor codebase: enhance error handling, improve type definitions, and streamline async procedures --- codex/codex.nim | 28 +++---- codex/node.nim | 141 +++++++++++++++++------------------- codex/slots/builder.nim | 2 +- codex/slots/proofs.nim | 4 +- codex/slots/types.nim | 24 +++--- codex/stores/treehelper.nim | 8 +- codex/utils/arrayutils.nim | 2 - 7 files changed, 94 insertions(+), 115 deletions(-) diff --git a/codex/codex.nim b/codex/codex.nim index 3ee48d68..234f2d7f 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -208,22 +208,13 @@ proc new*( .withTcpTransport({ServerFlags.ReuseAddr}) .build() - var - cache: CacheStore = nil - taskpool: Taskpool + var tp = + try: + Taskpool.new(numThreads = int(config.numThreads)) + except CatchableError as exc: + raiseAssert("Failure in tp initialization:" & exc.msg) - try: - if config.numThreads == ThreadCount(0): - taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) - else: - taskpool = Taskpool.new(numThreads = int(config.numThreads)) - info "Threadpool started", numThreads = taskpool.numThreads - except CatchableError as exc: - raiseAssert("Failure in taskpool initialization:" & exc.msg) - - if config.cacheSize > 0'nb: - cache = CacheStore.new(cacheSize = config.cacheSize) - ## Is unused? + info "Threadpool started", numThreads = tp.numThreads let discoveryDir = config.dataDir / CodexDhtNamespace @@ -299,9 +290,8 @@ proc new*( store = NetworkStore.new(engine, repoStore) prover = if config.prover: - let backend = - config.initializeBackend().expect("Unable to create prover backend.") - some Prover.new(store, backend, config.numProofSamples) + let prover = config.initializeProver(tp).expect("Unable to create prover.") + some prover else: none Prover @@ -311,7 +301,7 @@ proc new*( engine = engine, discovery = discovery, prover = prover, - taskPool = taskpool, + taskPool = tp, ) restServer = RestServerRef diff --git a/codex/node.nim b/codex/node.nim index b742df2c..2250b431 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -46,6 +46,7 @@ import ./errors import ./logutils import ./utils/asynciter import ./utils/trackedfutures +import ./utils/poseidon2digest export logutils @@ -63,17 +64,17 @@ type ] CodexNode* = object - switch: Switch - networkId: PeerId - networkStore: NetworkStore - engine: BlockExcEngine - prover: ?Prover - discovery: Discovery - contracts*: Contracts - clock*: Clock - storage*: Contracts - taskpool: Taskpool - trackedFutures: TrackedFutures + switch: Switch # the libp2p network switch + networkId: PeerId # the peer id of the node + networkStore: NetworkStore # the network store + engine: BlockExcEngine # the block exchange engine + prover: ?Prover # the prover + discovery: Discovery # the discovery service + contracts*: Contracts # the contracts + clock*: Clock # the clock + storage*: Contracts # the storage + taskpool: Taskpool # the taskpool + trackedFutures: TrackedFutures # the tracked futures CodexNodeRef* = ref CodexNode @@ -96,18 +97,12 @@ func discovery*(self: CodexNodeRef): Discovery = proc storeManifest*( self: CodexNodeRef, manifest: Manifest -): Future[?!bt.Block] {.async.} = - without encodedVerifiable =? manifest.encode(), err: - trace "Unable to encode manifest" - return failure(err) +): Future[?!bt.Block] {.async: (raises: [CancelledError]).} = + let + encodedVerifiable = ?manifest.encode() + blk = ?bt.Block.new(data = encodedVerifiable, codec = ManifestCodec) - without blk =? bt.Block.new(data = encodedVerifiable, codec = ManifestCodec), error: - trace "Unable to create block from manifest" - return failure(error) - - if err =? (await self.networkStore.putBlock(blk)).errorOption: - trace "Unable to store manifest block", cid = blk.cid, err = err.msg - return failure(err) + ?await self.networkStore.putBlock(blk) success blk @@ -338,7 +333,9 @@ proc retrieve*( await self.streamEntireDataset(manifest, cid) -proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = +proc deleteSingleBlock( + self: CodexNodeRef, cid: Cid +): Future[?!void] {.async: (raises: [CancelledError]).} = if err =? (await self.networkStore.delBlock(cid)).errorOption: error "Error deleting block", cid, err = err.msg return failure(err) @@ -346,7 +343,9 @@ proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = trace "Deleted block", cid return success() -proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = +proc deleteEntireDataset( + self: CodexNodeRef, cid: Cid +): Future[?!void] {.async: (raises: [CancelledError]).} = # Deletion is a strictly local operation var store = self.networkStore.localStore @@ -403,7 +402,7 @@ proc store*( filename: ?string = string.none, mimetype: ?string = string.none, blockSize = DefaultBlockSize, -): Future[?!Cid] {.async.} = +): Future[?!Cid] {.async: (raises: [CancelledError]).} = ## Save stream contents as dataset with given blockSize ## to nodes's BlockStore, and return Cid of its manifest ## @@ -478,7 +477,9 @@ proc store*( return manifestBlk.cid.success -proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = +proc iterateManifests*( + self: CodexNodeRef, onManifest: OnManifest +) {.async: (raises: [CancelledError]).} = without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest): warn "Failed to listBlocks" return @@ -505,7 +506,7 @@ proc setupRequest( pricePerBytePerSecond: UInt256, collateralPerByte: UInt256, expiry: uint64, -): Future[?!StorageRequest] {.async.} = +): Future[?!StorageRequest] {.async: (raises: [CancelledError]).} = ## Setup slots for a given dataset ## @@ -527,32 +528,20 @@ proc setupRequest( trace "Setting up slots" - without manifest =? await self.fetchManifest(cid), error: - trace "Unable to fetch manifest for cid" - return failure error - - # Erasure code the dataset according to provided parameters - let erasure = Erasure.new( - self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool - ) - - without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: - trace "Unable to erasure code dataset" - return failure(error) - - without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err: - trace "Unable to create slot builder" - return failure(err) - - without verifiable =? (await builder.buildManifest()), err: - trace "Unable to build verifiable manifest" - return failure(err) - - without manifestBlk =? await self.storeManifest(verifiable), err: - trace "Unable to store verifiable manifest" - return failure(err) - let + manifest = ?await self.fetchManifest(cid) + + # Erasure code the dataset according to provided parameters + erasure = Erasure.new( + self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, + self.taskpool, + ) + + encoded = ?await erasure.encode(manifest, ecK, ecM) + builder = ?Poseidon2Builder.new(self.networkStore.localStore, encoded) + verifiable = ?await builder.buildManifest() + manifestBlk = ?await self.storeManifest(verifiable) + verifyRoot = if builder.verifyRoot.isNone: return failure("No slots root") @@ -586,7 +575,7 @@ proc requestStorage*( pricePerBytePerSecond: UInt256, collateralPerByte: UInt256, expiry: uint64, -): Future[?!PurchaseId] {.async.} = +): Future[?!PurchaseId] {.async: (raises: [CancelledError]).} = ## Initiate a request for storage sequence, this might ## be a multistep procedure. ## @@ -617,7 +606,17 @@ proc requestStorage*( trace "Unable to setup request" return failure err - let purchase = await contracts.purchasing.purchase(request) + # TODO: remove try/except once state machine has checked exceptions + let purchase = + try: + await contracts.purchasing.purchase(request) + except CancelledError as err: + trace "Purchase cancelled", err = err.msg + raise err + except CatchableError as err: + trace "Unable to purchase storage", err = err.msg + return failure(err) + success purchase.id proc onStore( @@ -731,38 +730,28 @@ proc onProve( if prover =? self.prover: trace "Prover enabled" - without cid =? Cid.init(cidStr).mapFailure, err: - error "Unable to parse Cid", cid, err = err.msg - return failure(err) - - without manifest =? await self.fetchManifest(cid), err: - error "Unable to fetch manifest for cid", err = err.msg - return failure(err) + let + cid = ?Cid.init(cidStr).mapFailure + manifest = ?await self.fetchManifest(cid) + builder = + ?Poseidon2Builder.new(self.networkStore, manifest, manifest.verifiableStrategy) + sampler = ?Poseidon2Sampler.new(slotIdx, self.networkStore, builder) when defined(verify_circuit): - without (inputs, proof) =? await prover.prove(slotIdx.int, manifest, challenge), - err: - error "Unable to generate proof", err = err.msg - return failure(err) + let (proof, checked) = + ?await prover.prove(sampler, manifest, challenge, verify = true) - without checked =? await prover.verify(proof, inputs), err: - error "Unable to verify proof", err = err.msg - return failure(err) - - if not checked: + if checked.isSome and not checked.get: error "Proof verification failed" return failure("Proof verification failed") trace "Proof verified successfully" else: - without (_, proof) =? await prover.prove(slotIdx.int, manifest, challenge), err: - error "Unable to generate proof", err = err.msg - return failure(err) + let (proof, _) = ?await prover.prove(sampler, manifest, challenge, verify = false) - let groth16Proof = proof.toGroth16Proof() - trace "Proof generated successfully", groth16Proof + trace "Proof generated successfully", proof - success groth16Proof + success proof else: warn "Prover not enabled" failure "Prover not enabled" diff --git a/codex/slots/builder.nim b/codex/slots/builder.nim index 25844db6..1857150c 100644 --- a/codex/slots/builder.nim +++ b/codex/slots/builder.nim @@ -3,6 +3,6 @@ import ./converters import ../merkletree -export builder, converters +export builder, converters, merkletree type Poseidon2Builder* = SlotsBuilder[Poseidon2Tree, Poseidon2Hash] diff --git a/codex/slots/proofs.nim b/codex/slots/proofs.nim index 4f7f01b5..a1f56d9a 100644 --- a/codex/slots/proofs.nim +++ b/codex/slots/proofs.nim @@ -1,5 +1,5 @@ import ./proofs/backends import ./proofs/prover -import ./proofs/backendfactory +import ./proofs/proverfactory -export circomcompat, prover, backendfactory +export backends, prover, proverfactory diff --git a/codex/slots/types.nim b/codex/slots/types.nim index 0cd24326..aabba0ca 100644 --- a/codex/slots/types.nim +++ b/codex/slots/types.nim @@ -8,23 +8,23 @@ ## those terms. type - Sample*[H] = object - cellData*: seq[H] - merklePaths*: seq[H] + Sample*[SomeHash] = object + cellData*: seq[SomeHash] + merklePaths*: seq[SomeHash] - PublicInputs*[H] = object + PublicInputs*[SomeHash] = object slotIndex*: int - datasetRoot*: H - entropy*: H + datasetRoot*: SomeHash + entropy*: SomeHash - ProofInputs*[H] = object - entropy*: H - datasetRoot*: H + ProofInputs*[SomeHash] = object + entropy*: SomeHash + datasetRoot*: SomeHash slotIndex*: Natural - slotRoot*: H + slotRoot*: SomeHash nCellsPerSlot*: Natural nSlotsPerDataSet*: Natural - slotProof*: seq[H] + slotProof*: seq[SomeHash] # inclusion proof that shows that the slot root (leaf) is part of the dataset (root) - samples*: seq[Sample[H]] + samples*: seq[Sample[SomeHash]] # inclusion proofs which show that the selected cells (leafs) are part of the slot (roots) diff --git a/codex/stores/treehelper.nim b/codex/stores/treehelper.nim index e1f5d48d..99f8cde1 100644 --- a/codex/stores/treehelper.nim +++ b/codex/stores/treehelper.nim @@ -25,7 +25,7 @@ import ../merkletree proc putSomeProofs*( store: BlockStore, tree: CodexTree, iter: Iter[int] -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = without treeCid =? tree.rootCid, err: return failure(err) @@ -51,8 +51,10 @@ proc putSomeProofs*( proc putSomeProofs*( store: BlockStore, tree: CodexTree, iter: Iter[Natural] -): Future[?!void] = +): Future[?!void] {.async: (raises: [CancelledError], raw: true).} = store.putSomeProofs(tree, iter.map((i: Natural) => i.ord)) -proc putAllProofs*(store: BlockStore, tree: CodexTree): Future[?!void] = +proc putAllProofs*( + store: BlockStore, tree: CodexTree +): Future[?!void] {.async: (raises: [CancelledError], raw: true).} = store.putSomeProofs(tree, Iter[int].new(0 ..< tree.leavesCount)) diff --git a/codex/utils/arrayutils.nim b/codex/utils/arrayutils.nim index e36a0cb3..c6721f6b 100644 --- a/codex/utils/arrayutils.nim +++ b/codex/utils/arrayutils.nim @@ -1,5 +1,3 @@ -import std/sequtils - proc createDoubleArray*( outerLen, innerLen: int ): ptr UncheckedArray[ptr UncheckedArray[byte]] =