From 7c75cc866397116ba4fe2dffad53c90cee97e7fe Mon Sep 17 00:00:00 2001 From: munna0908 Date: Sat, 1 Mar 2025 23:07:05 +0530 Subject: [PATCH] inital commit --- codex/slots/proofs/backends/circomcompat.nim | 84 ++++++----- codex/slots/proofs/prover.nim | 125 ++++++++++++++-- tests/codex/slots/testprover.nim | 2 +- tests/integration/multinodes.nim | 1 + tests/integration/testsales.nim | 142 +++++++++---------- 5 files changed, 232 insertions(+), 122 deletions(-) diff --git a/codex/slots/proofs/backends/circomcompat.nim b/codex/slots/proofs/backends/circomcompat.nim index b9f8e84c..e44019ed 100644 --- a/codex/slots/proofs/backends/circomcompat.nim +++ b/codex/slots/proofs/backends/circomcompat.nim @@ -38,7 +38,8 @@ type zkeyPath: string # path to the zkey file backendCfg: ptr CircomBn254Cfg vkp*: ptr CircomKey - taskpool: Taskpool + taskpool*: Taskpool + withLock* Lock NormalizedProofInputs*[H] {.borrow: `.`.} = distinct ProofInputs[H] @@ -97,7 +98,7 @@ proc release*(self: CircomCompat) = if not isNil(self.vkp): self.vkp.unsafeAddr.release_key() -proc circomProveTask(task: ptr ProveTask) {.gcsafe.} = +proc circomProveTask(tp:Taskpool,task: ptr ProveTask) {.gcsafe.} = defer: discard task[].signal.fireSync() @@ -118,7 +119,7 @@ proc circomProveTask(task: ptr ProveTask) {.gcsafe.} = proc asyncProve*[H]( self: CircomCompat, input: NormalizedProofInputs[H], proof: ptr Proof -): Future[?!void] {.async.} = +): ?!bool = doAssert input.samples.len == self.numSamples, "Number of samples does not match" doAssert input.slotProof.len <= self.datasetDepth, @@ -182,11 +183,11 @@ proc asyncProve*[H]( for s in input.samples: var - merklePaths = s.merklePaths.mapIt(@(it.toBytes)).concat + merklePaths = s.merklePaths.mapIt(it.toBytes) data = s.cellData.mapIt(@(it.toBytes)).concat if ctx.push_input_u256_array( - "merklePaths".cstring, merklePaths[0].addr, uint (merklePaths.len) + "merklePaths".cstring, merklePaths[0].addr, uint (merklePaths[0].len * merklePaths.len) ) != ERR_OK: return failure("Failed to push merkle paths") @@ -200,46 +201,59 @@ proc asyncProve*[H]( defer: threadPtr.close().expect("closing once works") - var task = ProveTask(circom: addr self, ctx: ctx, proof: proof, signal: threadPtr) + # var task = ProveTask(circom: addr self, ctx: ctx, proof: proof, signal: threadPtr) - let taskPtr = addr task + # let taskPtr = addr task - doAssert task.circom.taskpool.numThreads > 1, - "Must have at least one separate thread or signal will never be fired" - task.circom.taskpool.spawn circomProveTask(taskPtr) - let threadFut = threadPtr.wait() + # doAssert task.circom.taskpool.numThreads > 1, + # "Must have at least one separate thread or signal will never be fired" + # task.circom.taskpool.spawn circomProveTask(taskPtr) + # let threadFut = threadPtr.wait() - try: - await threadFut.join() - except CatchableError as exc: + # try: + # await threadFut.join() + # except CatchableError as exc: + # try: + # await threadFut + # except AsyncError as asyncExc: + # return failure(asyncExc.msg) + # finally: + # if exc of CancelledError: + # raise (ref CancelledError) exc + # else: + # return failure(exc.msg) + + # if not task.success.load(): + # return failure("Failed to prove circuit") + + var proofPtr: ptr Proof = nil + + let proof1 = try: - await threadFut - except AsyncError as asyncExc: - return failure(asyncExc.msg) + if (let res = self.backendCfg.prove_circuit(ctx, proofPtr.addr); res != ERR_OK) or + proofPtr == nil: + return failure("Failed to prove - err code: " & $res) + + proofPtr[] finally: - if exc of CancelledError: - raise (ref CancelledError) exc - else: - return failure(exc.msg) + if proofPtr != nil: + proofPtr.addr.release_proof() + + echo "Printing non copied proof" + echo proof1 - if not task.success.load(): - return failure("Failed to prove circuit") + echo - success() + + copyProof(proof, proof1) + + success(true) proc prove*[H]( - self: CircomCompat, input: ProofInputs[H] -): Future[?!CircomProof] {.async, raises: [CancelledError].} = - var proof = ProofPtr.new() - defer: - destroyProof(proof) + self: CircomCompat, input: ptr NormalizedProofInputs[H],proof: ProofPtr +): ?!bool = + return self.asyncProve(input[], proof) - try: - if error =? (await self.asyncProve(self.normalizeInput(input), proof)).errorOption: - return failure(error) - return success(deepCopy(proof)[]) - except CancelledError as exc: - raise exc proc circomVerifyTask(task: ptr VerifyTask) {.gcsafe.} = defer: diff --git a/codex/slots/proofs/prover.nim b/codex/slots/proofs/prover.nim index ee75bbfe..d53b8ac9 100644 --- a/codex/slots/proofs/prover.nim +++ b/codex/slots/proofs/prover.nim @@ -8,7 +8,10 @@ ## those terms. ## +import std/atomics + import pkg/chronos +import pkg/chronos/threadsync import pkg/chronicles import pkg/circomcompat import pkg/poseidon2 @@ -46,6 +49,92 @@ type store: BlockStore nSamples: int + ProveTask[H] = object + circom: ptr CircomCompat + proof: ptr Proof + inputs: ptr NormalizedProofInputs[H] + success: Atomic[bool] + signal: ThreadSignalPtr + +# proc circomProveTask*(tp: Taskpool, t: ptr ProveTask) {.gcsafe.} = # prove slot +# defer: +# discard t[].signal.fireSync() +# without _ =? t[].backend[].prove(t.proofInputs[]), err: +# error "Unable to prove slot", err = err.msg +# t.success.store(false) +# return + +# t.success.store(true) + +proc createProofTask[H]( + backend: AnyBackend, + proofInputs: ptr NormalizedProofInputs[H], + proof: ProofPtr, + signal: ThreadSignalPtr, +): ProveTask[H] = + ProveTask[H](circom: addr backend, inputs: proofInputs, proof: proof, signal: signal) + +proc circomProveTask(tp: Taskpool, t: ptr ProveTask) {.gcsafe.} = + defer: + discard t[].signal.fireSync() + + without _ =? t[].circom[].prove(t.inputs, t.proof), err: + error "Unable to prove slot", err = err.msg + t.success.store(false) + return + + t.success.store(true) + +proc asyncProve*( + self: Prover, input: ProofInputs, proof: ptr Proof +): Future[?!void] {.async: (raises: [CancelledError]).} = + without threadPtr =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + threadPtr.close().expect("closing once works") + + var normalInputs = self.backend.normalizeInput(input) + + var + + #var task = ProveTask(circom: addr self.backend,inputs:addr normalInputs, proof: proof, signal: threadPtr) + task = createProofTask(self.backend, addr normalInputs, proof, threadPtr) + let taskPtr = addr task + doAssert self.backend.taskpool.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + self.backend.taskpool.spawn circomProveTask(self.backend.taskpool, taskPtr) + let threadFut = threadPtr.wait() + + try: + await threadFut.join() + except CatchableError as exc: + try: + await threadFut + except AsyncError as asyncExc: + return failure(asyncExc.msg) + finally: + if exc of CancelledError: + raise (ref CancelledError) exc + else: + return failure(exc.msg) + + if not taskPtr.success.load(): + return failure("Failed to prove") + + success() + +proc verify*( + self: Prover, proof: AnyProof, inputs: AnyProofInputs +): Future[?!bool] {.async.} = + ## Prove a statement using backend. + ## Returns a future that resolves to a proof. + without res =? (await self.backend.verify(proof, inputs)), err: + error "Unable to verify proof", err = err.msg + return failure(err) + + return success(res) + proc prove*( self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge ): Future[?!(AnyProofInputs, AnyProof)] {.async.} = @@ -71,24 +160,30 @@ proc prove*( error "Unable to get proof input for slot", err = err.msg return failure(err) - # prove slot - without proof =? await self.backend.prove(proofInput), err: - error "Unable to prove slot", err = err.msg - return failure(err) + var taskProof = ProofPtr.new() + defer: + destroyProof(taskProof) + + # without _ =? await self.asyncProve(proofInput,taskProof),err: + # return failure(err) + + try: + if err =? (await self.asyncProve(proofInput, taskProof)).errorOption: + return failure(err) + except CancelledError as exc: + raise exc + + var proof: Proof + + copyProof(proof.addr, taskProof[]) + + without success =? await self.verify(taskProof[], proofInput), err: + echo "&&&&&&&&&&&&&&&&&", err.msg + + echo "#################", success success (proofInput, proof) -proc verify*( - self: Prover, proof: AnyProof, inputs: AnyProofInputs -): Future[?!bool] {.async.} = - ## Prove a statement using backend. - ## Returns a future that resolves to a proof. - without res =? (await self.backend.verify(proof, inputs)), err: - error "Unable to verify proof", err = err.msg - return failure(err) - - return success(res) - proc new*( _: type Prover, store: BlockStore, backend: AnyBackend, nSamples: int ): Prover = diff --git a/tests/codex/slots/testprover.nim b/tests/codex/slots/testprover.nim index 895be1ea..30f0c3a7 100644 --- a/tests/codex/slots/testprover.nim +++ b/tests/codex/slots/testprover.nim @@ -140,7 +140,7 @@ suite "Test Prover": destroyProof(cancelledProof) # call asyncProve and cancel the task - let proveFut = backend.asyncProve(backend.normalizeInput(inputs), cancelledProof) + let proveFut = prover.asyncProve(inputs, cancelledProof) proveFut.cancel() try: diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index bade6899..123be7f3 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -217,6 +217,7 @@ template multinodesuite*(name: string, body: untyped) = proc startProviderNode(conf: CodexConfig): Future[NodeProcess] {.async.} = let providerIdx = providers().len var config = conf + config.debugEnabled = true config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl) config.addCliOption( StartUpCmd.persistence, "--eth-account", $accounts[running.len] diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim index 6c5c30d5..b24de9fe 100644 --- a/tests/integration/testsales.nim +++ b/tests/integration/testsales.nim @@ -29,84 +29,84 @@ multinodesuite "Sales": host = providers()[0].client client = clients()[0].client - test "node handles new storage availability", salesConfig: - let availability1 = host.postAvailability( - totalSize = 1.uint64, - duration = 2.uint64, - minPricePerBytePerSecond = 3.u256, - totalCollateral = 4.u256, - ).get - let availability2 = host.postAvailability( - totalSize = 4.uint64, - duration = 5.uint64, - minPricePerBytePerSecond = 6.u256, - totalCollateral = 7.u256, - ).get - check availability1 != availability2 + # test "node handles new storage availability", salesConfig: + # let availability1 = host.postAvailability( + # totalSize = 1.uint64, + # duration = 2.uint64, + # minPricePerBytePerSecond = 3.u256, + # totalCollateral = 4.u256, + # ).get + # let availability2 = host.postAvailability( + # totalSize = 4.uint64, + # duration = 5.uint64, + # minPricePerBytePerSecond = 6.u256, + # totalCollateral = 7.u256, + # ).get + # check availability1 != availability2 - test "node lists storage that is for sale", salesConfig: - let availability = host.postAvailability( - totalSize = 1.uint64, - duration = 2.uint64, - minPricePerBytePerSecond = 3.u256, - totalCollateral = 4.u256, - ).get - check availability in host.getAvailabilities().get + # test "node lists storage that is for sale", salesConfig: + # let availability = host.postAvailability( + # totalSize = 1.uint64, + # duration = 2.uint64, + # minPricePerBytePerSecond = 3.u256, + # totalCollateral = 4.u256, + # ).get + # check availability in host.getAvailabilities().get - test "updating non-existing availability", salesConfig: - let nonExistingResponse = host.patchAvailabilityRaw( - AvailabilityId.example, - duration = 100.uint64.some, - minPricePerBytePerSecond = 2.u256.some, - totalCollateral = 200.u256.some, - ) - check nonExistingResponse.status == "404 Not Found" + # test "updating non-existing availability", salesConfig: + # let nonExistingResponse = host.patchAvailabilityRaw( + # AvailabilityId.example, + # duration = 100.uint64.some, + # minPricePerBytePerSecond = 2.u256.some, + # totalCollateral = 200.u256.some, + # ) + # check nonExistingResponse.status == "404 Not Found" - test "updating availability", salesConfig: - let availability = host.postAvailability( - totalSize = 140000.uint64, - duration = 200.uint64, - minPricePerBytePerSecond = 3.u256, - totalCollateral = 300.u256, - ).get + # test "updating availability", salesConfig: + # let availability = host.postAvailability( + # totalSize = 140000.uint64, + # duration = 200.uint64, + # minPricePerBytePerSecond = 3.u256, + # totalCollateral = 300.u256, + # ).get - host.patchAvailability( - availability.id, - duration = 100.uint64.some, - minPricePerBytePerSecond = 2.u256.some, - totalCollateral = 200.u256.some, - ) + # host.patchAvailability( + # availability.id, + # duration = 100.uint64.some, + # minPricePerBytePerSecond = 2.u256.some, + # totalCollateral = 200.u256.some, + # ) - let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get - check updatedAvailability.duration == 100.uint64 - check updatedAvailability.minPricePerBytePerSecond == 2 - check updatedAvailability.totalCollateral == 200 - check updatedAvailability.totalSize == 140000.uint64 - check updatedAvailability.freeSize == 140000.uint64 + # let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get + # check updatedAvailability.duration == 100.uint64 + # check updatedAvailability.minPricePerBytePerSecond == 2 + # check updatedAvailability.totalCollateral == 200 + # check updatedAvailability.totalSize == 140000.uint64 + # check updatedAvailability.freeSize == 140000.uint64 - test "updating availability - freeSize is not allowed to be changed", salesConfig: - let availability = host.postAvailability( - totalSize = 140000.uint64, - duration = 200.uint64, - minPricePerBytePerSecond = 3.u256, - totalCollateral = 300.u256, - ).get - let freeSizeResponse = - host.patchAvailabilityRaw(availability.id, freeSize = 110000.uint64.some) - check freeSizeResponse.status == "400 Bad Request" - check "not allowed" in freeSizeResponse.body + # test "updating availability - freeSize is not allowed to be changed", salesConfig: + # let availability = host.postAvailability( + # totalSize = 140000.uint64, + # duration = 200.uint64, + # minPricePerBytePerSecond = 3.u256, + # totalCollateral = 300.u256, + # ).get + # let freeSizeResponse = + # host.patchAvailabilityRaw(availability.id, freeSize = 110000.uint64.some) + # check freeSizeResponse.status == "400 Bad Request" + # check "not allowed" in freeSizeResponse.body - test "updating availability - updating totalSize", salesConfig: - let availability = host.postAvailability( - totalSize = 140000.uint64, - duration = 200.uint64, - minPricePerBytePerSecond = 3.u256, - totalCollateral = 300.u256, - ).get - host.patchAvailability(availability.id, totalSize = 100000.uint64.some) - let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get - check updatedAvailability.totalSize == 100000 - check updatedAvailability.freeSize == 100000 + # test "updating availability - updating totalSize", salesConfig: + # let availability = host.postAvailability( + # totalSize = 140000.uint64, + # duration = 200.uint64, + # minPricePerBytePerSecond = 3.u256, + # totalCollateral = 300.u256, + # ).get + # host.patchAvailability(availability.id, totalSize = 100000.uint64.some) + # let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get + # check updatedAvailability.totalSize == 100000 + # check updatedAvailability.freeSize == 100000 test "updating availability - updating totalSize does not allow bellow utilized", salesConfig: