diff --git a/benchmarks/bench_ec_g1_batch.nim b/benchmarks/bench_ec_g1_batch.nim index 0a20f97..15ae82a 100644 --- a/benchmarks/bench_ec_g1_batch.nim +++ b/benchmarks/bench_ec_g1_batch.nim @@ -73,14 +73,14 @@ proc main() = for numPoints in testNumPoints: let batchIters = max(1, Iters div numPoints) multiAddParallelBench(ECP_ShortW_Jac[Fp[curve], G1], numPoints, batchIters) - separator() - for numPoints in testNumPoints: - let batchIters = max(1, Iters div numPoints) - multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = false, batchIters) - separator() - for numPoints in testNumPoints: - let batchIters = max(1, Iters div numPoints) - multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = true, batchIters) + # separator() + # for numPoints in testNumPoints: + # let batchIters = max(1, Iters div numPoints) + # multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = false, batchIters) + # separator() + # for numPoints in testNumPoints: + # let batchIters = max(1, Iters div numPoints) + # multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = true, batchIters) separator() separator() diff --git a/benchmarks/bench_elliptic_parallel_template.nim b/benchmarks/bench_elliptic_parallel_template.nim index 4c64927..c293a58 100644 --- a/benchmarks/bench_elliptic_parallel_template.nim +++ b/benchmarks/bench_elliptic_parallel_template.nim @@ -43,7 +43,7 @@ proc multiAddParallelBench*(EC: typedesc, numPoints: int, iters: int) = var r{.noInit.}: EC - var tp = Threadpool.new() + let tp = Threadpool.new() bench("EC parallel batch add (" & align($tp.numThreads, 2) & " threads) " & $EC.G & " (" & $numPoints & " points)", EC, iters): tp.sum_reduce_vartime_parallel(r, points) diff --git a/benchmarks/bench_ethereum_bls_signatures.nim b/benchmarks/bench_ethereum_bls_signatures.nim index a69bc15..c615e45 100644 --- a/benchmarks/bench_ethereum_bls_signatures.nim +++ b/benchmarks/bench_ethereum_bls_signatures.nim @@ -9,22 +9,25 @@ import # Internals ../constantine/[ - ethereum_bls_signatures, + ethereum_bls_signatures_parallel, ethereum_eip2333_bls12381_key_derivation], ../constantine/math/arithmetic, + ../constantine/threadpool/threadpool, + # Std + std/[os, cpuinfo], # Helpers ../helpers/prng_unsafe, ./bench_blueprint -proc separator*() = separator(167) +proc separator*() = separator(180) proc report(op, curve: string, startTime, stopTime: MonoTime, startClk, stopClk: int64, iters: int) = let ns = inNanoseconds((stopTime-startTime) div iters) let throughput = 1e9 / float64(ns) when SupportsGetTicks: - echo &"{op:<75} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op {(stopClk - startClk) div iters:>9} CPU cycles (approx)" + echo &"{op:<88} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op {(stopClk - startClk) div iters:>9} CPU cycles (approx)" else: - echo &"{op:<75} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op" + echo &"{op:<88} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op" template bench(op: string, curve: string, iters: int, body: untyped): untyped = measure(iters, startTime, stopTime, startClk, stopClk, body) @@ -184,6 +187,43 @@ proc benchVerifyBatched*(numSigs, iters: int) = let ok = batch_verify(pubkeys, messages, signatures, secureBlindingBytes) doAssert ok == cttBLS_Success +proc benchVerifyBatchedParallel*(numSigs, iters: int) = + ## Verification of N pubkeys signing for N messages + + var + tp: Threadpool + pubkeys: seq[PublicKey] + messages: seq[array[32, byte]] + signatures: seq[Signature] + + var hashedMsg:
array[32, byte] + var sig: Signature + + + var numThreads: int + if existsEnv"CTT_NUM_THREADS": + numThreads = getEnv"CTT_NUM_THREADS".parseInt() + else: + numThreads = countProcessors() + tp = Threadpool.new(numThreads) + + for i in 0 ..< numSigs: + let (sk, pk) = demoKeyGen() + sha256.hash(hashedMsg, "msg" & $i) + sig.sign(sk, hashedMsg) + + pubkeys.add pk + messages.add hashedMsg + signatures.add sig + + let secureBlindingBytes = sha256.hash("Mr F was here") + + bench("BLS parallel batch verify (" & $tp.numThreads & " threads) of " & $numSigs & " msgs by "& $numSigs & " pubkeys (with blinding)", "BLS12_381", iters): + let ok = tp.batch_verify_parallel(pubkeys, messages, signatures, secureBlindingBytes) + doAssert ok == cttBLS_Success, "invalid status: " & $ok + + tp.shutdown() + const Iters = 1000 proc main() = @@ -202,16 +242,19 @@ proc main() = # Simulate Block verification (at most 6 signatures per block) benchVerifyMulti(numSigs = 6, iters = 10) benchVerifyBatched(numSigs = 6, iters = 10) + benchVerifyBatchedParallel(numSigs = 6, iters = 10) separator() # Simulate 10 blocks verification benchVerifyMulti(numSigs = 60, iters = 10) benchVerifyBatched(numSigs = 60, iters = 10) + benchVerifyBatchedParallel(numSigs = 60, iters = 10) separator() # Simulate 30 blocks verification benchVerifyMulti(numSigs = 180, iters = 10) benchVerifyBatched(numSigs = 180, iters = 10) + benchVerifyBatchedParallel(numSigs = 180, iters = 10) separator() main() diff --git a/benchmarks/bench_ethereum_bls_signatures.nim.cfg b/benchmarks/bench_ethereum_bls_signatures.nim.cfg new file mode 100644 index 0000000..9d57ecf --- /dev/null +++ b/benchmarks/bench_ethereum_bls_signatures.nim.cfg @@ -0,0 +1 @@ +--threads:on \ No newline at end of file diff --git a/benchmarks/bench_ethereum_eip4844_kzg.nim b/benchmarks/bench_ethereum_eip4844_kzg.nim new file mode 100644 index 0000000..3a55707 --- /dev/null +++ b/benchmarks/bench_ethereum_eip4844_kzg.nim @@ -0,0 +1,231 @@ +# Constantine +# Copyright (c) 2018-2019 Status Research & Development GmbH +# Copyright (c) 2020-Present Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
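For reference, the thread-count override used by benchVerifyBatchedParallel above reduces to the following pattern (a minimal sketch, not part of the patch; `threadpoolFromEnv` is a hypothetical helper name, and `parseInt` assumes std/strutils is available, as it is via the benchmark helper modules):

  import std/[os, cpuinfo, strutils]
  import ../constantine/threadpool/threadpool

  proc threadpoolFromEnv(): Threadpool =
    # CTT_NUM_THREADS overrides the default of one thread per logical core.
    if existsEnv"CTT_NUM_THREADS":
      Threadpool.new(getEnv"CTT_NUM_THREADS".parseInt())
    else:
      Threadpool.new(countProcessors())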
+ +import + # Internals + ../constantine/ethereum_eip4844_kzg_parallel, + ../constantine/math/io/io_fields, + ../constantine/math/config/[curves, type_ff], + ../constantine/threadpool/threadpool, + ../constantine/csprngs/sysrand, + ../constantine/platforms/primitives, + # Helpers + ../helpers/prng_unsafe, + ./bench_blueprint + +proc separator*() = separator(180) + +proc report(op, threads: string, startTime, stopTime: MonoTime, startClk, stopClk: int64, iters: int) = + let ns = inNanoseconds((stopTime-startTime) div iters) + let throughput = 1e9 / float64(ns) + when SupportsGetTicks: + echo &"{op:<40} {threads:<16} {throughput:>15.3f} ops/s {ns:>9} ns/op {(stopClk - startClk) div iters:>9} CPU cycles (approx)" + else: + echo &"{op:<40} {threads:<16} {throughput:>15.3f} ops/s {ns:>9} ns/op" + +template bench(op, threads: string, iters: int, body: untyped): untyped = + measure(iters, startTime, stopTime, startClk, stopClk, body) + report(op, threads, startTime, stopTime, startClk, stopClk, iters) + +type + BenchSet[N: static int] = ref object + blobs: array[N, Blob] + commitments: array[N, array[48, byte]] + proofs: array[N, array[48, byte]] + # This is only used for `verify_kzg_proof` and + # there is no short-circuit if they don't match + challenge, eval_at_challenge: array[32, byte] + +proc new(T: type BenchSet, ctx: ptr EthereumKZGContext): T = + new(result) + for i in 0 ..< result.N: + let t {.noInit.} = rng.random_unsafe(Fr[BLS12_381]) + result.blobs[i].marshal(t, bigEndian) + discard ctx.blob_to_kzg_commitment(result.commitments[i], result.blobs[i].addr) + discard ctx.compute_blob_kzg_proof(result.proofs[i], result.blobs[i].addr, result.commitments[i]) + + let challenge = rng.random_unsafe(Fr[BLS12_381]) + let eval_at_challenge = rng.random_unsafe(Fr[BLS12_381]) + + discard result.challenge.marshal(challenge, bigEndian) + discard result.eval_at_challenge.marshal(eval_at_challenge, bigEndian) + +proc benchBlobToKzgCommitment(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = + + let startSerial = getMonotime() + block: + bench("blob_to_kzg_commitment", "serial", iters): + var commitment {.noInit.}: array[48, byte] + doAssert cttEthKZG_Success == ctx.blob_to_kzg_commitment(commitment, b.blobs[0].addr) + let stopSerial = getMonotime() + + ## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches + let tp = Threadpool.new() + + let startParallel = getMonotime() + block: + bench("blob_to_kzg_commitment", $tp.numThreads & " threads", iters): + var commitment {.noInit.}: array[48, byte] + doAssert cttEthKZG_Success == tp.blob_to_kzg_commitment_parallel(ctx, commitment, b.blobs[0].addr) + let stopParallel = getMonotime() + + let perfSerial = inNanoseconds((stopSerial-startSerial) div iters) + let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) + + let parallelSpeedup = float(perfSerial) / float(perfParallel) + echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x" + +proc benchComputeKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = + + let startSerial = getMonotime() + block: + bench("compute_kzg_proof", "serial", iters): + var proof {.noInit.}: array[48, byte] + var eval_at_challenge {.noInit.}: array[32, byte] + doAssert cttEthKZG_Success == ctx.compute_kzg_proof(proof, eval_at_challenge, b.blobs[0].addr, b.challenge) + let stopSerial = getMonotime() + + ## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches + let tp =
Threadpool.new() + + let startParallel = getMonotime() + block: + bench("compute_kzg_proof", $tp.numThreads & " threads", iters): + var proof {.noInit.}: array[48, byte] + var eval_at_challenge {.noInit.}: array[32, byte] + doAssert cttEthKZG_Success == tp.compute_kzg_proof_parallel(ctx, proof, eval_at_challenge, b.blobs[0].addr, b.challenge) + let stopParallel = getMonotime() + + let perfSerial = inNanoseconds((stopSerial-startSerial) div iters) + let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) + + let parallelSpeedup = float(perfSerial) / float(perfParallel) + echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x" + +proc benchComputeBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = + + let startSerial = getMonotime() + block: + bench("compute_blob_kzg_proof", "serial", iters): + var proof {.noInit.}: array[48, byte] + doAssert cttEthKZG_Success == ctx.compute_blob_kzg_proof(proof, b.blobs[0].addr, b.commitments[0]) + let stopSerial = getMonotime() + + ## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches + let tp = Threadpool.new() + + let startParallel = getMonotime() + block: + bench("compute_blob_kzg_proof", $tp.numThreads & " threads", iters): + var proof {.noInit.}: array[48, byte] + doAssert cttEthKZG_Success == tp.compute_blob_kzg_proof_parallel(ctx, proof, b.blobs[0].addr, b.commitments[0]) + let stopParallel = getMonotime() + + let perfSerial = inNanoseconds((stopSerial-startSerial) div iters) + let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) + + let parallelSpeedup = float(perfSerial) / float(perfParallel) + echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x" + +proc benchVerifyKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = + + bench("verify_kzg_proof", "serial", iters): + discard ctx.verify_kzg_proof(b.commitments[0], b.challenge, b.eval_at_challenge, b.proofs[0]) + + echo "verify_kzg_proof is always serial" + +proc benchVerifyBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = + + let startSerial = getMonotime() + block: + bench("verify_blob_kzg_proof", "serial", iters): + discard ctx.verify_blob_kzg_proof(b.blobs[0].addr, b.commitments[0], b.proofs[0]) + let stopSerial = getMonotime() + + ## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches + let tp = Threadpool.new() + + let startParallel = getMonotime() + block: + bench("verify_blob_kzg_proof", $tp.numThreads & " threads", iters): + discard tp.verify_blob_kzg_proof_parallel(ctx, b.blobs[0].addr, b.commitments[0], b.proofs[0]) + let stopParallel = getMonotime() + + let perfSerial = inNanoseconds((stopSerial-startSerial) div iters) + let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) + + let parallelSpeedup = float(perfSerial) / float(perfParallel) + echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x" + +proc benchVerifyBlobKzgProofBatch(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) = + + var secureRandomBytes {.noInit.}: array[32, byte] + discard sysrand(secureRandomBytes) + + var i = 1 + + while i <= b.N: + + let startSerial = getMonotime() + block: + bench("verify_blob_kzg_proof (batch " & $i & ')', "serial", iters): + discard verify_blob_kzg_proof_batch( + ctx, + b.blobs.asUnchecked(), + b.commitments.asUnchecked(), + b.proofs.asUnchecked(), + i, + secureRandomBytes) + let stopSerial =
getMonotime() + + ## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches + let tp = Threadpool.new() + + let startParallel = getMonotime() + block: + bench("verify_blob_kzg_proof (batch " & $i & ')', $tp.numThreads & " threads", iters): + discard tp.verify_blob_kzg_proof_batch_parallel( + ctx, + b.blobs.asUnchecked(), + b.commitments.asUnchecked(), + b.proofs.asUnchecked(), + i, + secureRandomBytes) + let stopParallel = getMonotime() + + let perfSerial = inNanoseconds((stopSerial-startSerial) div iters) + let perfParallel = inNanoseconds((stopParallel-startParallel) div iters) + + let parallelSpeedup = float(perfSerial) / float(perfParallel) + echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x" + echo "" + + i *= 2 + + +const Iters = 100 +proc main() = + let ctx = load_ethereum_kzg_test_trusted_setup_mainnet() + let b = BenchSet[64].new(ctx) + separator() + benchBlobToKzgCommitment(b, ctx, Iters) + echo "" + benchComputeKzgProof(b, ctx, Iters) + echo "" + benchComputeBlobKzgProof(b, ctx, Iters) + echo "" + benchVerifyKzgProof(b, ctx, Iters) + echo "" + benchVerifyBlobKzgProof(b, ctx, Iters) + echo "" + benchVerifyBlobKzgProofBatch(b, ctx, Iters) + separator() + + +when isMainModule: + main() \ No newline at end of file diff --git a/benchmarks/bench_ethereum_eip4844_kzg.nim.cfg b/benchmarks/bench_ethereum_eip4844_kzg.nim.cfg new file mode 100644 index 0000000..9d57ecf --- /dev/null +++ b/benchmarks/bench_ethereum_eip4844_kzg.nim.cfg @@ -0,0 +1 @@ +--threads:on \ No newline at end of file diff --git a/constantine.nimble b/constantine.nimble index e1b0307..7827eb5 100644 --- a/constantine.nimble +++ b/constantine.nimble @@ -497,6 +497,7 @@ const testDesc: seq[tuple[path: string, useGMP: bool]] = @[ ("tests/t_ethereum_bls_signatures.nim", false), ("tests/t_ethereum_eip2333_bls12381_key_derivation.nim", false), ("tests/t_ethereum_eip4844_deneb_kzg.nim", false), + ("tests/t_ethereum_eip4844_deneb_kzg_parallel.nim", false), ] const testDescNvidia: seq[string] = @[ @@ -555,6 +556,7 @@ const benchDesc = [ "bench_sha256", "bench_hash_to_curve", "bench_ethereum_bls_signatures", + "bench_ethereum_eip4844_kzg", "bench_evm_modexp_dos", "bench_gmp_modexp", "bench_gmp_modmul" ] @@ -974,3 +976,8 @@ task bench_hash_to_curve, "Run Hash-to-Curve benchmarks": # ------------------------------------------ task bench_ethereum_bls_signatures, "Run Ethereum BLS signatures benchmarks - CC compiler": runBench("bench_ethereum_bls_signatures") + +# EIP 4844 - KZG Polynomial Commitments +# ------------------------------------------ +task bench_ethereum_eip4844_kzg, "Run Ethereum EIP4844 KZG Polynomial commitment - CC compiler": + runBench("bench_ethereum_eip4844_kzg") diff --git a/constantine/commitments/kzg_polynomial_commitments.nim b/constantine/commitments/kzg_polynomial_commitments.nim index dfb6f53..a6d9fbc 100644 --- a/constantine/commitments/kzg_polynomial_commitments.nim +++ b/constantine/commitments/kzg_polynomial_commitments.nim @@ -397,6 +397,7 @@ func kzg_verify_batch*[bits: static int, F2; C: static Curve]( freeHeapAligned(commits_min_evals) # ∑[rᵢ][zᵢ][proofᵢ]₁ + # ------------------ var tmp {.noInit.}: Fr[C] for i in 0 ..< n: tmp.prod(linearIndepRandNumbers[i], challenges[i]) @@ -406,6 +407,7 @@ func kzg_verify_batch*[bits: static int, F2; C: static Curve]( freeHeapAligned(coefs) # e(∑ [rᵢ][proofᵢ]₁, [τ]₂) .
e(∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁) + ∑[rᵢ][zᵢ][proofᵢ]₁, [-1]₂) = 1 + # ----------------------------------------------------------------------------------------------------------- template sum_of_sums: untyped = sums_jac[1] sum_of_sums.sum_vartime(sum_commit_minus_evals_G1, sum_rand_challenge_proofs) diff --git a/constantine/commitments/kzg_polynomial_commitments_parallel.nim b/constantine/commitments/kzg_polynomial_commitments_parallel.nim new file mode 100644 index 0000000..a733938 --- /dev/null +++ b/constantine/commitments/kzg_polynomial_commitments_parallel.nim @@ -0,0 +1,267 @@ +# Constantine +# Copyright (c) 2018-2019 Status Research & Development GmbH +# Copyright (c) 2020-Present Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import + ../math/config/curves, + ../math/[ec_shortweierstrass, arithmetic, extension_fields], + ../math/elliptic/[ec_multi_scalar_mul_parallel, ec_shortweierstrass_batch_ops], + ../math/pairings/pairings_generic, + ../math/constants/zoo_generators, + ../math/polynomials/polynomials, + ../platforms/[abstractions, views], + ../threadpool/threadpool + +import ./kzg_polynomial_commitments {.all.} +export kzg_polynomial_commitments + +## ############################################################ +## +## KZG Polynomial Commitments +## Parallel Edition +## +## ############################################################ + +# KZG - Prover - Lagrange basis +# ------------------------------------------------------------ + +proc kzg_commit_parallel*[N: static int, C: static Curve]( + tp: Threadpool, + commitment: var ECP_ShortW_Aff[Fp[C], G1], + poly_evals: array[N, BigInt], + powers_of_tau: PolynomialEval[N, G1aff[C]]) = + ## KZG Commit to a polynomial in Lagrange / Evaluation form + ## Parallelism: This only returns when computation is fully done + var commitmentJac {.noInit.}: ECP_ShortW_Jac[Fp[C], G1] + tp.multiScalarMul_vartime_parallel(commitmentJac, poly_evals, powers_of_tau.evals) + commitment.affine(commitmentJac) + +proc kzg_prove_parallel*[N: static int, C: static Curve]( + tp: Threadpool, + proof: var ECP_ShortW_Aff[Fp[C], G1], + eval_at_challenge: var Fr[C], + poly: ptr PolynomialEval[N, Fr[C]], + domain: ptr PolyDomainEval[N, Fr[C]], + challenge: ptr Fr[C], + powers_of_tau: PolynomialEval[N, G1aff[C]], + isBitReversedDomain: static bool) = + ## KZG prove commitment to a polynomial in Lagrange / Evaluation form + ## + ## Outputs: + ## - proof + ## - eval_at_challenge + ## + ## Parallelism: This only returns when computation is fully done + # Note: + # The order of inputs in + # `kzg_prove`, `evalPolyAt`, `differenceQuotientEvalOffDomain`, `differenceQuotientEvalInDomain` + # minimizes register changes when parameter passing. + # + # z = challenge in the following code + + let diffQuotientPolyFr = allocHeapAligned(PolynomialEval[N, Fr[C]], alignment = 64) + let invRootsMinusZ = allocHeapAligned(array[N, Fr[C]], alignment = 64) + + # Compute 1/(ωⁱ - z) with ω a root of unity, i in [0, N). + # zIndex = i if ωⁱ - z == 0 (it is the i-th root of unity) and -1 otherwise. 
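For reference, the two branches below implement standard Lagrange-basis identities (restated here from the KZG/EIP-4844 literature, not from this patch). With the inverses 1/(ωⁱ - z) precomputed as above, and for a challenge z off the domain of N-th roots of unity:

  p(z) = \frac{z^N - 1}{N} \sum_{i=0}^{N-1} \frac{p(\omega^i)\,\omega^i}{z - \omega^i},
  \qquad
  q(\omega^i) = \bigl(p(\omega^i) - p(z)\bigr) \cdot \frac{1}{\omega^i - z}

i.e. barycentric evaluation for p(z) and the difference-quotient polynomial q, which is what evalPolyAt_parallel and differenceQuotientEvalOffDomain_parallel compute in the off-domain branch.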
+ let zIndex = invRootsMinusZ[].inverseRootsMinusZ_vartime( + domain[], challenge[], + earlyReturnOnZero = false) + + if zIndex == -1: + # p(z) + tp.evalPolyAt_parallel( + eval_at_challenge, + poly, challenge, + invRootsMinusZ, + domain) + + # q(x) = (p(x) - p(z)) / (x - z) + tp.differenceQuotientEvalOffDomain_parallel( + diffQuotientPolyFr, + poly, eval_at_challenge.addr, invRootsMinusZ) + else: + # p(z) + # But the challenge z is equal to one of the roots of unity (how likely is that?) + eval_at_challenge = poly.evals[zIndex] + + # q(x) = (p(x) - p(z)) / (x - z) + tp.differenceQuotientEvalInDomain_parallel( + diffQuotientPolyFr, + poly, uint32 zIndex, invRootsMinusZ, domain, isBitReversedDomain) + + freeHeapAligned(invRootsMinusZ) + + const orderBits = C.getCurveOrderBitwidth() + let diffQuotientPolyBigInt = allocHeapAligned(array[N, BigInt[orderBits]], alignment = 64) + + syncScope: + tp.parallelFor i in 0 ..< N: + captures: {diffQuotientPolyBigInt, diffQuotientPolyFr} + diffQuotientPolyBigInt[i].fromField(diffQuotientPolyFr.evals[i]) + + freeHeapAligned(diffQuotientPolyFr) + + var proofJac {.noInit.}: ECP_ShortW_Jac[Fp[C], G1] + tp.multiScalarMul_vartime_parallel(proofJac, diffQuotientPolyBigInt[], powers_of_tau.evals) + proof.affine(proofJac) + + freeHeapAligned(diffQuotientPolyBigInt) + +proc kzg_verify_batch_parallel*[bits: static int, F2; C: static Curve]( + tp: Threadpool, + commitments: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]], + challenges: ptr UncheckedArray[Fr[C]], + evals_at_challenges: ptr UncheckedArray[BigInt[bits]], + proofs: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]], + linearIndepRandNumbers: ptr UncheckedArray[Fr[C]], + n: int, + tauG2: ECP_ShortW_Aff[F2, G2]): bool {.tags:[HeapAlloc, Alloca, Vartime].} = + ## Verify multiple KZG proofs efficiently + ## + ## Parameters + ## + ## `n` verification sets + ## A verification set i (commitmentᵢ, challengeᵢ, eval_at_challengeᵢ, proofᵢ) + ## is passed in a "struct-of-arrays" fashion. + ## + ## Notation: + ## i ∈ [0, n), a verification set with ID i + ## [a]₁ corresponds to the scalar multiplication [a]G by the generator G of the group 𝔾1 + ## + ## - `commitments`: `n` commitments [commitmentᵢ]₁ + ## - `challenges`: `n` challenges zᵢ + ## - `evals_at_challenges`: `n` evaluations yᵢ = pᵢ(zᵢ) + ## - `proofs`: `n` [proof]₁ + ## - `linearIndepRandNumbers`: `n` linearly independent numbers that are not in the control + ## of a (potentially malicious) prover. + ## - `n`: the number of verification sets + ## + ## For all (commitmentᵢ, challengeᵢ, eval_at_challengeᵢ, proofᵢ), + ## we verify the relation + ## proofᵢ.(τ - zᵢ) = pᵢ(τ)-pᵢ(zᵢ) + ## + ## As τ is the secret from the trusted setup, boxed in [τ]₁ and [τ]₂, + ## we rewrite the equality check using pairings + ## + ## e([proofᵢ]₁, [τ]₂ - [challengeᵢ]₂) . e([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁, [-1]₂) = 1 + ## + ## Or batched using Feist-Khovratovich method + ## + ## e(∑ [rᵢ][proofᵢ]₁, [τ]₂) .
e(∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁) + ∑[rᵢ][zᵢ][proofᵢ]₁, [-1]₂) = 1 + ## + ## Parallelism: This only returns when computation is fully done + + static: doAssert BigInt[bits] is matchingOrderBigInt(C) + + var sums_jac {.noInit.}: array[2, ECP_ShortW_Jac[Fp[C], G1]] + template sum_rand_proofs: untyped = sums_jac[0] + template sum_commit_minus_evals_G1: untyped = sums_jac[1] + var sum_rand_challenge_proofs {.noInit.}: ECP_ShortW_Jac[Fp[C], G1] + + # ∑ [rᵢ][proofᵢ]₁ + # --------------- + let coefs = allocHeapArrayAligned(matchingOrderBigInt(C), n, alignment = 64) + + syncScope: + tp.parallelFor i in 0 ..< n: + captures: {coefs, linearIndepRandNumbers} + coefs[i].fromField(linearIndepRandNumbers[i]) + + let sum_rand_proofs_fv = tp.spawnAwaitable tp.multiScalarMul_vartime_parallel(sum_rand_proofs.addr, coefs, proofs, n) + + # ∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁) + # --------------------------------------------- + # + # We interleave allocation and deallocation, which hurts cache reuse + # i.e. when alloc is being done, it's better to do all allocs as the metadata will already be in cache + # + # but it's more important to minimize memory usage especially if we want to commit with 2^26+ points + # + # We dealloc in reverse alloc order, to avoid leaving holes in the allocator pages. + proc compute_sum_commitments_minus_evals(tp: Threadpool, + sum_commit_minus_evals_G1: ptr ECP_ShortW_Jac[Fp[C], G1], + commitments: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]], + evals_at_challenges: ptr UncheckedArray[BigInt[bits]], + coefs: ptr UncheckedArray[BigInt[bits]], + n: int) {.nimcall.} = + let commits_min_evals = allocHeapArrayAligned(ECP_ShortW_Aff[Fp[C], G1], n, alignment = 64) + let commits_min_evals_jac = allocHeapArrayAligned(ECP_ShortW_Jac[Fp[C], G1], n, alignment = 64) + + syncScope: + tp.parallelFor i in 0 ..< n: + captures: {commits_min_evals_jac, commitments, evals_at_challenges} + + commits_min_evals_jac[i].fromAffine(commitments[i]) + var boxed_eval {.noInit.}: ECP_ShortW_Jac[Fp[C], G1] + boxed_eval.fromAffine(C.getGenerator("G1")) + boxed_eval.scalarMul_vartime(evals_at_challenges[i]) + commits_min_evals_jac[i].diff_vartime(commits_min_evals_jac[i], boxed_eval) + + commits_min_evals.batchAffine(commits_min_evals_jac, n) + freeHeapAligned(commits_min_evals_jac) + tp.multiScalarMul_vartime(sum_commit_minus_evals_G1, coefs, commits_min_evals, n) + freeHeapAligned(commits_min_evals) + + let sum_commit_minus_evals_G1_fv = tp.spawnAwaitable tp.compute_sum_commitments_minus_evals( + sum_commit_minus_evals_G1.addr, + commitments, + evals_at_challenges, + coefs, + n) + + # ∑[rᵢ][zᵢ][proofᵢ]₁ + # ------------------ + proc compute_sum_rand_challenge_proofs(tp: Threadpool, + sum_rand_challenge_proofs: ptr ECP_ShortW_Jac[Fp[C], G1], + linearIndepRandNumbers: ptr UncheckedArray[Fr[C]], + challenges: ptr UncheckedArray[Fr[C]], + proofs: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]], + n: int) {.nimcall.} = + + let rand_coefs = allocHeapArrayAligned(matchingOrderBigInt(C), n, alignment = 64) + let rand_coefs_fr = allocHeapArrayAligned(Fr[C], n, alignment = 64) + + syncScope: + tp.parallelFor i in 0 ..< n: + rand_coefs_fr[i].prod(linearIndepRandNumbers[i], challenges[i]) + rand_coefs[i].fromField(rand_coefs_fr[i]) + + tp.multiScalarMul_vartime(sum_rand_challenge_proofs, rand_coefs, proofs, n) + + freeHeapAligned(rand_coefs_fr) + freeHeapAligned(rand_coefs) + + let sum_rand_challenge_proofs_fv = tp.spawnAwaitable tp.compute_sum_rand_challenge_proofs( + sum_rand_challenge_proofs, + 
linearIndepRandNumbers, + challenges, + proofs, + n) + + # e(∑ [rᵢ][proofᵢ]₁, [τ]₂) . e(∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁) + ∑[rᵢ][zᵢ][proofᵢ]₁, [-1]₂) = 1 + # ----------------------------------------------------------------------------------------------------------- + template sum_of_sums: untyped = sums_jac[1] + + discard sync sum_commit_minus_evals_G1_fv + discard sync sum_rand_challenge_proofs_fv + + sum_of_sums.sum_vartime(sum_commit_minus_evals_G1, sum_rand_challenge_proofs) + + discard sync sum_rand_proofs_fv + freeHeapAligned(coefs) + + var sums {.noInit.}: array[2, ECP_ShortW_Aff[Fp[C], G1]] + sums.batchAffine(sums_jac) + + var negG2 {.noInit.}: ECP_ShortW_Aff[F2, G2] + negG2.neg(C.getGenerator("G2")) + + var gt {.noInit.}: C.getGT() + gt.pairing(sums, [tauG2, negG2]) + + return gt.isOne().bool() \ No newline at end of file diff --git a/constantine/ethereum_bls_signatures_parallel.nim b/constantine/ethereum_bls_signatures_parallel.nim new file mode 100644 index 0000000..c86388c --- /dev/null +++ b/constantine/ethereum_bls_signatures_parallel.nim @@ -0,0 +1,146 @@ +# Constantine +# Copyright (c) 2018-2019 Status Research & Development GmbH +# Copyright (c) 2020-Present Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +## ############################################################ +## +## BLS Signatures for Ethereum +## Parallel edition +## +## ############################################################ + +when not compileOption("threads"): + {.error: "This requires --threads:on compilation flag".} + +# Reexport the serial API +import ./ethereum_bls_signatures {.all.} +export ethereum_bls_signatures + +import + std/importutils, + ./zoo_exports, + ./platforms/views, + ./threadpool/threadpool, + ./signatures/bls_signatures_parallel + +# No exceptions allowed in core cryptographic operations +{.push raises: [].} +{.push checks: off.} + +# C FFI +proc batch_verify_parallel*[Msg]( + tp: Threadpool, + pubkeys: ptr UncheckedArray[PublicKey], + messages: ptr UncheckedArray[View[byte]], + signatures: ptr UncheckedArray[Signature], + len: int, + secureRandomBytes: array[32, byte]): CttBLSStatus {.libPrefix: prefix_ffi.} = + ## Verify that all (pubkey, message, signature) triplets are valid. + ## Returns `true` if all signatures are valid, `false` if at least one is invalid. + ## + ## For message domain separation purpose, the tag is `BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_` + ## + ## Input: + ## - Public keys initialized by one of the key derivation or deserialization procedure. + ## Or validated via validate_pubkey + ## - Messages + ## - Signatures initialized by one of the key derivation or deserialization procedure. + ## Or validated via validate_signature + ## + ## In particular, the public keys and signature are assumed to be on curve and subgroup-checked. + ## + ## To avoid splitting zeros and rogue key attacks: + ## 1. Cryptographically-secure random bytes must be provided. + ## 2. Augmentation or proofs of possession must be used for each public key. + ## + ## The secureRandomBytes will serve as input not under the attacker's control to foil potential splitting zeros inputs.
+ ## The scheme assumes that the attacker cannot + ## resubmit 2^64 times forged (publickey, message, signature) triplets + ## against the same `secureRandomBytes` + + privateAccess(PublicKey) + privateAccess(Signature) + + if len == 0: + # IETF spec precondition + return cttBLS_ZeroLengthAggregation + + # Deal with cases where pubkey or signature were mistakenly zero-init, due to a generic aggregation attempt for example + for i in 0 ..< len: + if pubkeys[i].raw.isInf().bool: + return cttBLS_PointAtInfinity + + for i in 0 ..< len: + if signatures[i].raw.isInf().bool: + return cttBLS_PointAtInfinity + + let verified = tp.batchVerify_parallel( + pubkeys.toOpenArray(len).unwrap(), + messages, + signatures.toOpenArray(len).unwrap(), + sha256, 128, DomainSeparationTag, secureRandomBytes) + if verified: + return cttBLS_Success + return cttBLS_VerificationFailure + +# Nim +proc batch_verify_parallel*[Msg]( + tp: Threadpool, + pubkeys: openArray[PublicKey], + messages: openArray[Msg], + signatures: openArray[Signature], + secureRandomBytes: array[32, byte]): CttBLSStatus = + ## Verify that all (pubkey, message, signature) triplets are valid. + ## Returns `true` if all signatures are valid, `false` if at least one is invalid. + ## + ## For message domain separation purpose, the tag is `BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_` + ## + ## Input: + ## - Public keys initialized by one of the key derivation or deserialization procedure. + ## Or validated via validate_pubkey + ## - Messages + ## - Signatures initialized by one of the key derivation or deserialization procedure. + ## Or validated via validate_signature + ## + ## In particular, the public keys and signature are assumed to be on curve and subgroup-checked. + ## + ## To avoid splitting zeros and rogue key attacks: + ## 1. Cryptographically-secure random bytes must be provided. + ## 2. Augmentation or proofs of possession must be used for each public key. + ## + ## The secureRandomBytes will serve as input not under the attacker's control to foil potential splitting zeros inputs.
+ ## The scheme assumes that the attacker cannot + ## resubmit 2^64 times forged (publickey, message, signature) triplets + ## against the same `secureRandomBytes` + + privateAccess(PublicKey) + privateAccess(Signature) + + if pubkeys.len == 0: + # IETF spec precondition + return cttBLS_ZeroLengthAggregation + + if pubkeys.len != messages.len or pubkeys.len != signatures.len: + return cttBLS_InconsistentLengthsOfInputs + + # Deal with cases where pubkey or signature were mistakenly zero-init, due to a generic aggregation attempt for example + for i in 0 ..< pubkeys.len: + if pubkeys[i].raw.isInf().bool: + return cttBLS_PointAtInfinity + + for i in 0 ..< signatures.len: + if signatures[i].raw.isInf().bool: + return cttBLS_PointAtInfinity + + let verified = tp.batchVerify_parallel( + pubkeys.unwrap(), + messages, + signatures.unwrap(), + sha256, 128, DomainSeparationTag, secureRandomBytes) + if verified: + return cttBLS_Success + return cttBLS_VerificationFailure \ No newline at end of file diff --git a/constantine/ethereum_eip4844_kzg_polynomial_commitments.nim b/constantine/ethereum_eip4844_kzg.nim similarity index 95% rename from constantine/ethereum_eip4844_kzg_polynomial_commitments.nim rename to constantine/ethereum_eip4844_kzg.nim index f5651b8..47a91bb 100644 --- a/constantine/ethereum_eip4844_kzg_polynomial_commitments.nim +++ b/constantine/ethereum_eip4844_kzg.nim @@ -102,7 +102,7 @@ func fromDigest(dst: var Fr[BLS12_381], src: array[32, byte]) = Fr[BLS12_381].getNegInvModWord(), Fr[BLS12_381].getSpareBits()) -func fiatShamirChallenge(dst: var Fr[BLS12_381], blob: Blob, commitmentBytes: array[BYTES_PER_COMMITMENT, byte]) = +func fiatShamirChallenge(dst: ptr Fr[BLS12_381], blob: ptr Blob, commitmentBytes: ptr array[BYTES_PER_COMMITMENT, byte]) = ## Compute a Fiat-Shamir challenge ## compute_challenge: https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/polynomial-commitments.md#compute_challenge var transcript {.noInit.}: sha256 @@ -114,12 +114,12 @@ func fiatShamirChallenge(dst: var Fr[BLS12_381], blob: Blob, commitmentBytes: ar transcript.update(default(array[16-sizeof(uint64), byte])) transcript.update(FIELD_ELEMENTS_PER_BLOB.uint64.toBytes(bigEndian)) - transcript.update(blob) - transcript.update(commitmentBytes) + transcript.update(blob[]) + transcript.update(commitmentBytes[]) var challenge {.noInit.}: array[32, byte] transcript.finish(challenge) - dst.fromDigest(challenge) + dst[].fromDigest(challenge) func computePowers(dst: ptr UncheckedArray[Fr[BLS12_381]], len: int, base: Fr[BLS12_381]) = ## We need linearly independent random numbers @@ -217,7 +217,7 @@ func blob_to_field_polynomial( # - Either we are in "HappyPath" section that shortcuts to resource cleanup on error # - or there are no resources to clean and we can early return from a function. -template check(evalExpr: CttCodecScalarStatus): untyped {.dirty.} = +template checkReturn(evalExpr: CttCodecScalarStatus): untyped {.dirty.} = # Translate codec status code to KZG status code # Beware of resource cleanup like heap allocation, this can early exit the caller.
block: @@ -227,7 +227,7 @@ template check(evalExpr: CttCodecScalarStatus): untyped {.dirty.} = of cttCodecScalar_Zero: discard of cttCodecScalar_ScalarLargerThanCurveOrder: return cttEthKZG_ScalarLargerThanCurveOrder -template check(evalExpr: CttCodecEccStatus): untyped {.dirty.} = +template checkReturn(evalExpr: CttCodecEccStatus): untyped {.dirty.} = # Translate codec status code to KZG status code # Beware of resource cleanup like heap allocation, this can early exit the caller. block: @@ -248,7 +248,7 @@ template check(Section: untyped, evalExpr: CttCodecScalarStatus): untyped {.dirt case status of cttCodecScalar_Success: discard of cttCodecScalar_Zero: discard - of cttCodecScalar_ScalarLargerThanCurveOrder: result = cttEthKZG_EccPointNotInSubGroup; break Section + of cttCodecScalar_ScalarLargerThanCurveOrder: result = cttEthKZG_ScalarLargerThanCurveOrder; break Section template check(Section: untyped, evalExpr: CttCodecEccStatus): untyped {.dirty.} = # Translate codec status code to KZG status code @@ -305,8 +305,8 @@ func compute_kzg_proof*( blob: ptr Blob, z_bytes: array[32, byte]): CttEthKzgStatus {.tags:[Alloca, HeapAlloc, Vartime].} = ## Generate: + ## - A proof of correct evaluation. ## - y = p(z), the evaluation of p at the challenge z, with p being the Blob interpreted as a polynomial. - ## - A zero-knowledge proof of correct evaluation. ## ## Mathematical description ## [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁, with p(τ) being the commitment, i.e. the evaluation of p at the powers of τ @@ -320,7 +320,7 @@ func compute_kzg_proof*( # Random or Fiat-Shamir challenge var z {.noInit.}: Fr[BLS12_381] - check z.bytes_to_bls_field(z_bytes) + checkReturn z.bytes_to_bls_field(z_bytes) let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64) @@ -354,16 +354,16 @@ func verify_kzg_proof*( ## Verify KZG proof that p(z) == y where p(z) is the polynomial represented by "polynomial_kzg" var commitment {.noInit.}: KZGCommitment - check commitment.bytes_to_kzg_commitment(commitment_bytes) + checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes) var challenge {.noInit.}: matchingOrderBigInt(BLS12_381) - check challenge.bytes_to_bls_bigint(z_bytes) + checkReturn challenge.bytes_to_bls_bigint(z_bytes) var eval_at_challenge {.noInit.}: matchingOrderBigInt(BLS12_381) - check eval_at_challenge.bytes_to_bls_bigint(y_bytes) + checkReturn eval_at_challenge.bytes_to_bls_bigint(y_bytes) var proof {.noInit.}: KZGProof - check proof.bytes_to_kzg_proof(proof_bytes) + checkReturn proof.bytes_to_kzg_proof(proof_bytes) let verif = kzg_verify(ECP_ShortW_Aff[Fp[BLS12_381], G1](commitment), challenge, eval_at_challenge, @@ -383,7 +383,7 @@ func compute_blob_kzg_proof*( ## This method does not verify that the commitment is correct with respect to `blob`. 
var commitment {.noInit.}: KZGCommitment - check commitment.bytes_to_kzg_commitment(commitment_bytes) + checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes) # Blob -> Polynomial let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64) @@ -394,7 +394,7 @@ func compute_blob_kzg_proof*( # Fiat-Shamir challenge var challenge {.noInit.}: Fr[BLS12_381] - challenge.fiatShamirChallenge(blob[], commitment_bytes) + challenge.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr) # KZG Prove var y {.noInit.}: Fr[BLS12_381] # y = p(z), eval at challenge z @@ -421,10 +421,10 @@ func verify_blob_kzg_proof*( ## Given a blob and a KZG proof, verify that the blob data corresponds to the provided commitment. var commitment {.noInit.}: KZGCommitment - check commitment.bytes_to_kzg_commitment(commitment_bytes) + checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes) var proof {.noInit.}: KZGProof - check proof.bytes_to_kzg_proof(proof_bytes) + checkReturn proof.bytes_to_kzg_proof(proof_bytes) let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64) let invRootsMinusZ = allocHeapAligned(array[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], alignment = 64) @@ -435,7 +435,7 @@ func verify_blob_kzg_proof*( # Fiat-Shamir challenge var challengeFr {.noInit.}: Fr[BLS12_381] - challengeFr.fiatShamirChallenge(blob[], commitment_bytes) + challengeFr.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr) var challenge, eval_at_challenge {.noInit.}: matchingOrderBigInt(BLS12_381) challenge.fromField(challengeFr) @@ -510,7 +510,7 @@ func verify_blob_kzg_proof_batch*( for i in 0 ..< n: check HappyPath, commitments[i].bytes_to_kzg_commitment(commitments_bytes[i]) check HappyPath, poly.blob_to_field_polynomial(blobs[i].addr) - challenges[i].fiatShamirChallenge(blobs[i], commitments_bytes[i]) + challenges[i].addr.fiatShamirChallenge(blobs[i].addr, commitments_bytes[i].addr) # Lagrange Polynomial evaluation # ------------------------------ diff --git a/constantine/ethereum_eip4844_kzg_parallel.nim b/constantine/ethereum_eip4844_kzg_parallel.nim new file mode 100644 index 0000000..9ff2238 --- /dev/null +++ b/constantine/ethereum_eip4844_kzg_parallel.nim @@ -0,0 +1,452 @@ +# Constantine +# Copyright (c) 2018-2019 Status Research & Development GmbH +# Copyright (c) 2020-Present Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import ethereum_eip4844_kzg {.all.} +export ethereum_eip4844_kzg + +import + ./math/config/curves, + ./math/[ec_shortweierstrass, arithmetic, extension_fields], + ./math/polynomials/polynomials_parallel, + ./hashes, + ./commitments/kzg_polynomial_commitments_parallel, + ./serialization/[codecs_status_codes, codecs_bls12_381], + ./math/io/io_fields, + ./platforms/[abstractions, allocs], + ./threadpool/threadpool + +## ############################################################ +## +## KZG Polynomial Commitments for Ethereum +## Parallel Edition +## +## ############################################################ +## +## This module implements KZG Polynomial commitments (Kate, Zaverucha, Goldberg) +## for the Ethereum blockchain. 
+ +## +## References: +## - Ethereum spec: +## https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/polynomial-commitments.md +## - KZG Paper: +## Constant-Size Commitments to Polynomials and Their Applications +## Kate, Zaverucha, Goldberg, 2010 +## https://www.iacr.org/archive/asiacrypt2010/6477178/6477178.pdf +## https://cacr.uwaterloo.ca/techreports/2010/cacr2010-10.pdf +## - Audited reference implementation +## https://github.com/ethereum/c-kzg-4844 + +proc blob_to_bigint_polynomial_parallel( + tp: Threadpool, + dst: ptr PolynomialEval[FIELD_ELEMENTS_PER_BLOB, matchingOrderBigInt(BLS12_381)], + blob: ptr Blob): CttCodecScalarStatus = + ## Convert a blob to a polynomial in evaluation form + mixin globalStatus + + static: + doAssert sizeof(dst[]) == sizeof(Blob) + doAssert sizeof(array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]) == sizeof(Blob) + + let view = cast[ptr array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]](blob) + + tp.parallelFor i in 0 ..< FIELD_ELEMENTS_PER_BLOB: + captures: {dst, view} + reduceInto(globalStatus: CttCodecScalarStatus): + prologue: + var workerStatus = cttCodecScalar_Success + forLoop: + let iterStatus = dst.evals[i].bytes_to_bls_bigint(view[i]) + if workerStatus == cttCodecScalar_Success: + # Propagate the error, if any, from the current iteration + workerStatus = iterStatus + merge(remoteFutureStatus: Flowvar[CttCodecScalarStatus]): + let remoteStatus = sync(remoteFutureStatus) + if workerStatus == cttCodecScalar_Success: + # Propagate the error, if any, from the remote worker + workerStatus = remoteStatus + epilogue: + return workerStatus + + return sync(globalStatus) + +proc blob_to_field_polynomial_parallel_async( + tp: Threadpool, + dst: ptr PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], + blob: ptr Blob): Flowvar[CttCodecScalarStatus] = + ## Convert a blob to a polynomial in evaluation form + ## The result is a `Flowvar` handle and MUST be awaited with `sync` + mixin globalStatus + + static: + doAssert sizeof(dst[]) == sizeof(Blob) + doAssert sizeof(array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]) == sizeof(Blob) + + let view = cast[ptr array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]](blob) + + tp.parallelFor i in 0 ..< FIELD_ELEMENTS_PER_BLOB: + captures: {dst, view} + reduceInto(globalStatus: CttCodecScalarStatus): + prologue: + var workerStatus = cttCodecScalar_Success + forLoop: + let iterStatus = dst.evals[i].bytes_to_bls_field(view[i]) + if workerStatus == cttCodecScalar_Success: + # Propagate the error, if any, from the current iteration + workerStatus = iterStatus + merge(remoteFutureStatus: Flowvar[CttCodecScalarStatus]): + let remoteStatus = sync(remoteFutureStatus) + if workerStatus == cttCodecScalar_Success: + # Propagate the error, if any, from the remote worker + workerStatus = remoteStatus + epilogue: + return workerStatus + + return globalStatus + +# Ethereum KZG public API +# ------------------------------------------------------------ +# +# We use a simple goto state machine to handle errors and cleanup (if allocs were done) +# and have 2 different checks: +# - Either we are in "HappyPath" section that shortcuts to resource cleanup on error +# - or there are no resources to clean and we can early return from a function.
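As an illustration of this error-handling convention, a minimal sketch (the fallible step name is hypothetical; `check` and `checkReturn` are the templates from ethereum_eip4844_kzg.nim shown earlier in this patch):

  func exampleHappyPath(input: array[32, byte]): CttEthKzgStatus =
    # Nothing allocated yet: `checkReturn` may early-return on a codec error.
    var z {.noInit.}: Fr[BLS12_381]
    checkReturn z.bytes_to_bls_field(input)

    let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64)
    block HappyPath:
      # `check HappyPath, ...` sets `result` and breaks to cleanup on error.
      check HappyPath, doFallibleStep(poly)  # doFallibleStep is hypothetical
      result = cttEthKZG_Success
    freeHeapAligned(poly)  # cleanup always runs, on success and on error
    return result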
+ +func kzgifyStatus(status: CttCodecScalarStatus or CttCodecEccStatus): CttEthKzgStatus {.inline.} = + checkReturn status + +proc blob_to_kzg_commitment_parallel*( + tp: Threadpool, + ctx: ptr EthereumKZGContext, + dst: var array[48, byte], + blob: ptr Blob): CttEthKzgStatus = + ## Compute a commitment to the `blob`. + ## The commitment can be verified without needing the full `blob` + ## + ## Mathematical description + ## commitment = [p(τ)]₁ + ## + ## The blob data is used as a polynomial, + ## the polynomial is evaluated at powers of tau τ, a trusted setup. + ## + ## Verification can be done by verifying the relation: + ## proof.(τ - z) = p(τ)-p(z) + ## which doesn't require the full blob but only evaluations of it + ## - at τ, p(τ) is the commitment + ## - and at the verification challenge z. + ## + ## with proof = [(p(τ) - p(z)) / (τ-z)]₁ + + let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, matchingOrderBigInt(BLS12_381)], 64) + + block HappyPath: + check HappyPath, tp.blob_to_bigint_polynomial_parallel(poly, blob) + + var r {.noinit.}: ECP_ShortW_Aff[Fp[BLS12_381], G1] + tp.kzg_commit_parallel(r, poly.evals, ctx.srs_lagrange_g1) + discard dst.serialize_g1_compressed(r) + + result = cttEthKZG_Success + + freeHeapAligned(poly) + return result + +proc compute_kzg_proof_parallel*( + tp: Threadpool, + ctx: ptr EthereumKZGContext, + proof_bytes: var array[48, byte], + y_bytes: var array[32, byte], + blob: ptr Blob, + z_bytes: array[32, byte]): CttEthKzgStatus = + ## Generate: + ## - A proof of correct evaluation. + ## - y = p(z), the evaluation of p at the challenge z, with p being the Blob interpreted as a polynomial. + ## + ## Mathematical description + ## [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁, with p(τ) being the commitment, i.e. the evaluation of p at the powers of τ + ## The notation [a]₁ corresponds to the scalar multiplication of a by the generator of 𝔾1 + ## + ## Verification can be done by verifying the relation: + ## proof.(τ - z) = p(τ)-p(z) + ## which doesn't require the full blob but only evaluations of it + ## - at τ, p(τ) is the commitment + ## - and at the verification challenge z. + + # Random or Fiat-Shamir challenge + var z {.noInit.}: Fr[BLS12_381] + checkReturn z.bytes_to_bls_field(z_bytes) + + let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64) + + block HappyPath: + # Blob -> Polynomial + check HappyPath, sync tp.blob_to_field_polynomial_parallel_async(poly, blob) + + # KZG Prove + var y {.noInit.}: Fr[BLS12_381] # y = p(z), eval at challenge z + var proof {.noInit.}: ECP_ShortW_Aff[Fp[BLS12_381], G1] # [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁ + + tp.kzg_prove_parallel( + proof, y, + poly, ctx.domain.addr, + z.addr, ctx.srs_lagrange_g1, + isBitReversedDomain = true) + + discard proof_bytes.serialize_g1_compressed(proof) # cannot fail + y_bytes.marshal(y, bigEndian) # cannot fail + result = cttEthKZG_Success + + freeHeapAligned(poly) + return result + +proc compute_blob_kzg_proof_parallel*( + tp: Threadpool, + ctx: ptr EthereumKZGContext, + proof_bytes: var array[48, byte], + blob: ptr Blob, + commitment_bytes: array[48, byte]): CttEthKzgStatus = + ## Given a blob, return the KZG proof that is used to verify it against the commitment. + ## This method does not verify that the commitment is correct with respect to `blob`. 
+ + var commitment {.noInit.}: KZGCommitment + checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes) + + # Blob -> Polynomial + let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64) + + block HappyPath: + # Blob -> Polynomial, spawn async on other threads + let convStatus = tp.blob_to_field_polynomial_parallel_async(poly, blob) + + # Fiat-Shamir challenge + var challenge {.noInit.}: Fr[BLS12_381] + challenge.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr) + + # Await conversion to field polynomial + check HappyPath, sync(convStatus) + + # KZG Prove + var y {.noInit.}: Fr[BLS12_381] # y = p(z), eval at challenge z + var proof {.noInit.}: ECP_ShortW_Aff[Fp[BLS12_381], G1] # [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁ + + tp.kzg_prove_parallel( + proof, y, + poly, ctx.domain.addr, + challenge.addr, ctx.srs_lagrange_g1, + isBitReversedDomain = true) + + discard proof_bytes.serialize_g1_compressed(proof) # cannot fail + + result = cttEthKZG_Success + + freeHeapAligned(poly) + return result + +proc verify_blob_kzg_proof_parallel*( + tp: Threadpool, + ctx: ptr EthereumKZGContext, + blob: ptr Blob, + commitment_bytes: array[48, byte], + proof_bytes: array[48, byte]): CttEthKzgStatus = + ## Given a blob and a KZG proof, verify that the blob data corresponds to the provided commitment. + + var commitment {.noInit.}: KZGCommitment + checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes) + + var proof {.noInit.}: KZGProof + checkReturn proof.bytes_to_kzg_proof(proof_bytes) + + let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64) + let invRootsMinusZ = allocHeapAligned(array[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], alignment = 64) + + block HappyPath: + # Blob -> Polynomial, spawn async on other threads + let convStatus = tp.blob_to_field_polynomial_parallel_async(poly, blob) + + # Fiat-Shamir challenge + var challengeFr {.noInit.}: Fr[BLS12_381] + challengeFr.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr) + + var challenge, eval_at_challenge {.noInit.}: matchingOrderBigInt(BLS12_381) + challenge.fromField(challengeFr) + + # Lagrange Polynomial evaluation + # ------------------------------ + # 1. Compute 1/(ωⁱ - z) with ω a root of unity, i in [0, N). + # zIndex = i if ωⁱ - z == 0 (it is the i-th root of unity) and -1 otherwise. + let zIndex = invRootsMinusZ[].inverseRootsMinusZ_vartime( + ctx.domain, challengeFr, + earlyReturnOnZero = true) + + # Await conversion to field polynomial + check HappyPath, sync(convStatus) + + # 2. 
Actual evaluation + if zIndex == -1: + var eval_at_challenge_fr{.noInit.}: Fr[BLS12_381] + tp.evalPolyAt_parallel( + eval_at_challenge_fr, + poly, challengeFr.addr, + invRootsMinusZ, + ctx.domain.addr) + eval_at_challenge.fromField(eval_at_challenge_fr) + else: + eval_at_challenge.fromField(poly.evals[zIndex]) + + # KZG verification + let verif = kzg_verify(ECP_ShortW_Aff[Fp[BLS12_381], G1](commitment), + challenge, eval_at_challenge, + ECP_ShortW_Aff[Fp[BLS12_381], G1](proof), + ctx.srs_monomial_g2.coefs[1]) + if verif: + result = cttEthKZG_Success + else: + result = cttEthKZG_VerificationFailure + + freeHeapAligned(invRootsMinusZ) + freeHeapAligned(poly) + return result + +proc verify_blob_kzg_proof_batch_parallel*( + tp: Threadpool, + ctx: ptr EthereumKZGContext, + blobs: ptr UncheckedArray[Blob], + commitments_bytes: ptr UncheckedArray[array[48, byte]], + proof_bytes: ptr UncheckedArray[array[48, byte]], + n: int, + secureRandomBytes: array[32, byte]): CttEthKzgStatus = + ## Verify `n` (blob, commitment, proof) sets efficiently + ## + ## `n` is the number of verification sets + ## - if n is negative, this procedure returns verification failure + ## - if n is zero, this procedure returns verification success + ## + ## `secureRandomBytes` random bytes must come from a cryptographically secure RNG + ## or be computed through the Fiat-Shamir heuristic. + ## It serves as a random number + ## that is not under the control of a potential attacker, to prevent + ## rogue commitment attacks due to homomorphic properties of pairings, + ## i.e. commitments that are linear combinations of others, whose sum would be zero. + + mixin globalStatus + + if n < 0: + return cttEthKZG_VerificationFailure + if n == 0: + return cttEthKZG_Success + + let commitments = allocHeapArrayAligned(KZGCommitment, n, alignment = 64) + let challenges = allocHeapArrayAligned(Fr[BLS12_381], n, alignment = 64) + let evals_at_challenges = allocHeapArrayAligned(matchingOrderBigInt(BLS12_381), n, alignment = 64) + let proofs = allocHeapArrayAligned(KZGProof, n, alignment = 64) + + let polys = allocHeapArrayAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], n, alignment = 64) + let invRootsMinusZs = allocHeapArrayAligned(array[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], n, alignment = 64) + + block HappyPath: + tp.parallelFor i in 0 ..< n: + captures: {tp, ctx, + commitments, commitments_bytes, + polys, blobs, + challenges, evals_at_challenges, + proofs, proof_bytes, + invRootsMinusZs} + reduceInto(globalStatus: CttEthKzgStatus): + prologue: + var workerStatus = cttEthKZG_Success + forLoop: + let polyStatusFut = tp.blob_to_field_polynomial_parallel_async(polys[i].addr, blobs[i].addr) + let challengeStatusFut = tp.spawnAwaitable challenges[i].addr.fiatShamirChallenge(blobs[i].addr, commitments_bytes[i].addr) + + let commitmentStatus = kzgifyStatus commitments[i].bytes_to_kzg_commitment(commitments_bytes[i]) + if workerStatus == cttEthKZG_Success: + workerStatus = commitmentStatus + let polyStatus = kzgifyStatus sync(polyStatusFut) + if workerStatus == cttEthKZG_Success: + workerStatus = polyStatus + discard sync(challengeStatusFut) + + # Lagrange Polynomial evaluation + # ------------------------------ + # 1. Compute 1/(ωⁱ - z) with ω a root of unity, i in [0, N). + # zIndex = i if ωⁱ - z == 0 (it is the i-th root of unity) and -1 otherwise. + let zIndex = invRootsMinusZs[i].inverseRootsMinusZ_vartime( + ctx.domain, challenges[i], + earlyReturnOnZero = true) + # 2.
Actual evaluation + if zIndex == -1: + var eval_at_challenge_fr{.noInit.}: Fr[BLS12_381] + tp.evalPolyAt_parallel( + eval_at_challenge_fr, + polys[i].addr, challenges[i].addr, + invRootsMinusZs[i].addr, + ctx.domain.addr) + evals_at_challenges[i].fromField(eval_at_challenge_fr) + else: + evals_at_challenges[i].fromField(polys[i].evals[zIndex]) + + let proofStatus = kzgifyStatus proofs[i].bytes_to_kzg_proof(proof_bytes[i]) + if workerStatus == cttEthKZG_Success: + workerStatus = proofStatus + + merge(remoteStatusFut: Flowvar[CttEthKzgStatus]): + let remoteStatus = sync(remoteStatusFut) + if workerStatus == cttEthKZG_Success: + workerStatus = remoteStatus + epilogue: + return workerStatus + + + result = sync(globalStatus) + if result != cttEthKZG_Success: + break HappyPath + + var randomBlindingFr {.noInit.}: Fr[BLS12_381] + block blinding: # Ensure we don't multiply by 0 for blinding + # 1. Try with the random number supplied + for i in 0 ..< secureRandomBytes.len: + if secureRandomBytes[i] != byte 0: + randomBlindingFr.fromDigest(secureRandomBytes) + break blinding + # 2. If it's 0 (how?!), we just hash all the Fiat-Shamir challenges + var transcript: sha256 + transcript.init() + transcript.update(RANDOM_CHALLENGE_KZG_BATCH_DOMAIN) + transcript.update(cast[ptr UncheckedArray[byte]](challenges).toOpenArray(0, n*sizeof(Fr[BLS12_381])-1)) + + var blindingBytes {.noInit.}: array[32, byte] + transcript.finish(blindingBytes) + randomBlindingFr.fromDigest(blindingBytes) + + # TODO: use parallel prefix product for parallel powers compute + let linearIndepRandNumbers = allocHeapArrayAligned(Fr[BLS12_381], n, alignment = 64) + linearIndepRandNumbers.computePowers(n, randomBlindingFr) + + type EcAffArray = ptr UncheckedArray[ECP_ShortW_Aff[Fp[BLS12_381], G1]] + let verif = kzg_verify_batch( + cast[EcAffArray](commitments), + challenges, + evals_at_challenges, + cast[EcAffArray](proofs), + linearIndepRandNumbers, + n, + ctx.srs_monomial_g2.coefs[1]) + if verif: + result = cttEthKZG_Success + else: + result = cttEthKZG_VerificationFailure + + freeHeapAligned(linearIndepRandNumbers) + + freeHeapAligned(invRootsMinusZs) + freeHeapAligned(polys) + freeHeapAligned(proofs) + freeHeapAligned(evals_at_challenges) + freeHeapAligned(challenges) + freeHeapAligned(commitments) + + return result diff --git a/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim b/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim index 9923010..325e4ca 100644 --- a/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim +++ b/constantine/math/elliptic/ec_multi_scalar_mul_parallel.nim @@ -576,3 +576,14 @@ proc multiScalarMul_vartime_parallel*[bits: static int, EC, F, G]( let N = points.len tp.multiScalarMul_dispatch_vartime_parallel(r.addr, coefs.asUnchecked(), points.asUnchecked(), N) + +proc multiScalarMul_vartime_parallel*[bits: static int, EC, F, G]( + tp: Threadpool, + r: ptr EC, + coefs: ptr UncheckedArray[BigInt[bits]], + points: ptr UncheckedArray[ECP_ShortW_Aff[F, G]], + len: int) {.meter, inline.} = + ## Multiscalar multiplication: + ## r <- [a₀]P₀ + [a₁]P₁ + ... 
+ [aₙ]Pₙ + ## This function can be nested in another parallel function + tp.multiScalarMul_dispatch_vartime_parallel(r, coefs, points, len) diff --git a/constantine/math/elliptic/ec_shortweierstrass_batch_ops.nim b/constantine/math/elliptic/ec_shortweierstrass_batch_ops.nim index 2813ecb..583e601 100644 --- a/constantine/math/elliptic/ec_shortweierstrass_batch_ops.nim +++ b/constantine/math/elliptic/ec_shortweierstrass_batch_ops.nim @@ -371,13 +371,6 @@ func accum_half_vartime[F; G: static Subgroup]( # Batch addition - High-level # ------------------------------------------------------------ -template `+=`[F; G: static Subgroup](P: var ECP_ShortW_JacExt[F, G], Q: ECP_ShortW_Aff[F, G]) = - # All vartime procedures MUST be tagged vartime - # Hence we do not expose `+=` for extended jacobian operation to prevent `vartime` mistakes - # The following algorithms are all tagged vartime, hence for genericity - # we create a local `+=` for this module only - madd_vartime(P, P, Q) - func accumSum_chunk_vartime*[F; G: static Subgroup]( r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G] or ECP_ShortW_JacExt[F, G]), points: ptr UncheckedArray[ECP_ShortW_Aff[F, G]], len: int) {.noInline, tags:[VarTime, Alloca].} = @@ -398,7 +391,7 @@ func accumSum_chunk_vartime*[F; G: static Subgroup]( while n >= minNumPointsSerial: if (n and 1) == 1: # odd number of points ## Accumulate the last - r += points[n-1] + r.madd_vartime(r, points[n-1]) n -= 1 # Compute [0, n/2) += [n/2, n) @@ -409,7 +402,7 @@ func accumSum_chunk_vartime*[F; G: static Subgroup]( # Tail for i in 0 ..< n: - r += points[i] + r.madd_vartime(r, points[i]) func accum_batch_vartime[F; G: static Subgroup]( r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G] or ECP_ShortW_JacExt[F, G]), @@ -472,36 +465,66 @@ func sum_reduce_vartime*[F; G: static Subgroup]( type EcAddAccumulator_vartime*[EC, F; G: static Subgroup; AccumMax: static int] = object ## Elliptic curve addition accumulator ## **Variable-Time** - # The `cur` is dereferenced first so better locality if at the beginning + # The `len` is dereferenced first so better locality if at the beginning # Do we want alignment guarantees? 
- cur: uint32 + len: uint32 accum: EC buffer: array[AccumMax, ECP_ShortW_Aff[F, G]] func init*(ctx: var EcAddAccumulator_vartime) = static: doAssert EcAddAccumulator_vartime.AccumMax >= 16, "There is no point in a EcAddBatchAccumulator if the batch size is too small" ctx.accum.setInf() - ctx.cur = 0 + ctx.len = 0 func consumeBuffer[EC, F; G: static Subgroup; AccumMax: static int]( ctx: var EcAddAccumulator_vartime[EC, F, G, AccumMax]) {.noInline, tags: [VarTime, Alloca].}= - if ctx.cur == 0: + if ctx.len == 0: return - ctx.accum.accumSum_chunk_vartime(ctx.buffer.asUnchecked(), ctx.cur) - ctx.cur = 0 + ctx.accum.accumSum_chunk_vartime(ctx.buffer.asUnchecked(), ctx.len.int) + ctx.len = 0 func update*[EC, F, G; AccumMax: static int]( ctx: var EcAddAccumulator_vartime[EC, F, G, AccumMax], P: ECP_ShortW_Aff[F, G]) = - if ctx.cur == AccumMax: + if P.isInf().bool: + return + + if ctx.len == AccumMax: ctx.consumeBuffer() - ctx.buffer[ctx.cur] = P - ctx.cur += 1 + ctx.buffer[ctx.len] = P + ctx.len += 1 + +func handover*(ctx: var EcAddAccumulator_vartime) {.inline.} = + ctx.consumeBuffer() + +func merge*[EC, F, G; AccumMax: static int]( + ctxDst: var EcAddAccumulator_vartime[EC, F, G, AccumMax], + ctxSrc: EcAddAccumulator_vartime[EC, F, G, AccumMax]) = + + var sCur = 0'u32 + var itemsLeft = ctxSrc.len + + if ctxDst.len + ctxSrc.len >= AccumMax: + # previous partial update, fill the buffer and do a batch addition + let free = AccumMax - ctxDst.len + for i in 0 ..< free: + ctxDst.buffer[ctxDst.len+i] = ctxSrc.buffer[i] + ctxDst.len = AccumMax + ctxDst.consumeBuffer() + sCur = free + itemsLeft -= free + + # Store the tail + for i in 0 ..< itemsLeft: + ctxDst.buffer[ctxDst.len+i] = ctxSrc.buffer[sCur+i] + + ctxDst.len += itemsLeft + + ctxDst.accum.sum_vartime(ctxDst.accum, ctxSrc.accum) -# TODO: `merge` for parallel recursive divide-and-conquer processing func finish*[EC, F, G; AccumMax: static int]( ctx: var EcAddAccumulator_vartime[EC, F, G, AccumMax], diff --git a/constantine/math/elliptic/ec_shortweierstrass_batch_ops_parallel.nim b/constantine/math/elliptic/ec_shortweierstrass_batch_ops_parallel.nim index 8659100..a1d995a 100644 --- a/constantine/math/elliptic/ec_shortweierstrass_batch_ops_parallel.nim +++ b/constantine/math/elliptic/ec_shortweierstrass_batch_ops_parallel.nim @@ -30,19 +30,13 @@ proc sum_reduce_vartime_parallelChunks[F; G: static Subgroup]( points: openArray[ECP_ShortW_Aff[F, G]]) {.noInline.} = ## Batch addition of `points` into `r` ## `r` is overwritten - ## Compute is parallelized, if beneficial. 
-  ## This function can be nested in another parallel function
+  ## Scales better for large numbers of points

   # Chunking constants in ec_shortweierstrass_batch_ops.nim
   const maxTempMem = 262144 # 2¹⁸ = 262144
   const maxChunkSize = maxTempMem div sizeof(ECP_ShortW_Aff[F, G])
   const minChunkSize = (maxChunkSize * 60) div 100 # We want 60%~100% full chunks

-  if points.len <= maxChunkSize:
-    r.setInf()
-    r.accumSum_chunk_vartime(points.asUnchecked(), points.len)
-    return
-
   let chunkDesc = balancedChunksPrioSize(
     start = 0, stopEx = points.len,
     minChunkSize, maxChunkSize,
@@ -72,48 +66,58 @@ proc sum_reduce_vartime_parallelChunks[F; G: static Subgroup](
   partialResultsAffine.batchAffine(partialResults, chunkDesc.numChunks)
   r.sum_reduce_vartime(partialResultsAffine, chunkDesc.numChunks)

-proc sum_reduce_vartime_parallelFor[F; G: static Subgroup](
+proc sum_reduce_vartime_parallelAccums[F; G: static Subgroup](
        tp: Threadpool,
        r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G]),
        points: openArray[ECP_ShortW_Aff[F, G]]) =
   ## Batch addition of `points` into `r`
   ## `r` is overwritten
-  ## Compute is parallelized, if beneficial.
+  ## 2x faster for small numbers of points

-  mixin globalSum
+  const maxTempMem = 1 shl 18 # 2¹⁸ = 262144
+  const maxChunkSize = maxTempMem div sizeof(ECP_ShortW_Aff[F, G])
+  type Acc = EcAddAccumulator_vartime[typeof(r), F, G, maxChunkSize]

-  const maxTempMem = 262144 # 2¹⁸ = 262144
-  const maxStride = maxTempMem div sizeof(ECP_ShortW_Aff[F, G])
+  let ps = points.asUnchecked()
+  let N = points.len

-  let p = points.asUnchecked
-  let pointsLen = points.len
+  mixin globalAcc

-  tp.parallelFor i in 0 ..< points.len:
-    stride: maxStride
-    captures: {p, pointsLen}
-    reduceInto(globalSum: typeof(r)):
+  const chunkSize = 32
+
+  tp.parallelFor i in 0 ..< N:
+    stride: chunkSize
+    captures: {ps, N}
+    reduceInto(globalAcc: ptr Acc):
       prologue:
-        var localSum {.noInit.}: typeof(r)
-        localSum.setInf()
+        var workerAcc = allocHeap(Acc)
+        workerAcc[].init()
       forLoop:
-        let n = min(maxStride, pointsLen-i)
-        localSum.accumSum_chunk_vartime(p +% i, n)
-      merge(remoteSum: Flowvar[typeof(r)]):
-        localSum.sum_vartime(localSum, sync(remoteSum))
+        for j in i ..< min(i+chunkSize, N):
+          workerAcc[].update(ps[j])
+      merge(remoteAccFut: Flowvar[ptr Acc]):
+        let remoteAcc = sync(remoteAccFut)
+        workerAcc[].merge(remoteAcc[])
+        freeHeap(remoteAcc)
       epilogue:
-        return localSum
+        workerAcc[].handover()
+        return workerAcc

-  r = sync(globalSum)
+  let ctx = sync(globalAcc)
+  ctx[].finish(r)
+  freeHeap(ctx)

 proc sum_reduce_vartime_parallel*[F; G: static Subgroup](
        tp: Threadpool,
        r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G]),
        points: openArray[ECP_ShortW_Aff[F, G]]) {.inline.} =
-  ## Batch addition of `points` into `r`
+  ## Parallel batch addition of `points` into `r`
   ## `r` is overwritten
-  ## Compute is parallelized, if beneficial.
-  ## This function cannot be nested in another parallel function
-  when false:
-    tp.sum_reduce_vartime_parallelFor(r, points)
+
+  if points.len < 256:
+    r.setInf()
+    r.accumSum_chunk_vartime(points.asUnchecked(), points.len)
+  elif points.len < 8192:
+    tp.sum_reduce_vartime_parallelAccums(r, points)
   else:
     tp.sum_reduce_vartime_parallelChunks(r, points)
diff --git a/constantine/math/pairings/miller_accumulators.nim b/constantine/math/pairings/miller_accumulators.nim
index 995cb9b..ea927e7 100644
--- a/constantine/math/pairings/miller_accumulators.nim
+++ b/constantine/math/pairings/miller_accumulators.nim
@@ -50,40 +50,40 @@ import
 # and we can choose N to be way less than 68.
 # So for compactness we take Aranha's approach.

-const AccumMax = 8
+const MillerAccumMax = 8
 # Max buffer size before triggering a Miller Loop.
 # Assuming pairing costs 100, with 50 for Miller Loop and 50 for Final exponentiation.
 #
 # N unbatched pairings would cost N*100
 # N maximally batched pairings would cost N*50 + 50
-# N AccumMax batched pairings would cost N*50 + N/AccumMax*(Fpᵏ mul) + 50
+# N MillerAccumMax batched pairings would cost N*50 + N/MillerAccumMax*(Fpᵏ mul) + 50
 #
 # Fpᵏ mul costs 0.7% of a Miller Loop and so is negligeable.
 # By choosing AccumMax = 8, we amortized the cost to below 0.1% per pairing.

 type MillerAccumulator*[FF1, FF2; FpK: ExtensionField] = object
   accum: FpK
-  Ps: array[AccumMax, ECP_ShortW_Aff[FF1, G1]]
-  Qs: array[AccumMax, ECP_ShortW_Aff[FF2, G2]]
-  cur: uint32
+  Ps: array[MillerAccumMax, ECP_ShortW_Aff[FF1, G1]]
+  Qs: array[MillerAccumMax, ECP_ShortW_Aff[FF2, G2]]
+  len: uint32
   accOnce: bool

 func init*(ctx: var MillerAccumulator) =
-  ctx.cur = 0
+  ctx.len = 0
   ctx.accOnce = false

 func consumeBuffers[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK]) =
-  if ctx.cur == 0:
+  if ctx.len == 0:
     return

   var t{.noInit.}: FpK
-  t.millerLoop(ctx.Qs.asUnchecked(), ctx.Ps.asUnchecked(), ctx.cur.int)
+  t.millerLoop(ctx.Qs.asUnchecked(), ctx.Ps.asUnchecked(), ctx.len.int)
   if ctx.accOnce:
     ctx.accum *= t
   else:
     ctx.accum = t
     ctx.accOnce = true

-  ctx.cur = 0
+  ctx.len = 0

 func update*[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK], P: ECP_ShortW_Aff[FF1, G1], Q: ECP_ShortW_Aff[FF2, G2]): bool =
   ## Aggregate another set for pairing
@@ -94,34 +94,54 @@ func update*[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK], P: ECP_Sh
   if P.isInf().bool or Q.isInf().bool:
     return false

-  if ctx.cur == AccumMax:
+  if ctx.len == MillerAccumMax:
     ctx.consumeBuffers()

-  ctx.Ps[ctx.cur] = P
-  ctx.Qs[ctx.cur] = Q
-  ctx.cur += 1
+  ctx.Ps[ctx.len] = P
+  ctx.Qs[ctx.len] = Q
+  ctx.len += 1
   return true

+func handover*(ctx: var MillerAccumulator) {.inline.} =
+  ## Prepare an accumulator for cheaper merging.
+  ##
+  ## In a multi-threaded context, multiple accumulators can be created and process subsets of the batch in parallel.
+  ## Accumulators can then be merged:
+  ##   merger_accumulator += mergee_accumulator
+  ## Merging involves an expensive reduction operation whenever the accumulation threshold of 8 is reached.
+  ## However, merging two already-reduced accumulators is 136x cheaper.
+  ##
+  ## `handover` forces this reduction on the local thread to limit the burden on the merger thread.
+ ctx.consumeBuffers() + func merge*(ctxDst: var MillerAccumulator, ctxSrc: MillerAccumulator) = ## Merge ctxDst <- ctxDst + ctxSrc - var dCur = ctxDst.cur var sCur = 0'u - var itemsLeft = ctxSrc.cur + var itemsLeft = ctxSrc.len - if dCur != 0 and dCur+itemsLeft >= AccumMax: + if ctxDst.len + itemsLeft >= MillerAccumMax: # Previous partial update, fill the buffer and do one miller loop - let free = AccumMax - dCur + let free = MillerAccumMax - ctxDst.len for i in 0 ..< free: - ctxDst[dCur+i] = ctxSrc[i] + ctxDst.Ps[ctxDst.len+i] = ctxSrc.Ps[i] + ctxDst.Qs[ctxDst.len+i] = ctxSrc.Qs[i] + ctxDst.len = MillerAccumMax ctxDst.consumeBuffers() - dCur = 0 sCur = free itemsLeft -= free - if itemsLeft != 0: - # Store the tail - for i in 0 ..< itemsLeft: - ctxDst[dCur+i] = ctxSrc[sCur+i] + # Store the tail + for i in 0 ..< itemsLeft: + ctxDst.Ps[ctxDst.len+i] = ctxSrc.Ps[sCur+i] + ctxDst.Qs[ctxDst.len+i] = ctxSrc.Qs[sCur+i] + + ctxDst.len += itemsLeft + + if ctxDst.accOnce and ctxSrc.accOnce: + ctxDst.accum *= ctxSrc.accum + elif ctxSrc.accOnce: + ctxDst.accum = ctxSrc.accum + ctxDst.accOnce = true func finish*[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK], multiMillerLoopResult: var Fpk) = ## Output the accumulation of multiple Miller Loops diff --git a/constantine/math/polynomials/polynomials_parallel.nim b/constantine/math/polynomials/polynomials_parallel.nim new file mode 100644 index 0000000..b184239 --- /dev/null +++ b/constantine/math/polynomials/polynomials_parallel.nim @@ -0,0 +1,156 @@ +# Constantine +# Copyright (c) 2018-2019 Status Research & Development GmbH +# Copyright (c) 2020-Present Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import ./polynomials {.all.} +export polynomials + +import + ../config/curves, + ../arithmetic, + ../../platforms/bithacks, + ../../threadpool/threadpool + +## ############################################################ +## +## Polynomials +## Parallel Edition +## +## ############################################################ + +proc evalPolyAt_parallel*[N: static int, Field]( + tp: Threadpool, + r: var Field, + poly: ptr PolynomialEval[N, Field], + z: ptr Field, + invRootsMinusZ: ptr array[N, Field], + domain: ptr PolyDomainEval[N, Field]) = + ## Evaluate a polynomial in evaluation form + ## at the point z + ## z MUST NOT be one of the roots of unity + ## + ## Parallelism: This only returns when computation is fully done + + # p(z) = (1-zⁿ)/n ∑ ωⁱ/(ωⁱ-z) . 
p(ωⁱ) + + mixin globalSum + static: doAssert N.isPowerOf2_vartime() + + tp.parallelFor i in 0 ..< N: + captures: {poly, domain, invRootsMinusZ} + reduceInto(globalSum: Field): + prologue: + var workerSum {.noInit.}: Field + workerSum.setZero() + forLoop: + var iterSummand {.noInit.}: Field + iterSummand.prod(domain.rootsOfUnity[i], invRootsMinusZ[i]) + iterSummand *= poly.evals[i] + workerSum += iterSummand + merge(remoteSum: Flowvar[Field]): + workerSum += sync(remoteSum) + epilogue: + return workerSum + + var t {.noInit.}: Field + t = z[] + const numDoublings = log2_vartime(uint32 N) # N is a power of 2 + t.square_repeated(int numDoublings) # exponentiation by a power of 2 + t.diff(Field(mres: Field.getMontyOne()), t) # TODO: refactor getMontyOne to getOne and return a field element. + + r.prod(t, domain.invMaxDegree) + r *= sync(globalSum) + +proc differenceQuotientEvalOffDomain_parallel*[N: static int, Field]( + tp: Threadpool, + r: ptr PolynomialEval[N, Field], + poly: ptr PolynomialEval[N, Field], + pZ: ptr Field, + invRootsMinusZ: ptr array[N, Field]) = + ## Compute r(x) = (p(x) - p(z)) / (x - z) + ## + ## for z != ωⁱ a power of a root of unity + ## + ## Input: + ## - invRootsMinusZ: 1/(ωⁱ-z) + ## - poly: p(x) a polynomial in evaluation form as an array of p(ωⁱ) + ## - rootsOfUnity: ωⁱ + ## - p(z) + ## + ## Parallelism: This only returns when computation is fully done + # TODO: we might want either awaitable for-loops + # or awaitable individual iterations + # for latency-hiding techniques + + syncScope: + tp.parallelFor i in 0 ..< N: + captures: {r, poly, pZ, invRootsMinusZ} + # qᵢ = (p(ωⁱ) - p(z))/(ωⁱ-z) + var qi {.noinit.}: Field + qi.diff(poly.evals[i], pZ[]) + r.evals[i].prod(qi, invRootsMinusZ[i]) + +proc differenceQuotientEvalInDomain_parallel*[N: static int, Field]( + tp: Threadpool, + r: ptr PolynomialEval[N, Field], + poly: ptr PolynomialEval[N, Field], + zIndex: uint32, + invRootsMinusZ: ptr array[N, Field], + domain: ptr PolyDomainEval[N, Field], + isBitReversedDomain: static bool) = + ## Compute r(x) = (p(x) - p(z)) / (x - z) + ## + ## for z = ωⁱ a power of a root of unity + ## + ## Input: + ## - poly: p(x) a polynomial in evaluation form as an array of p(ωⁱ) + ## - rootsOfUnity: ωⁱ + ## - invRootsMinusZ: 1/(ωⁱ-z) + ## - zIndex: the index of the root of unity power that matches z = ωⁱᵈˣ + ## + ## Parallelism: This only returns when computation is fully done + + static: + # For powers of 2: x mod N == x and (N-1) + doAssert N.isPowerOf2_vartime() + + mixin evalsZindex + + tp.parallelFor i in 0 ..< N: + captures: {r, poly, domain, invRootsMinusZ, zIndex} + reduceInto(evalsZindex: Field): + prologue: + var worker_ri {.noInit.}: Field + worker_ri.setZero() + forLoop: + var iter_ri {.noInit.}: Field + if i == int(zIndex): + iter_ri.setZero() + else: + # qᵢ = (p(ωⁱ) - p(z))/(ωⁱ-z) + var qi {.noinit.}: Field + qi.diff(poly.evals[i], poly.evals[zIndex]) + r.evals[i].prod(qi, invRootsMinusZ[i]) + + # q'ᵢ = -qᵢ * ωⁱ/z + # q'idx = ∑ q'ᵢ + iter_ri.neg(r.evals[i]) # -qᵢ + when isBitReversedDomain: + const logN = log2_vartime(uint32 N) + let invZidx = N - reverseBits(uint32 zIndex, logN) + let canonI = reverseBits(uint32 i, logN) + let idx = reverseBits((canonI + invZidx) and (N-1), logN) + iter_ri *= domain.rootsOfUnity[idx] # -qᵢ * ωⁱ/z (explanation at the bottom of serial impl) + else: + iter_ri *= domain.rootsOfUnity[(i+N-zIndex) and (N-1)] # -qᵢ * ωⁱ/z (explanation at the bottom of serial impl) + worker_ri += iter_ri + merge(remote_ri: Flowvar[Field]): + worker_ri += 
sync(remote_ri) + epilogue: + return worker_ri + + r.evals[zIndex] = sync(evalsZindex) \ No newline at end of file diff --git a/constantine/platforms/views.nim b/constantine/platforms/views.nim index e488752..5baf297 100644 --- a/constantine/platforms/views.nim +++ b/constantine/platforms/views.nim @@ -6,7 +6,9 @@ # * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. -import std/macros +import + std/macros, + ./primitives # OpenArray type # --------------------------------------------------------- @@ -29,12 +31,23 @@ type View*[T] = object template toOpenArray*[T](v: View[T]): openArray[T] = v.data.toOpenArray(0, v.len-1) -func toView*[T](data: ptr UncheckedArray[T], len: int) {.inline.} = +func toView*[T](oa: openArray[T]): View[T] {.inline.} = + View[T](data: cast[ptr UncheckedArray[T]](oa[0].unsafeAddr), len: oa.len) + +func toView*[T](data: ptr UncheckedArray[T], len: int): View[T] {.inline.} = View[T](data: data, len: len) func `[]`*[T](v: View[T], idx: int): lent T {.inline.} = v.data[idx] +func chunk*[T](v: View[T], start, len: int): View[T] {.inline.} = + ## Create a sub-chunk from a view + debug: + doAssert start >= 0 + doAssert start + len <= v.len + result.data = v.data +% start + result.len = len + type MutableView*[T] {.borrow: `.`.} = distinct View[T] template toOpenArray*[T](v: MutableView[T]): openArray[T] = diff --git a/constantine/signatures/bls_signatures.nim b/constantine/signatures/bls_signatures.nim index 6bf4a2c..c1e483e 100644 --- a/constantine/signatures/bls_signatures.nim +++ b/constantine/signatures/bls_signatures.nim @@ -198,7 +198,7 @@ func update*[Pubkey: ECP_ShortW_Aff]( augmentation = "", message, ctx.domainSepTag.toOpenArray(0, ctx.dst_len.int - 1)) - ctx.millerAccum.update(pubkey, hmsgG2_aff) + return ctx.millerAccum.update(pubkey, hmsgG2_aff) else: # Pubkey on G2, H(message) and Signature on G1 @@ -209,7 +209,7 @@ func update*[Pubkey: ECP_ShortW_Aff]( augmentation = "", message, ctx.domainSepTag.toOpenArray(0, ctx.dst_len.int - 1)) - ctx.millerAccum.update(hmsgG1_aff, pubkey) + return ctx.millerAccum.update(hmsgG1_aff, pubkey) func update*[Pubkey: ECP_ShortW_Aff]( ctx: var BLSAggregateSigAccumulator, @@ -227,6 +227,7 @@ func merge*(ctxDst: var BLSAggregateSigAccumulator, ctxSrc: BLSAggregateSigAccum return false ctxDst.millerAccum.merge(ctxSrc.millerAccum) + return true func finalVerify*[F, G](ctx: var BLSAggregateSigAccumulator, aggregateSignature: ECP_ShortW_Aff[F, G]): bool = ## Finish batch and/or aggregate signature verification and returns the final result. 
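A note on the accumulator API threaded through this diff (`EcAddAccumulator_vartime`, `MillerAccumulator`, and the BLS accumulators): `update` buffers work, `handover` forces the expensive reduction on the worker thread that owns the accumulator, and `merge` then combines two already-reduced accumulators cheaply. Below is a self-contained toy model of that contract in plain Nim. All names are illustrative and not part of this diff, and the model simplifies one point: the real `merge` copies the source's unreduced buffer tail into the destination buffer instead of forcing a reduction.

```nim
const AccumMax = 8

type ToyAccum = object
  buffer: array[AccumMax, int]
  len: int
  accum: int       # stands in for the partial pairing / partial EC sum
  accOnce: bool

proc consumeBuffer(ctx: var ToyAccum) =
  ## The "expensive" batched reduction (a Miller loop or batch EC addition
  ## in the real code; a plain integer sum here).
  if ctx.len == 0: return
  var t = 0
  for i in 0 ..< ctx.len:
    t += ctx.buffer[i]
  if ctx.accOnce: ctx.accum += t
  else:
    ctx.accum = t
    ctx.accOnce = true
  ctx.len = 0

proc update(ctx: var ToyAccum, x: int) =
  if ctx.len == AccumMax:
    ctx.consumeBuffer()
  ctx.buffer[ctx.len] = x
  ctx.len += 1

proc handover(ctx: var ToyAccum) =
  ## Force the reduction on the worker thread that owns `ctx`.
  ctx.consumeBuffer()

proc merge(dst: var ToyAccum, src: ToyAccum) =
  ## Combine two accumulators; cheap when both sides called `handover` first.
  var tmp = src
  tmp.consumeBuffer()
  dst.consumeBuffer()
  if dst.accOnce and tmp.accOnce: dst.accum += tmp.accum
  elif tmp.accOnce:
    dst.accum = tmp.accum
    dst.accOnce = true

when isMainModule:
  var a, b: ToyAccum
  for i in 1 .. 10: a.update(i)
  for i in 11 .. 20: b.update(i)
  b.handover()             # worker-side reduction, so the merge stays cheap
  a.merge(b)
  a.handover()
  doAssert a.accum == 210  # sum(1..20)
```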
@@ -439,7 +440,7 @@ func update*[Pubkey, Sig: ECP_ShortW_Aff]( augmentation = "", message, ctx.domainSepTag.toOpenArray(0, ctx.dst_len.int - 1)) - ctx.millerAccum.update(pkG1_aff, hmsgG2_aff) + return ctx.millerAccum.update(pkG1_aff, hmsgG2_aff) else: # Pubkey on G2, H(message) and Signature on G1 @@ -467,7 +468,7 @@ func update*[Pubkey, Sig: ECP_ShortW_Aff]( type FF1 = BLSBatchSigAccumulator.FF1 var hmsgG1_aff {.noInit.}: ECP_ShortW_Aff[FF1, G1] hmsgG1_aff.affine(hmsgG1_jac) - ctx.millerAccum.update(hmsgG1_aff, pubkey) + return ctx.millerAccum.update(hmsgG1_aff, pubkey) func update*[Pubkey, Sig: ECP_ShortW_Aff]( ctx: var BLSBatchSigAccumulator, @@ -476,13 +477,25 @@ func update*[Pubkey, Sig: ECP_ShortW_Aff]( signature: Sig): bool {.inline.} = ctx.update(pubkey, message, signature) +func handover*(ctx: var BLSBatchSigAccumulator) {.inline.} = + ## Prepare accumulator for cheaper merging. + ## + ## In a multi-threaded context, multiple accumulators can be created and process subsets of the batch in parallel. + ## Accumulators can then be merged: + ## merger_accumulator += mergee_accumulator + ## Merging will involve an expensive reduction operation when an accumulation threshold of 8 is reached. + ## However merging two reduced accumulators is 136x cheaper. + ## + ## `Handover` forces this reduction on local threads to limit the burden on the merger thread. + ctx.millerAccum.handover() + func merge*(ctxDst: var BLSBatchSigAccumulator, ctxSrc: BLSBatchSigAccumulator): bool = ## Merge 2 BLS signature accumulators: ctxDst <- ctxDst + ctxSrc ## ## Returns false if they have inconsistent DomainSeparationTag and true otherwise. if ctxDst.dst_len != ctxSrc.dst_len: return false - if not equalMem(ctxDst.domainSepTag.addr, ctxSrc.domainSepTag.addr, ctxDst.domainSepTag.len): + if not equalMem(ctxDst.domainSepTag.addr, ctxSrc.domainSepTag.unsafeAddr, ctxDst.domainSepTag.len): return false ctxDst.millerAccum.merge(ctxSrc.millerAccum) @@ -494,6 +507,7 @@ func merge*(ctxDst: var BLSBatchSigAccumulator, ctxSrc: BLSBatchSigAccumulator): ctxDst.aggSigOnce = true BLSBatchSigAccumulator.H.hash(ctxDst.secureBlinding, ctxDst.secureBlinding, ctxSrc.secureBlinding) + return true func finalVerify*(ctx: var BLSBatchSigAccumulator): bool = ## Finish batch and/or aggregate signature verification and returns the final result. diff --git a/constantine/signatures/bls_signatures_parallel.nim b/constantine/signatures/bls_signatures_parallel.nim new file mode 100644 index 0000000..cf83356 --- /dev/null +++ b/constantine/signatures/bls_signatures_parallel.nim @@ -0,0 +1,181 @@ +# Constantine +# Copyright (c) 2018-2019 Status Research & Development GmbH +# Copyright (c) 2020-Present Mamy André-Ratsimbazafy +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+
+# ############################################################
+#
+#                      BLS Signatures
+#                     Parallel edition
+#
+# ############################################################
+
+when not compileOption("threads"):
+  {.error: "This requires --threads:on compilation flag".}
+
+# Import all of bls_signatures, including private fields, and reexport
+import ./bls_signatures{.all.}
+export bls_signatures
+
+import
+  # Standard library
+  std/atomics,
+  # Constantine
+  ../threadpool/[threadpool, partitioners],
+  ../platforms/[abstractions, allocs, views],
+  ../serialization/endians,
+  ../hashes,
+  ../math/ec_shortweierstrass
+
+# No exceptions allowed in core cryptographic operations
+{.push raises: [].}
+{.push checks: off.}
+
+# Parallelized Batch Verifier
+# ----------------------------------------------------------------------
+# Parallel pairing computation requires the following steps.
+#
+# Assuming we have N (public key, message, signature) triplets to verify
+# on P processors/threads,
+# we want B batches with B = (idle) P,
+# each processing W work items with W = N/B or N/B + 1.
+#
+# Step 0: Initialize an accumulator per thread.
+# Step 1: Compute partial pairings, W work items per thread. (~190μs - Miller loops)
+# Step 2: Merge the B partial pairings. (~1.3μs - Fp12 multiplications)
+# Step 3: Final verification. (~233μs - Final Exponentiation)
+#
+# (Timings are per operation on a 2.6GHz, turbo 5GHz i9-11980HK CPU for BLS12-381 pairings.)
+#
+# We rely on the lazy tree splitting of Constantine's threadpool
+# to only split computation if there is an idle worker.
+# We force the base case for splitting to be 2 for efficiency, but
+# the actual base case auto-adapts to runtime conditions
+# and may be 100, for example, if all other threads are busy.
+#
+# In Ethereum consensus, blocks may require up to 6 kinds of signature verification:
+# - block proposal signatures
+# - randao reveal signatures
+# - proposer slashing signatures
+# - attester slashing signatures
+# - attestation signatures
+# - validator exit signatures
+# not counting deposit signatures, which may be invalid.
+#
+# Signature verification is the bottleneck for fast syncing
+# and may slow syncing down by hours or days.
+
+proc batchVerify_parallel*[Msg, Pubkey, Sig](
+       tp: Threadpool,
+       pubkeys: openArray[Pubkey],
+       messages: openArray[Msg],
+       signatures: openArray[Sig],
+       H: type CryptoHash,
+       k: static int,
+       domainSepTag: openArray[byte],
+       secureRandomBytes: array[32, byte]): bool {.noInline, genCharAPI.} =
+  ## Verify that all (pubkey, message, signature) triplets are valid.
+  ##
+  ## Returns false if there is at least one incorrect signature.
+  ##
+  ## Assumes pubkeys and signatures have been checked for non-infinity and group-checked.
+  ##
+  ## This requires cryptographically-secure random bytes
+  ## for scalar blinding
+  ## to defend against forged signatures that would not
+  ## verify individually but would verify while aggregated.
+  ## I.e. we need an input that is not under the attacker's control.
+ ## + ## The blinding scheme also assumes that the attacker cannot + ## resubmit 2^64 times forged (publickey, message, signature) triplets + ## against the same `secureRandomBytes` + + if tp.numThreads == 1: + return batchVerify(pubkeys, messages, signatures, H, k, domainSepTag, secureRandomBytes) + + if pubkeys.len == 0: + return false + + if pubkeys.len != messages.len or pubkeys.len != signatures.len: + return false + + type FF1 = Pubkey.F + type FF2 = Sig.F + type FpK = Sig.F.C.getGT() + + # Stage 0a: Setup per-thread accumulators + debug: doAssert pubkeys.len <= 1 shl 32 + let N = pubkeys.len.uint32 + let numAccums = min(N, tp.numThreads.uint32) + let accums = allocHeapArray(BLSBatchSigAccumulator[H, FF1, FF2, Fpk, ECP_ShortW_Jac[Sig.F, Sig.G], k], numAccums) + + # Stage 0b: Setup synchronization + var currentItem {.noInit.}: Atomic[uint32] + var terminateSignal {.noInit.}: Atomic[bool] + currentItem.store(0, moRelaxed) + terminateSignal.store(false, moRelaxed) + + # Stage 1: Accumulate partial pairings (Miller Loops) + # --------------------------------------------------- + proc accumulate( + ctx: ptr BLSBatchSigAccumulator, + pubkeys: ptr UncheckedArray[Pubkey], + messages: ptr UncheckedArray[Msg], + signatures: ptr UncheckedArray[Sig], + N: uint32, + domainSepTag: View[byte], + secureRandomBytes: ptr array[32, byte], + accumSepTag: array[sizeof(int), byte], + terminateSignal: ptr Atomic[bool], + currentItem: ptr Atomic[uint32]): bool {.nimcall, gcsafe.} = + ctx[].init( + domainSepTag.toOpenArray(), + secureRandomBytes[], + accumSepTag) + + while not terminateSignal[].load(moRelaxed): + let i = currentItem[].fetchAdd(1, moRelaxed) + if i >= N: + break + + if not ctx[].update(pubkeys[i], messages[i], signatures[i]): + terminateSignal[].store(true, moRelaxed) + return false + + ctx[].handover() + return true + + # Stage 2: Schedule work + # --------------------------------------------------- + let partialStates = allocStackArray(Flowvar[bool], numAccums) + for id in 0 ..< numAccums: + partialStates[id] = tp.spawn accumulate( + accums[id].addr, + pubkeys.asUnchecked(), + messages.asUnchecked(), + signatures.asUnchecked(), + N, + domainSepTag.toView(), + secureRandomBytes.unsafeAddr, + id.uint.toBytes(bigEndian), + terminateSignal.addr, + currentItem.addr) + + # Stage 3: Reduce partial pairings + # -------------------------------- + # Linear merge with latency hiding, we could consider a parallel logarithmic merge via a binary tree merge / divide-and-conquer + block HappyPath: # sync must be called even if result is false in the middle to avoid tasks leaking + result = sync partialStates[0] + for i in 1 ..< numAccums: + result = result and sync partialStates[i] + if result: # As long as no error is returned, accumulate + result = result and accums[0].merge(accums[i]) + if not result: # Don't proceed to final exponentiation if there is already an error + break HappyPath + + result = accums[0].finalVerify() + + freeHeap(accums) diff --git a/constantine/threadpool/benchmarks/black_scholes/threadpool_black_scholes.nim b/constantine/threadpool/benchmarks/black_scholes/threadpool_black_scholes.nim index 07bc4b3..1610304 100644 --- a/constantine/threadpool/benchmarks/black_scholes/threadpool_black_scholes.nim +++ b/constantine/threadpool/benchmarks/black_scholes/threadpool_black_scholes.nim @@ -337,7 +337,7 @@ proc main() = flt = ru.ru_minflt let start = wtime_msec() - var tp = Threadpool.new(numThreads = nthreads) + let tp = Threadpool.new(numThreads = nthreads) 
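A note on the `var tp` → `let tp` sweep that starts here and continues through the examples and tests below: `Threadpool.new()` returns a pointer-backed handle, and `cleanup`/`shutdown` (see the `threadpool.nim` hunk further down) no longer take `var Threadpool`, so an immutable binding is all that's needed. A minimal usage sketch, assuming only the public threadpool API and this repository's import layout:

```nim
import constantine/threadpool/threadpool

proc square(x: int): int = x * x

proc main() =
  let tp = Threadpool.new()   # `let` suffices: the handle is a pointer,
                              # mutation happens behind it, not on the binding
  let fut = tp.spawn square(21)
  doAssert sync(fut) == 441
  tp.shutdown()               # now takes `tp: Threadpool`, not `var Threadpool`

main()
```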
tp.blackScholesConstantine(ctx.addr) tp.shutdown() diff --git a/constantine/threadpool/demos/raytracing/smallpt.nim b/constantine/threadpool/demos/raytracing/smallpt.nim index afb8b4d..2d1ee50 100644 --- a/constantine/threadpool/demos/raytracing/smallpt.nim +++ b/constantine/threadpool/demos/raytracing/smallpt.nim @@ -223,7 +223,7 @@ when compileOption("threads"): # We need the buffer raw address let buf = cast[ptr UncheckedArray[Vec]](C[0].addr) - var tp = Threadpool.new() + let tp = Threadpool.new() tp.parallelFor y in 0 ..< h: # Loop over image rows captures: {tp, buf, samples} @@ -269,7 +269,7 @@ when compileOption("threads"): # We need the buffer raw address let buf = cast[ptr UncheckedArray[Vec]](C[0].addr) - var tp = Threadpool.new() + let tp = Threadpool.new() tp.parallelFor y in 0 ..< h: # Loop over image rows captures: {buf, samples} diff --git a/constantine/threadpool/examples/e01_simple_tasks.nim b/constantine/threadpool/examples/e01_simple_tasks.nim index 931e598..8d468a2 100644 --- a/constantine/threadpool/examples/e01_simple_tasks.nim +++ b/constantine/threadpool/examples/e01_simple_tasks.nim @@ -16,7 +16,7 @@ block: # Async without result echo "\nSanity check 1: Printing 123456 654321 in parallel" - var tp = Threadpool.new(numThreads = 4) + let tp = Threadpool.new(numThreads = 4) tp.spawn displayInt(123456) tp.spawn displayInt(654321) tp.shutdown() diff --git a/constantine/threadpool/examples/e02_parallel_pi.nim b/constantine/threadpool/examples/e02_parallel_pi.nim index 3407652..31461f5 100644 --- a/constantine/threadpool/examples/e02_parallel_pi.nim +++ b/constantine/threadpool/examples/e02_parallel_pi.nim @@ -28,7 +28,7 @@ proc main() = var n = 1_000_000 var nthreads = countProcessors() - var tp = Threadpool.new(num_threads = nthreads) # Default to the number of hardware threads. + let tp = Threadpool.new(num_threads = nthreads) # Default to the number of hardware threads. 
echo formatFloat(tp.piApprox(n)) diff --git a/constantine/threadpool/examples/e03_parallel_for.nim b/constantine/threadpool/examples/e03_parallel_for.nim index c462f00..7973ec9 100644 --- a/constantine/threadpool/examples/e03_parallel_for.nim +++ b/constantine/threadpool/examples/e03_parallel_for.nim @@ -6,7 +6,7 @@ block: echo "Running 'threadpool/examples/e03_parallel_for.nim'" echo "==============================================================================================" - var tp = Threadpool.new(numThreads = 4) + let tp = Threadpool.new(numThreads = 4) tp.parallelFor i in 0 ..< 100: log("%d\n", i) @@ -24,7 +24,7 @@ block: # Capturing outside scope echo "Running 'threadpool/examples/e03_parallel_for.nim'" echo "==============================================================================================" - var tp = Threadpool.new(numThreads = 4) + let tp = Threadpool.new(numThreads = 4) var a = 100 var b = 10 @@ -45,7 +45,7 @@ block: # Nested loops echo "Running 'threadpool/examples/e03_parallel_for.nim'" echo "==============================================================================================" - var tp = Threadpool.new(numThreads = 4) + let tp = Threadpool.new(numThreads = 4) tp.parallelFor i in 0 ..< 4: tp.parallelFor j in 0 ..< 8: diff --git a/constantine/threadpool/examples/e04_parallel_reduce.nim b/constantine/threadpool/examples/e04_parallel_reduce.nim index c7cadda..e113a44 100644 --- a/constantine/threadpool/examples/e04_parallel_reduce.nim +++ b/constantine/threadpool/examples/e04_parallel_reduce.nim @@ -20,7 +20,7 @@ block: result = sync(globalSum) - var tp = Threadpool.new(numThreads = 4) + let tp = Threadpool.new(numThreads = 4) let sum1M = tp.sumReduce(1000000) echo "Sum reduce(0..1000000): ", sum1M diff --git a/constantine/threadpool/parallel_offloading.nim b/constantine/threadpool/parallel_offloading.nim index b963855..0e7d080 100644 --- a/constantine/threadpool/parallel_offloading.nim +++ b/constantine/threadpool/parallel_offloading.nim @@ -38,7 +38,7 @@ import proc needTempStorage(argTy: NimNode): bool = case argTy.kind of nnkVarTy: - error("It is unsafe to capture a `var` parameter and pass it to another thread. Its memory location could be invalidated if the spawning proc returns before the worker thread finishes.") + error("It is unsafe to capture a `var` parameter '" & repr(argTy) & "' and pass it to another thread. Its memory location could be invalidated if the spawning proc returns before the worker thread finishes.") of nnkStaticTy: return false of nnkBracketExpr: diff --git a/constantine/threadpool/threadpool.nim b/constantine/threadpool/threadpool.nim index 1ce8631..e0de206 100644 --- a/constantine/threadpool/threadpool.nim +++ b/constantine/threadpool/threadpool.nim @@ -950,7 +950,7 @@ proc new*(T: type Threadpool, numThreads = countProcessors()): T {.raises: [Reso ## will not impact correctness but may impact performance. 
type TpObj = typeof(default(Threadpool)[]) # due to C import, we need a dynamic sizeof - var tp = allocHeapUncheckedAlignedPtr(Threadpool, sizeof(TpObj), alignment = 64) + let tp = allocHeapUncheckedAlignedPtr(Threadpool, sizeof(TpObj), alignment = 64) tp.barrier.init(numThreads.uint32) tp.globalBackoff.initialize() @@ -978,7 +978,7 @@ proc new*(T: type Threadpool, numThreads = countProcessors()): T {.raises: [Reso profileStart(run_task) return tp -proc cleanup(tp: var Threadpool) {.raises: [].} = +proc cleanup(tp: Threadpool) {.raises: [].} = ## Cleanup all resources allocated by the threadpool preCondition: workerContext.currentTask.isRootTask() @@ -993,7 +993,7 @@ proc cleanup(tp: var Threadpool) {.raises: [].} = tp.freeHeapAligned() -proc shutdown*(tp: var Threadpool) {.raises:[].} = +proc shutdown*(tp: Threadpool) {.raises:[].} = ## Wait until all tasks are processed and then shutdown the threadpool preCondition: workerContext.currentTask.isRootTask() tp.syncAll() diff --git a/tests/parallel/t_ec_template_parallel.nim b/tests/parallel/t_ec_template_parallel.nim index dc9da46..bda269a 100644 --- a/tests/parallel/t_ec_template_parallel.nim +++ b/tests/parallel/t_ec_template_parallel.nim @@ -83,7 +83,7 @@ proc run_EC_batch_add_parallel_impl*[N: static int]( for n in numPoints: test $ec & " parallel sum reduction (N=" & $n & ")": proc test(EC: typedesc, gen: RandomGen) = - var tp = Threadpool.new() + let tp = Threadpool.new() defer: tp.shutdown() var points = newSeq[ECP_ShortW_Aff[EC.F, EC.G]](n) @@ -108,7 +108,7 @@ proc run_EC_batch_add_parallel_impl*[N: static int]( test "EC " & $ec.G & " parallel sum reduction (N=" & $n & ") - special cases": proc test(EC: typedesc, gen: RandomGen) = - var tp = Threadpool.new() + let tp = Threadpool.new() defer: tp.shutdown() var points = newSeq[ECP_ShortW_Aff[EC.F, EC.G]](n) @@ -162,7 +162,7 @@ proc run_EC_multi_scalar_mul_parallel_impl*[N: static int]( let bucketBits = bestBucketBitSize(n, ec.F.C.getCurveOrderBitwidth(), useSignedBuckets = false, useManualTuning = false) test $ec & " Parallel Multi-scalar-mul (N=" & $n & ", bucket bits: " & $bucketBits & ")": proc test(EC: typedesc, gen: RandomGen) = - var tp = Threadpool.new() + let tp = Threadpool.new() defer: tp.shutdown() var points = newSeq[ECP_ShortW_Aff[EC.F, EC.G]](n) var coefs = newSeq[BigInt[EC.F.C.getCurveOrderBitwidth()]](n) diff --git a/tests/t_ethereum_bls_signatures.nim b/tests/t_ethereum_bls_signatures.nim index 6dcf4cb..4f36d25 100644 --- a/tests/t_ethereum_bls_signatures.nim +++ b/tests/t_ethereum_bls_signatures.nim @@ -9,9 +9,10 @@ import std/[os, unittest, strutils], pkg/jsony, - ../constantine/ethereum_bls_signatures, + ../constantine/ethereum_bls_signatures_parallel, ../constantine/serialization/codecs, - ../constantine/hashes + ../constantine/hashes, + ../constantine/threadpool/threadpool type # https://github.com/ethereum/bls12-381-tests/blob/master/formats/ @@ -301,6 +302,13 @@ testGen(batch_verify, testVector, BatchVerify_test): status[0] = pubkeys.batch_verify(testVector.input.messages, signatures, randomBytes) + let tp = Threadpool.new(numThreads = 4) + let parallelStatus = tp.batch_verify_parallel(pubkeys, testVector.input.messages, signatures, randomBytes) + doAssert status[0] == parallelStatus, block: + "\nSerial status: " & $status[0] & + "\nParallel status: " & $parallelStatus & '\n' + tp.shutdown() + let success = status == (cttBLS_Success, cttCodecEcc_Success) doAssert success == testVector.output, block: "Verification differs from expected \n" & diff --git 
a/tests/t_ethereum_bls_signatures.nim.cfg b/tests/t_ethereum_bls_signatures.nim.cfg
new file mode 100644
index 0000000..9d57ecf
--- /dev/null
+++ b/tests/t_ethereum_bls_signatures.nim.cfg
@@ -0,0 +1 @@
+--threads:on
\ No newline at end of file
diff --git a/tests/t_ethereum_eip4844_deneb_kzg.nim b/tests/t_ethereum_eip4844_deneb_kzg.nim
index 49dd019..49b4cdc 100644
--- a/tests/t_ethereum_eip4844_deneb_kzg.nim
+++ b/tests/t_ethereum_eip4844_deneb_kzg.nim
@@ -14,7 +14,7 @@ import
   # Internals
   ../constantine/hashes,
   ../constantine/serialization/codecs,
-  ../constantine/ethereum_eip4844_kzg_polynomial_commitments
+  ../constantine/ethereum_eip4844_kzg

 # Organization
 #
diff --git a/tests/t_ethereum_eip4844_deneb_kzg_parallel.nim b/tests/t_ethereum_eip4844_deneb_kzg_parallel.nim
new file mode 100644
index 0000000..d6aa0a4
--- /dev/null
+++ b/tests/t_ethereum_eip4844_deneb_kzg_parallel.nim
@@ -0,0 +1,274 @@
+# Constantine
+# Copyright (c) 2018-2019 Status Research & Development GmbH
+# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed except according to those terms.
+
+import
+  # Standard library
+  std/[os, strutils, streams, unittest],
+  # 3rd party
+  pkg/yaml,
+  # Internals
+  ../constantine/hashes,
+  ../constantine/serialization/codecs,
+  ../constantine/ethereum_eip4844_kzg_parallel,
+  ../constantine/threadpool/threadpool
+
+# Organization
+#
+# We choose not to use a type schema here, unlike with the other json-based tests
+# like:
+# - t_ethereum_bls_signatures
+# - t_ethereum_evm_precompiles
+#
+# It would add a lot of verbosity due to all the KZG types
+# and failure modes (subgroups, ...)
+# https://nimyaml.org/serialization.html
+
+const
+  TestVectorsDir =
+    currentSourcePath.rsplit(DirSep, 1)[0] / "protocol_ethereum_eip4844_deneb_kzg"
+
+const SkippedTests = [
+  ""
+]
+
+iterator walkTests*(testDir: string, skipped: var int): (string, string) =
+  for file in walkDirRec(testDir, relative = true):
+    if file in SkippedTests:
+      echo "[WARNING] Skipping - ", file
+      inc skipped
+      continue
+
+    yield (testDir, file)
+
+proc loadVectors(filename: string): YamlNode =
+  var s = filename.openFileStream()
+  defer: s.close()
+  load(s, result)
+
+template testGen*(name, testData: untyped, body: untyped): untyped {.dirty.} =
+  ## Generates a test proc
+  ## with identifier "test_name".
+  ## The test vector data is available as a YamlNode under
+  ## the variable passed as `testData`.
+  proc `test _ name`(tp: Threadpool, ctx: ptr EthereumKZGContext) =
+    var count = 0 # Need to fail if walkDir doesn't return anything
+    var skipped = 0
+    const testdir = TestVectorsDir / astToStr(name)/"small"
+    for dir, file in walkTests(testdir, skipped):
+      stdout.write("    " & alignLeft(astToStr(name) & " test:", 36) & alignLeft(file, 90))
+      let testData = loadVectors(dir/file)
+
+      body
+
+      inc count
+
+    doAssert count > 0, "Empty or nonexistent test folder: " & astToStr(name)
+    if skipped > 0:
+      echo "[WARNING]: ", skipped, " tests skipped."
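Since `testGen` is a `dirty` template, here is roughly what an invocation like `testGen(foo, testVector): body` expands to. This is a hand-expanded sketch for illustration (`foo` is a hypothetical suite name; this is not actual compiler output):

```nim
proc test_foo(tp: Threadpool, ctx: ptr EthereumKZGContext) =
  var count = 0
  var skipped = 0
  const testdir = TestVectorsDir / "foo" / "small"
  for dir, file in walkTests(testdir, skipped):
    stdout.write("    " & alignLeft("foo test:", 36) & alignLeft(file, 90))
    let testVector = loadVectors(dir/file)  # the `testData` parameter names this symbol
    # `body` is pasted here; being `dirty`, it sees `tp`, `ctx` and `testVector`,
    # and its `continue` statements skip straight to the next test vector
    inc count
  doAssert count > 0, "Empty or nonexistent test folder: foo"
```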
+ +template parseAssign(dstVariable: untyped, size: static int, hexInput: string) = + block: + let prefixBytes = 2*int(hexInput.startsWith("0x")) + let expectedLength = size*2 + prefixBytes + if hexInput.len != expectedLength: + let encodedBytes = (hexInput.len - prefixBytes) div 2 + stdout.write "[ Incorrect input length for '" & + astToStr(dstVariable) & + "': encoding " & $encodedBytes & " bytes" & + " instead of expected " & $size & " ]\n" + + doAssert testVector["output"].content == "null" + # We're in a template, this shortcuts the caller `walkTests` + continue + + var dstVariable{.inject.} = new(array[size, byte]) + dstVariable[].fromHex(hexInput) + +template parseAssignList(dstVariable: untyped, elemSize: static int, hexListInput: YamlNode) = + + var dstVariable{.inject.} = newSeq[array[elemSize, byte]]() + + block exitHappyPath: + block exitException: + for elem in hexListInput: + let hexInput = elem.content + + let prefixBytes = 2*int(hexInput.startsWith("0x")) + let expectedLength = elemSize*2 + prefixBytes + if hexInput.len != expectedLength: + let encodedBytes = (hexInput.len - prefixBytes) div 2 + stdout.write "[ Incorrect input length for '" & + astToStr(dstVariable) & + "': encoding " & $encodedBytes & " bytes" & + " instead of expected " & $elemSize & " ]\n" + + doAssert testVector["output"].content == "null" + break exitException + else: + dstVariable.setLen(dstVariable.len + 1) + dstVariable[^1].fromHex(hexInput) + + break exitHappyPath + + # We're in a template, this shortcuts the caller `walkTests` + continue + +testGen(blob_to_kzg_commitment, testVector): + parseAssign(blob, 32*4096, testVector["input"]["blob"].content) + + var commitment: array[48, byte] + + let status = tp.blob_to_kzg_commitment_parallel(ctx, commitment, blob[].addr) + stdout.write "[" & $status & "]\n" + + if status == cttEthKZG_Success: + parseAssign(expectedCommit, 48, testVector["output"].content) + doAssert bool(commitment == expectedCommit[]), block: + "\ncommitment: " & commitment.toHex() & + "\nexpected: " & expectedCommit[].toHex() & "\n" + else: + doAssert testVector["output"].content == "null" + +testGen(compute_kzg_proof, testVector): + parseAssign(blob, 32*4096, testVector["input"]["blob"].content) + parseAssign(z, 32, testVector["input"]["z"].content) + + var proof: array[48, byte] + var y: array[32, byte] + + let status = tp.compute_kzg_proof_parallel(ctx, proof, y, blob[].addr, z[]) + stdout.write "[" & $status & "]\n" + + if status == cttEthKZG_Success: + parseAssign(expectedEvalAtChallenge, 32, testVector["output"][1].content) + parseAssign(expectedProof, 48, testVector["output"][0].content) + + doAssert bool(y == expectedEvalAtChallenge[]), block: + "\ny (= p(z)): " & y.toHex() & + "\nexpected: " & expectedEvalAtChallenge[].toHex() & "\n" + doAssert bool(proof == expectedProof[]), block: + "\nproof: " & proof.toHex() & + "\nexpected: " & expectedProof[].toHex() & "\n" + else: + doAssert testVector["output"].content == "null" + +testGen(verify_kzg_proof, testVector): + parseAssign(commitment, 48, testVector["input"]["commitment"].content) + parseAssign(z, 32, testVector["input"]["z"].content) + parseAssign(y, 32, testVector["input"]["y"].content) + parseAssign(proof, 48, testVector["input"]["proof"].content) + + let status = verify_kzg_proof(ctx, commitment[], z[], y[], proof[]) + stdout.write "[" & $status & "]\n" + + if status == cttEthKZG_Success: + doAssert testVector["output"].content == "true" + elif status == cttEthKZG_VerificationFailure: + doAssert 
testVector["output"].content == "false" + else: + doAssert testVector["output"].content == "null" + +testGen(compute_blob_kzg_proof, testVector): + parseAssign(blob, 32*4096, testVector["input"]["blob"].content) + parseAssign(commitment, 48, testVector["input"]["commitment"].content) + + var proof: array[48, byte] + + let status = tp.compute_blob_kzg_proof_parallel(ctx, proof, blob[].addr, commitment[]) + stdout.write "[" & $status & "]\n" + + if status == cttEthKZG_Success: + parseAssign(expectedProof, 48, testVector["output"].content) + + doAssert bool(proof == expectedProof[]), block: + "\nproof: " & proof.toHex() & + "\nexpected: " & expectedProof[].toHex() & "\n" + else: + doAssert testVector["output"].content == "null" + +testGen(verify_blob_kzg_proof, testVector): + parseAssign(blob, 32*4096, testVector["input"]["blob"].content) + parseAssign(commitment, 48, testVector["input"]["commitment"].content) + parseAssign(proof, 48, testVector["input"]["proof"].content) + + let status = tp.verify_blob_kzg_proof_parallel(ctx, blob[].addr, commitment[], proof[]) + stdout.write "[" & $status & "]\n" + + if status == cttEthKZG_Success: + doAssert testVector["output"].content == "true" + elif status == cttEthKZG_VerificationFailure: + doAssert testVector["output"].content == "false" + else: + doAssert testVector["output"].content == "null" + +testGen(verify_blob_kzg_proof_batch, testVector): + parseAssignList(blobs, 32*4096, testVector["input"]["blobs"]) + parseAssignList(commitments, 48, testVector["input"]["commitments"]) + parseAssignList(proofs, 48, testVector["input"]["proofs"]) + + if blobs.len != commitments.len: + stdout.write "[ Length mismatch between blobs and commitments ]\n" + doAssert testVector["output"].content == "null" + continue + if blobs.len != proofs.len: + stdout.write "[ Length mismatch between blobs and proofs ]\n" + doAssert testVector["output"].content == "null" + continue + + # For reproducibility/debugging we don't use the CSPRNG here + var randomBlinding {.noInit.}: array[32, byte] + sha256.hash(randomBlinding, "The wizard quickly jinxed the gnomes before they vaporized.") + + template asUnchecked[T](a: openArray[T]): ptr UncheckedArray[T] = + if a.len > 0: + cast[ptr UncheckedArray[T]](a[0].unsafeAddr) + else: + nil + + let status = tp.verify_blob_kzg_proof_batch_parallel( + ctx, + blobs.asUnchecked(), + commitments.asUnchecked(), + proofs.asUnchecked(), + blobs.len, + randomBlinding) + stdout.write "[" & $status & "]\n" + + if status == cttEthKZG_Success: + doAssert testVector["output"].content == "true" + elif status == cttEthKZG_VerificationFailure: + doAssert testVector["output"].content == "false" + else: + doAssert testVector["output"].content == "null" + +block: + suite "Ethereum Deneb Hardfork / EIP-4844 / Proto-Danksharding / KZG Polynomial Commitments (Parallel)": + let ctx = load_ethereum_kzg_test_trusted_setup_mainnet() + let tp = Threadpool.new() + + test "blob_to_kzg_commitment_parallel(tp: Threadpool, dst: var array[48, byte], blob: ptr array[4096, byte])": + test_blob_to_kzg_commitment(tp, ctx) + + test "compute_kzg_proof_parallel(tp: Threadpool, proof: var array[48, byte], y: var array[32, byte], blob: ptr array[4096, byte], z: array[32, byte])": + test_compute_kzg_proof(tp, ctx) + + # Not parallelized + # test "verify_kzg_proof(commitment: array[48, byte], z, y: array[32, byte], proof: array[48, byte]) -> bool": + # test_verify_kzg_proof(tp, ctx) + + test "compute_blob_kzg_proof_parallel(tp: Threadpool, proof: var array[48, byte], blob: ptr 
array[4096, byte], commitment: array[48, byte])": + test_compute_blob_kzg_proof(tp, ctx) + + test "verify_blob_kzg_proof_parallel(tp: Threadpool, blob: ptr array[4096, byte], commitment, proof: array[48, byte])": + test_verify_blob_kzg_proof(tp, ctx) + + test "verify_blob_kzg_proof_batch_parallel(tp: Threadpool, blobs: ptr UncheckedArray[array[4096, byte]], commitments, proofs: ptr UncheckedArray[array[48, byte]], n: int, secureRandomBytes: array[32, byte])": + test_verify_blob_kzg_proof_batch(tp, ctx) + + tp.shutdown() + ctx.delete() diff --git a/tests/t_ethereum_eip4844_deneb_kzg_parallel.nim.cfg b/tests/t_ethereum_eip4844_deneb_kzg_parallel.nim.cfg new file mode 100644 index 0000000..fdc54e2 --- /dev/null +++ b/tests/t_ethereum_eip4844_deneb_kzg_parallel.nim.cfg @@ -0,0 +1,3 @@ +# NimYAML requires ORC instead of ARC for memory management to deal with cycles +--mm:orc +--threads:on \ No newline at end of file
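A few usage notes on the APIs introduced above. First, the size-based dispatch now in `sum_reduce_vartime_parallel` (`ec_shortweierstrass_batch_ops_parallel.nim`). A caller sketch; the import paths are assumed from this repository's layout:

```nim
import
  constantine/threadpool/threadpool,
  constantine/math/arithmetic,
  constantine/math/config/curves,
  constantine/math/elliptic/[
    ec_shortweierstrass_affine,
    ec_shortweierstrass_jacobian,
    ec_shortweierstrass_batch_ops_parallel]

proc sumAll(points: openArray[ECP_ShortW_Aff[Fp[BLS12_381], G1]]
           ): ECP_ShortW_Jac[Fp[BLS12_381], G1] =
  let tp = Threadpool.new()
  defer: tp.shutdown()
  # Dispatch per this diff:
  #   < 256 points  -> serial accumSum_chunk_vartime (threadpool overhead dominates)
  #   < 8192 points -> per-worker EcAddAccumulator_vartime, combined via handover/merge
  #   otherwise     -> fixed-size chunks + batch-affine final reduction
  tp.sum_reduce_vartime_parallel(result, points)
```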
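Second, the barycentric evaluation in `polynomials_parallel.nim` obtains zᴺ with `square_repeated(log2(N))`: for N = 2ᵏ, exponentiation by a power of two needs only k squarings. A toy check over the integers (plain Nim, not Constantine field arithmetic):

```nim
import std/math

# z -> z² -> z⁴ -> z⁸, i.e. log2(8) = 3 squarings.
var t = 3
for _ in 0 ..< 3:
  t = t * t
doAssert t == 3 ^ 8   # 6561
```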
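Third, the new zero-copy `toView`/`chunk` helpers in `views.nim` are small enough to demo inline (hypothetical data; the `debug` bounds checks are compiled out in release builds):

```nim
import constantine/platforms/views

let data = [byte 0, 1, 2, 3, 4, 5, 6, 7]
let v = data.toView()    # zero-copy view over the whole array
let hi = v.chunk(4, 4)   # zero-copy sub-view of elements 4..7
doAssert hi[0] == byte 4
doAssert hi[3] == byte 7
```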
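Finally, since `verify_blob_kzg_proof_batch_parallel` takes raw `ptr UncheckedArray` parameters, here is a sketch of a safe wrapper. It assumes `Blob`, `EthereumKZGContext`, and the status enum are re-exported by `ethereum_eip4844_kzg_parallel`; `verifyBlobBatch` itself is a hypothetical helper, not part of this diff:

```nim
import
  constantine/ethereum_eip4844_kzg_parallel,
  constantine/threadpool/threadpool

proc verifyBlobBatch(ctx: ptr EthereumKZGContext,
                     blobs: openArray[Blob],
                     commitments, proofs: openArray[array[48, byte]],
                     secureRandomBytes: array[32, byte]): bool =
  ## Returns true iff every (blob, commitment, proof) set verifies.
  if blobs.len != commitments.len or blobs.len != proofs.len:
    return false
  if blobs.len == 0:
    return true  # mirrors the n == 0 contract of the underlying proc
  let tp = Threadpool.new()
  defer: tp.shutdown()
  let status = tp.verify_blob_kzg_proof_batch_parallel(
    ctx,
    cast[ptr UncheckedArray[Blob]](blobs[0].unsafeAddr),
    cast[ptr UncheckedArray[array[48, byte]]](commitments[0].unsafeAddr),
    cast[ptr UncheckedArray[array[48, byte]]](proofs[0].unsafeAddr),
    blobs.len,
    secureRandomBytes)
  return status == cttEthKZG_Success
```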